[hornetq-commits] JBoss hornetq SVN: r8825 - in branches/HORNETQ-129_STOMP_protocol/src/main/org/hornetq/core: remoting/server/impl and 2 other directories.

do-not-reply at jboss.org do-not-reply at jboss.org
Thu Jan 21 09:40:58 EST 2010


Author: jmesnil
Date: 2010-01-21 09:40:58 -0500 (Thu, 21 Jan 2010)
New Revision: 8825

Added:
   branches/HORNETQ-129_STOMP_protocol/src/main/org/hornetq/core/remoting/server/ConnectionEntry.java
   branches/HORNETQ-129_STOMP_protocol/src/main/org/hornetq/core/remoting/server/ProtocolManager.java
Modified:
   branches/HORNETQ-129_STOMP_protocol/src/main/org/hornetq/core/remoting/server/impl/RemotingServiceImpl.java
   branches/HORNETQ-129_STOMP_protocol/src/main/org/hornetq/core/replication/ReplicationEndpoint.java
   branches/HORNETQ-129_STOMP_protocol/src/main/org/hornetq/core/replication/impl/ReplicationEndpointImpl.java
   branches/HORNETQ-129_STOMP_protocol/src/main/org/hornetq/core/replication/impl/ReplicationManagerImpl.java
Log:
https://jira.jboss.org/jira/browse/HORNETQ-129: Implement STOMP v1.0

* sync with the trunk: svn merge -r 8806:8821 https://svn.jboss.org/repos/hornetq/trunk

Copied: branches/HORNETQ-129_STOMP_protocol/src/main/org/hornetq/core/remoting/server/ConnectionEntry.java (from rev 8819, trunk/src/main/org/hornetq/core/remoting/server/ConnectionEntry.java)
===================================================================
--- branches/HORNETQ-129_STOMP_protocol/src/main/org/hornetq/core/remoting/server/ConnectionEntry.java	                        (rev 0)
+++ branches/HORNETQ-129_STOMP_protocol/src/main/org/hornetq/core/remoting/server/ConnectionEntry.java	2010-01-21 14:40:58 UTC (rev 8825)
@@ -0,0 +1,41 @@
+/*
+ * Copyright 2010 Red Hat, Inc.
+ * Red Hat licenses this file to you under the Apache License, version
+ * 2.0 (the "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+ * implied.  See the License for the specific language governing
+ * permissions and limitations under the License.
+ */
+
+package org.hornetq.core.remoting.server;
+
+import org.hornetq.core.remoting.RemotingConnection;
+
+/**
+ * A ConnectionEntry
+ *
+ * @author Tim Fox
+ *
+ *
+ */
+public class ConnectionEntry
+{
+   public final RemotingConnection connection;
+
+   public volatile long lastCheck;
+
+   public volatile long ttl;
+
+   public ConnectionEntry(final RemotingConnection connection, final long lastCheck, final long ttl)
+   {
+      this.connection = connection;
+
+      this.lastCheck = lastCheck;
+
+      this.ttl = ttl;
+   }
+}

Copied: branches/HORNETQ-129_STOMP_protocol/src/main/org/hornetq/core/remoting/server/ProtocolManager.java (from rev 8819, trunk/src/main/org/hornetq/core/remoting/server/ProtocolManager.java)
===================================================================
--- branches/HORNETQ-129_STOMP_protocol/src/main/org/hornetq/core/remoting/server/ProtocolManager.java	                        (rev 0)
+++ branches/HORNETQ-129_STOMP_protocol/src/main/org/hornetq/core/remoting/server/ProtocolManager.java	2010-01-21 14:40:58 UTC (rev 8825)
@@ -0,0 +1,29 @@
+/*
+ * Copyright 2010 Red Hat, Inc.
+ * Red Hat licenses this file to you under the Apache License, version
+ * 2.0 (the "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+ * implied.  See the License for the specific language governing
+ * permissions and limitations under the License.
+ */
+
+package org.hornetq.core.remoting.server;
+
+import org.hornetq.spi.core.remoting.Connection;
+
+/**
+ * A ProtocolManager
+ *
+ * @author Tim Fox
+ *
+ *
+ */
+public interface ProtocolManager
+{
+   ConnectionEntry createConnectionEntry(Connection connection);
+
+}

Modified: branches/HORNETQ-129_STOMP_protocol/src/main/org/hornetq/core/remoting/server/impl/RemotingServiceImpl.java
===================================================================
--- branches/HORNETQ-129_STOMP_protocol/src/main/org/hornetq/core/remoting/server/impl/RemotingServiceImpl.java	2010-01-21 14:39:29 UTC (rev 8824)
+++ branches/HORNETQ-129_STOMP_protocol/src/main/org/hornetq/core/remoting/server/impl/RemotingServiceImpl.java	2010-01-21 14:40:58 UTC (rev 8825)
@@ -26,20 +26,16 @@
 import org.hornetq.api.core.HornetQException;
 import org.hornetq.api.core.Interceptor;
 import org.hornetq.api.core.TransportConfiguration;
-import org.hornetq.api.core.client.HornetQClient;
 import org.hornetq.core.config.Configuration;
 import org.hornetq.core.logging.Logger;
-import org.hornetq.core.remoting.Channel;
-import org.hornetq.core.remoting.ChannelHandler;
-import org.hornetq.core.remoting.Packet;
-import org.hornetq.core.remoting.PacketDecoder;
+import org.hornetq.core.protocol.core.CoreProtocolManager;
+import org.hornetq.core.protocol.core.CoreRemotingConnection;
+import org.hornetq.core.remoting.ProtocolType;
 import org.hornetq.core.remoting.RemotingConnection;
-import org.hornetq.core.remoting.impl.RemotingConnectionImpl;
-import org.hornetq.core.remoting.impl.wireformat.PacketImpl;
-import org.hornetq.core.remoting.impl.wireformat.Ping;
+import org.hornetq.core.remoting.server.ConnectionEntry;
+import org.hornetq.core.remoting.server.ProtocolManager;
 import org.hornetq.core.remoting.server.RemotingService;
 import org.hornetq.core.server.HornetQServer;
-import org.hornetq.core.server.impl.HornetQPacketHandler;
 import org.hornetq.core.server.management.ManagementService;
 import org.hornetq.integration.transports.netty.ServerHolder;
 import org.hornetq.spi.core.remoting.Acceptor;
@@ -90,7 +86,10 @@
    private final ScheduledExecutorService scheduledThreadPool;
 
    private FailureCheckAndFlushThread failureCheckAndFlushThread;
-
+   
+   private Map<ProtocolType, ProtocolManager> protocolMap = 
+      new ConcurrentHashMap<ProtocolType, ProtocolManager>();
+   
    // Static --------------------------------------------------------
 
    // Constructors --------------------------------------------------
@@ -123,6 +122,8 @@
       this.managementService = managementService;
       this.threadPool = threadPool;
       this.scheduledThreadPool = scheduledThreadPool;
+      
+      this.protocolMap.put(ProtocolType.CORE, new CoreProtocolManager(server, interceptors));
    }
 
    // RemotingService implementation -------------------------------
@@ -169,13 +170,14 @@
                   return server;
                }
                
-               public RemotingConnection getRemotingConnection(int connectionID)
+               public CoreRemotingConnection getRemotingConnection(int connectionID)
                {
                   ConnectionEntry conn = connections.get(connectionID);
 
                   if (conn != null)
                   {
-                     return conn.connection;
+                     // FIXME ....
+                     return (CoreRemotingConnection)conn.connection;
                   }
                   else
                   {
@@ -192,6 +194,7 @@
             if (managementService != null)
             {
                acceptor.setNotificationService(managementService);
+               
                managementService.registerAcceptor(acceptor, info);
             }
          }
@@ -257,17 +260,7 @@
       {
          RemotingConnection conn = entry.connection;
 
-         Channel channel0 = conn.getChannel(0, -1);
-
-         // And we remove all channels from the connection, this ensures no more packets will be processed after this
-         // method is
-         // complete
-
-         conn.removeAllChannels();
-
-         // Now we are 100% sure that no more packets will be processed we can send the disconnect
-
-         channel0.sendAndFlush(new PacketImpl(PacketImpl.DISCONNECT));
+         conn.disconnect();
       }
 
       for (Acceptor acceptor : acceptors)
@@ -326,61 +319,32 @@
 
    // ConnectionLifeCycleListener implementation -----------------------------------
 
-   public void connectionCreated(final Connection connection)
+   private ProtocolManager getProtocolManager(ProtocolType protocol)
    {
+      return protocolMap.get(protocol);
+   }
+   
+   public void connectionCreated(final Connection connection, final ProtocolType protocol)
+   {
       if (server == null)
       {
          throw new IllegalStateException("Unable to create connection, server hasn't finished starting up");
       }
-
-      RemotingConnection rc = new RemotingConnectionImpl(connection,
-                                                         interceptors,
-                                                         config.isAsyncConnectionExecutionEnabled() ? server.getExecutorFactory()
-                                                                                                            .getExecutor()
-                                                                                                   : null);
-
-      Channel channel1 = rc.getChannel(1, -1);
-
-      ChannelHandler handler = createHandler(rc, channel1);
-
-      channel1.setHandler(handler);
-
-      long ttl = HornetQClient.DEFAULT_CONNECTION_TTL;
-
-      if (config.getConnectionTTLOverride() != -1)
+      
+      ProtocolManager pmgr = this.getProtocolManager(protocol);
+      
+      if (pmgr == null)
       {
-         ttl = config.getConnectionTTLOverride();
+         throw new IllegalArgumentException("Unknown protocol " + protocol);
       }
 
-      final ConnectionEntry entry = new ConnectionEntry(rc, System.currentTimeMillis(), ttl);
-
+      ConnectionEntry entry = pmgr.createConnectionEntry(connection);
+      
       connections.put(connection.getID(), entry);
 
-      final Channel channel0 = rc.getChannel(0, -1);
-
-      channel0.setHandler(new ChannelHandler()
-      {
-         public void handlePacket(final Packet packet)
-         {
-            if (packet.getType() == PacketImpl.PING)
-            {
-               Ping ping = (Ping)packet;
-
-               if (config.getConnectionTTLOverride() == -1)
-               {
-                  // Allow clients to specify connection ttl
-                  entry.ttl = ping.getConnectionTTL();
-               }
-
-               // Just send a ping back
-               channel0.send(packet);
-            }
-         }
-      });
-
       if (config.isBackup())
       {
-         serverSideReplicatingConnection = rc;
+         serverSideReplicatingConnection = entry.connection;
       }
    }
 
@@ -431,49 +395,23 @@
 
    // Protected -----------------------------------------------------
 
-   /**
-    * Subclasses (on tests) may use this to create a different channel.
-    */
-   protected ChannelHandler createHandler(final RemotingConnection rc, final Channel channel)
-   {
-      return new HornetQPacketHandler(server, channel, rc);
-   }
-
    // Private -------------------------------------------------------
 
    // Inner classes -------------------------------------------------
 
    private final class DelegatingBufferHandler implements BufferHandler
    {
-      public void bufferReceived(final Object connectionID, final HornetQBuffer buffer, final PacketDecoder decoder)
+      public void bufferReceived(final Object connectionID, final HornetQBuffer buffer)
       {
          ConnectionEntry conn = connections.get(connectionID);
 
          if (conn != null)
          {
-            conn.connection.bufferReceived(connectionID, buffer, decoder);
+            conn.connection.bufferReceived(connectionID, buffer);
          }
       }
    }
 
-   private static final class ConnectionEntry
-   {
-      final RemotingConnection connection;
-
-      volatile long lastCheck;
-
-      volatile long ttl;
-
-      ConnectionEntry(final RemotingConnection connection, final long lastCheck, final long ttl)
-      {
-         this.connection = connection;
-
-         this.lastCheck = lastCheck;
-
-         this.ttl = ttl;
-      }
-   }
-
    private final class FailureCheckAndFlushThread extends Thread
    {
       private final long pauseInterval;
@@ -538,11 +476,8 @@
                }
 
                if (flush)
-               {
-                  // We flush any confirmations on the connection - this prevents idle bridges for example
-                  // sitting there with many unacked messages
-
-                  conn.flushConfirmations();
+               {                                   
+                  conn.flush();
                }
             }
 

Modified: branches/HORNETQ-129_STOMP_protocol/src/main/org/hornetq/core/replication/ReplicationEndpoint.java
===================================================================
--- branches/HORNETQ-129_STOMP_protocol/src/main/org/hornetq/core/replication/ReplicationEndpoint.java	2010-01-21 14:39:29 UTC (rev 8824)
+++ branches/HORNETQ-129_STOMP_protocol/src/main/org/hornetq/core/replication/ReplicationEndpoint.java	2010-01-21 14:40:58 UTC (rev 8825)
@@ -15,8 +15,8 @@
 
 import org.hornetq.api.core.HornetQException;
 import org.hornetq.core.journal.JournalLoadInformation;
-import org.hornetq.core.remoting.Channel;
-import org.hornetq.core.remoting.ChannelHandler;
+import org.hornetq.core.protocol.core.Channel;
+import org.hornetq.core.protocol.core.ChannelHandler;
 import org.hornetq.core.server.HornetQComponent;
 
 /**

Modified: branches/HORNETQ-129_STOMP_protocol/src/main/org/hornetq/core/replication/impl/ReplicationEndpointImpl.java
===================================================================
--- branches/HORNETQ-129_STOMP_protocol/src/main/org/hornetq/core/replication/impl/ReplicationEndpointImpl.java	2010-01-21 14:39:29 UTC (rev 8824)
+++ branches/HORNETQ-129_STOMP_protocol/src/main/org/hornetq/core/replication/impl/ReplicationEndpointImpl.java	2010-01-21 14:40:58 UTC (rev 8825)
@@ -28,24 +28,24 @@
 import org.hornetq.core.paging.impl.PagingManagerImpl;
 import org.hornetq.core.paging.impl.PagingStoreFactoryNIO;
 import org.hornetq.core.persistence.impl.journal.JournalStorageManager;
-import org.hornetq.core.remoting.Channel;
-import org.hornetq.core.remoting.Packet;
-import org.hornetq.core.remoting.impl.wireformat.HornetQExceptionMessage;
-import org.hornetq.core.remoting.impl.wireformat.NullResponseMessage;
-import org.hornetq.core.remoting.impl.wireformat.PacketImpl;
-import org.hornetq.core.remoting.impl.wireformat.ReplicationAddMessage;
-import org.hornetq.core.remoting.impl.wireformat.ReplicationAddTXMessage;
-import org.hornetq.core.remoting.impl.wireformat.ReplicationCommitMessage;
-import org.hornetq.core.remoting.impl.wireformat.ReplicationCompareDataMessage;
-import org.hornetq.core.remoting.impl.wireformat.ReplicationDeleteMessage;
-import org.hornetq.core.remoting.impl.wireformat.ReplicationDeleteTXMessage;
-import org.hornetq.core.remoting.impl.wireformat.ReplicationLargeMessageBeingMessage;
-import org.hornetq.core.remoting.impl.wireformat.ReplicationLargeMessageWriteMessage;
-import org.hornetq.core.remoting.impl.wireformat.ReplicationLargemessageEndMessage;
-import org.hornetq.core.remoting.impl.wireformat.ReplicationPageEventMessage;
-import org.hornetq.core.remoting.impl.wireformat.ReplicationPageWriteMessage;
-import org.hornetq.core.remoting.impl.wireformat.ReplicationPrepareMessage;
-import org.hornetq.core.remoting.impl.wireformat.ReplicationResponseMessage;
+import org.hornetq.core.protocol.core.Channel;
+import org.hornetq.core.protocol.core.Packet;
+import org.hornetq.core.protocol.core.PacketImpl;
+import org.hornetq.core.protocol.core.wireformat.HornetQExceptionMessage;
+import org.hornetq.core.protocol.core.wireformat.NullResponseMessage;
+import org.hornetq.core.protocol.core.wireformat.ReplicationAddMessage;
+import org.hornetq.core.protocol.core.wireformat.ReplicationAddTXMessage;
+import org.hornetq.core.protocol.core.wireformat.ReplicationCommitMessage;
+import org.hornetq.core.protocol.core.wireformat.ReplicationCompareDataMessage;
+import org.hornetq.core.protocol.core.wireformat.ReplicationDeleteMessage;
+import org.hornetq.core.protocol.core.wireformat.ReplicationDeleteTXMessage;
+import org.hornetq.core.protocol.core.wireformat.ReplicationLargeMessageBeingMessage;
+import org.hornetq.core.protocol.core.wireformat.ReplicationLargeMessageWriteMessage;
+import org.hornetq.core.protocol.core.wireformat.ReplicationLargemessageEndMessage;
+import org.hornetq.core.protocol.core.wireformat.ReplicationPageEventMessage;
+import org.hornetq.core.protocol.core.wireformat.ReplicationPageWriteMessage;
+import org.hornetq.core.protocol.core.wireformat.ReplicationPrepareMessage;
+import org.hornetq.core.protocol.core.wireformat.ReplicationResponseMessage;
 import org.hornetq.core.replication.ReplicationEndpoint;
 import org.hornetq.core.server.HornetQServer;
 import org.hornetq.core.server.LargeServerMessage;

Modified: branches/HORNETQ-129_STOMP_protocol/src/main/org/hornetq/core/replication/impl/ReplicationManagerImpl.java
===================================================================
--- branches/HORNETQ-129_STOMP_protocol/src/main/org/hornetq/core/replication/impl/ReplicationManagerImpl.java	2010-01-21 14:39:29 UTC (rev 8824)
+++ branches/HORNETQ-129_STOMP_protocol/src/main/org/hornetq/core/replication/impl/ReplicationManagerImpl.java	2010-01-21 14:40:58 UTC (rev 8825)
@@ -29,24 +29,24 @@
 import org.hornetq.core.paging.PagedMessage;
 import org.hornetq.core.persistence.OperationContext;
 import org.hornetq.core.persistence.impl.journal.OperationContextImpl;
-import org.hornetq.core.remoting.Channel;
-import org.hornetq.core.remoting.ChannelHandler;
-import org.hornetq.core.remoting.Packet;
-import org.hornetq.core.remoting.RemotingConnection;
-import org.hornetq.core.remoting.impl.wireformat.CreateReplicationSessionMessage;
-import org.hornetq.core.remoting.impl.wireformat.PacketImpl;
-import org.hornetq.core.remoting.impl.wireformat.ReplicationAddMessage;
-import org.hornetq.core.remoting.impl.wireformat.ReplicationAddTXMessage;
-import org.hornetq.core.remoting.impl.wireformat.ReplicationCommitMessage;
-import org.hornetq.core.remoting.impl.wireformat.ReplicationCompareDataMessage;
-import org.hornetq.core.remoting.impl.wireformat.ReplicationDeleteMessage;
-import org.hornetq.core.remoting.impl.wireformat.ReplicationDeleteTXMessage;
-import org.hornetq.core.remoting.impl.wireformat.ReplicationLargeMessageBeingMessage;
-import org.hornetq.core.remoting.impl.wireformat.ReplicationLargeMessageWriteMessage;
-import org.hornetq.core.remoting.impl.wireformat.ReplicationLargemessageEndMessage;
-import org.hornetq.core.remoting.impl.wireformat.ReplicationPageEventMessage;
-import org.hornetq.core.remoting.impl.wireformat.ReplicationPageWriteMessage;
-import org.hornetq.core.remoting.impl.wireformat.ReplicationPrepareMessage;
+import org.hornetq.core.protocol.core.Channel;
+import org.hornetq.core.protocol.core.ChannelHandler;
+import org.hornetq.core.protocol.core.CoreRemotingConnection;
+import org.hornetq.core.protocol.core.Packet;
+import org.hornetq.core.protocol.core.PacketImpl;
+import org.hornetq.core.protocol.core.wireformat.CreateReplicationSessionMessage;
+import org.hornetq.core.protocol.core.wireformat.ReplicationAddMessage;
+import org.hornetq.core.protocol.core.wireformat.ReplicationAddTXMessage;
+import org.hornetq.core.protocol.core.wireformat.ReplicationCommitMessage;
+import org.hornetq.core.protocol.core.wireformat.ReplicationCompareDataMessage;
+import org.hornetq.core.protocol.core.wireformat.ReplicationDeleteMessage;
+import org.hornetq.core.protocol.core.wireformat.ReplicationDeleteTXMessage;
+import org.hornetq.core.protocol.core.wireformat.ReplicationLargeMessageBeingMessage;
+import org.hornetq.core.protocol.core.wireformat.ReplicationLargeMessageWriteMessage;
+import org.hornetq.core.protocol.core.wireformat.ReplicationLargemessageEndMessage;
+import org.hornetq.core.protocol.core.wireformat.ReplicationPageEventMessage;
+import org.hornetq.core.protocol.core.wireformat.ReplicationPageWriteMessage;
+import org.hornetq.core.protocol.core.wireformat.ReplicationPrepareMessage;
 import org.hornetq.core.replication.ReplicationManager;
 import org.hornetq.utils.ExecutorFactory;
 
@@ -69,7 +69,7 @@
 
    private final FailoverManager failoverManager;
 
-   private RemotingConnection replicatingConnection;
+   private CoreRemotingConnection replicatingConnection;
 
    private Channel replicatingChannel;
 



More information about the hornetq-commits mailing list