[hornetq-commits] JBoss hornetq SVN: r8818 - in trunk: src/main/org/hornetq/core/remoting and 13 other directories.

do-not-reply at jboss.org do-not-reply at jboss.org
Wed Jan 20 14:45:21 EST 2010


Author: timfox
Date: 2010-01-20 14:45:20 -0500 (Wed, 20 Jan 2010)
New Revision: 8818

Added:
   trunk/src/main/org/hornetq/core/remoting/CoreRemotingConnection.java
   trunk/src/main/org/hornetq/core/remoting/ProtocolType.java
   trunk/src/main/org/hornetq/core/remoting/server/ConnectionEntry.java
   trunk/src/main/org/hornetq/core/remoting/server/ProtocolManager.java
   trunk/src/main/org/hornetq/core/remoting/server/impl/CoreProtocolManager.java
Modified:
   trunk/src/main/org/hornetq/core/client/impl/ClientSessionImpl.java
   trunk/src/main/org/hornetq/core/client/impl/ClientSessionInternal.java
   trunk/src/main/org/hornetq/core/client/impl/DelegatingSession.java
   trunk/src/main/org/hornetq/core/client/impl/FailoverManager.java
   trunk/src/main/org/hornetq/core/client/impl/FailoverManagerImpl.java
   trunk/src/main/org/hornetq/core/remoting/Channel.java
   trunk/src/main/org/hornetq/core/remoting/RemotingConnection.java
   trunk/src/main/org/hornetq/core/remoting/impl/ChannelImpl.java
   trunk/src/main/org/hornetq/core/remoting/impl/RemotingConnectionImpl.java
   trunk/src/main/org/hornetq/core/remoting/impl/invm/InVMAcceptor.java
   trunk/src/main/org/hornetq/core/remoting/impl/invm/InVMConnection.java
   trunk/src/main/org/hornetq/core/remoting/impl/invm/InVMConnector.java
   trunk/src/main/org/hornetq/core/remoting/server/impl/RemotingServiceImpl.java
   trunk/src/main/org/hornetq/core/replication/impl/ReplicationManagerImpl.java
   trunk/src/main/org/hornetq/core/server/HornetQServer.java
   trunk/src/main/org/hornetq/core/server/ServerSession.java
   trunk/src/main/org/hornetq/core/server/impl/HornetQPacketHandler.java
   trunk/src/main/org/hornetq/core/server/impl/HornetQServerImpl.java
   trunk/src/main/org/hornetq/core/server/impl/ServerSessionImpl.java
   trunk/src/main/org/hornetq/core/server/impl/ServerSessionPacketHandler.java
   trunk/src/main/org/hornetq/integration/transports/netty/NettyAcceptor.java
   trunk/src/main/org/hornetq/integration/transports/netty/NettyConnection.java
   trunk/src/main/org/hornetq/integration/transports/netty/NettyConnector.java
   trunk/src/main/org/hornetq/spi/core/remoting/ConnectionLifeCycleListener.java
   trunk/tests/src/org/hornetq/tests/integration/client/IncompatibleVersionTest.java
   trunk/tests/src/org/hornetq/tests/integration/http/NettyHttpTest.java
   trunk/tests/src/org/hornetq/tests/integration/remoting/PingTest.java
   trunk/tests/src/org/hornetq/tests/unit/core/remoting/impl/netty/NettyAcceptorFactoryTest.java
   trunk/tests/src/org/hornetq/tests/unit/core/remoting/impl/netty/NettyAcceptorTest.java
   trunk/tests/src/org/hornetq/tests/unit/core/remoting/impl/netty/NettyConnectionTest.java
   trunk/tests/src/org/hornetq/tests/unit/core/remoting/impl/netty/NettyConnectorTest.java
Log:
factoring remoting layer for multi-protocols

Modified: trunk/src/main/org/hornetq/core/client/impl/ClientSessionImpl.java
===================================================================
--- trunk/src/main/org/hornetq/core/client/impl/ClientSessionImpl.java	2010-01-20 17:11:18 UTC (rev 8817)
+++ trunk/src/main/org/hornetq/core/client/impl/ClientSessionImpl.java	2010-01-20 19:45:20 UTC (rev 8818)
@@ -35,6 +35,7 @@
 import org.hornetq.core.logging.Logger;
 import org.hornetq.core.remoting.Channel;
 import org.hornetq.core.remoting.CommandConfirmationHandler;
+import org.hornetq.core.remoting.CoreRemotingConnection;
 import org.hornetq.core.remoting.FailureListener;
 import org.hornetq.core.remoting.Packet;
 import org.hornetq.core.remoting.RemotingConnection;
@@ -117,7 +118,7 @@
 
    private final Executor executor;
 
-   private volatile RemotingConnection remotingConnection;
+   private volatile CoreRemotingConnection remotingConnection;
 
    private final Set<ClientProducerInternal> producers = new ConcurrentHashSet<ClientProducerInternal>();
 
@@ -203,7 +204,7 @@
                             final int minLargeMessageSize,
                             final int initialMessagePacketSize,
                             final String groupID,
-                            final RemotingConnection remotingConnection,
+                            final CoreRemotingConnection remotingConnection,
                             final int version,
                             final Channel channel,
                             final Executor executor) throws HornetQException
@@ -834,7 +835,7 @@
 
    // Needs to be synchronized to prevent issues with occurring concurrently with close()
 
-   public synchronized void handleFailover(final RemotingConnection backupConnection)
+   public synchronized void handleFailover(final CoreRemotingConnection backupConnection)
    {
       if (closed)
       {

Modified: trunk/src/main/org/hornetq/core/client/impl/ClientSessionInternal.java
===================================================================
--- trunk/src/main/org/hornetq/core/client/impl/ClientSessionInternal.java	2010-01-20 17:11:18 UTC (rev 8817)
+++ trunk/src/main/org/hornetq/core/client/impl/ClientSessionInternal.java	2010-01-20 19:45:20 UTC (rev 8818)
@@ -16,6 +16,7 @@
 import org.hornetq.api.core.HornetQException;
 import org.hornetq.api.core.SimpleString;
 import org.hornetq.api.core.client.ClientSession;
+import org.hornetq.core.remoting.CoreRemotingConnection;
 import org.hornetq.core.remoting.RemotingConnection;
 import org.hornetq.core.remoting.impl.wireformat.SessionReceiveContinuationMessage;
 import org.hornetq.core.remoting.impl.wireformat.SessionReceiveLargeMessage;
@@ -52,7 +53,7 @@
 
    void handleReceiveContinuation(long consumerID, SessionReceiveContinuationMessage continuation) throws Exception;
 
-   void handleFailover(RemotingConnection backupConnection);
+   void handleFailover(CoreRemotingConnection backupConnection);
 
    RemotingConnection getConnection();
 

Modified: trunk/src/main/org/hornetq/core/client/impl/DelegatingSession.java
===================================================================
--- trunk/src/main/org/hornetq/core/client/impl/DelegatingSession.java	2010-01-20 17:11:18 UTC (rev 8817)
+++ trunk/src/main/org/hornetq/core/client/impl/DelegatingSession.java	2010-01-20 19:45:20 UTC (rev 8818)
@@ -27,6 +27,7 @@
 import org.hornetq.api.core.client.SendAcknowledgementHandler;
 import org.hornetq.api.core.client.SessionFailureListener;
 import org.hornetq.core.logging.Logger;
+import org.hornetq.core.remoting.CoreRemotingConnection;
 import org.hornetq.core.remoting.RemotingConnection;
 import org.hornetq.core.remoting.impl.wireformat.SessionReceiveContinuationMessage;
 import org.hornetq.core.remoting.impl.wireformat.SessionReceiveLargeMessage;
@@ -364,7 +365,7 @@
       return session.getXAResource();
    }
 
-   public void handleFailover(final RemotingConnection backupConnection)
+   public void handleFailover(final CoreRemotingConnection backupConnection)
    {
       session.handleFailover(backupConnection);
    }

Modified: trunk/src/main/org/hornetq/core/client/impl/FailoverManager.java
===================================================================
--- trunk/src/main/org/hornetq/core/client/impl/FailoverManager.java	2010-01-20 17:11:18 UTC (rev 8817)
+++ trunk/src/main/org/hornetq/core/client/impl/FailoverManager.java	2010-01-20 19:45:20 UTC (rev 8818)
@@ -16,7 +16,7 @@
 import org.hornetq.api.core.HornetQException;
 import org.hornetq.api.core.client.ClientSession;
 import org.hornetq.api.core.client.SessionFailureListener;
-import org.hornetq.core.remoting.RemotingConnection;
+import org.hornetq.core.remoting.CoreRemotingConnection;
 
 /**
  * A ConnectionManager
@@ -52,7 +52,7 @@
 
    void removeSession(final ClientSessionInternal session);
 
-   public RemotingConnection getConnection();
+   public CoreRemotingConnection getConnection();
 
    int numConnections();
 

Modified: trunk/src/main/org/hornetq/core/client/impl/FailoverManagerImpl.java
===================================================================
--- trunk/src/main/org/hornetq/core/client/impl/FailoverManagerImpl.java	2010-01-20 17:11:18 UTC (rev 8817)
+++ trunk/src/main/org/hornetq/core/client/impl/FailoverManagerImpl.java	2010-01-20 19:45:20 UTC (rev 8818)
@@ -37,9 +37,10 @@
 import org.hornetq.core.logging.Logger;
 import org.hornetq.core.remoting.Channel;
 import org.hornetq.core.remoting.ChannelHandler;
+import org.hornetq.core.remoting.CoreRemotingConnection;
 import org.hornetq.core.remoting.FailureListener;
 import org.hornetq.core.remoting.Packet;
-import org.hornetq.core.remoting.RemotingConnection;
+import org.hornetq.core.remoting.ProtocolType;
 import org.hornetq.core.remoting.impl.AbstractBufferHandler;
 import org.hornetq.core.remoting.impl.RemotingConnectionImpl;
 import org.hornetq.core.remoting.impl.wireformat.CreateSessionMessage;
@@ -77,7 +78,7 @@
 
    // debug
 
-   private static Map<TransportConfiguration, Set<RemotingConnection>> debugConns;
+   private static Map<TransportConfiguration, Set<CoreRemotingConnection>> debugConns;
 
    private static boolean debug = false;
 
@@ -85,7 +86,7 @@
    {
       FailoverManagerImpl.debug = true;
 
-      FailoverManagerImpl.debugConns = new ConcurrentHashMap<TransportConfiguration, Set<RemotingConnection>>();
+      FailoverManagerImpl.debugConns = new ConcurrentHashMap<TransportConfiguration, Set<CoreRemotingConnection>>();
    }
 
    public static void disableDebug()
@@ -96,9 +97,9 @@
       FailoverManagerImpl.debugConns = null;
    }
 
-   private void checkAddDebug(final RemotingConnection conn)
+   private void checkAddDebug(final CoreRemotingConnection conn)
    {
-      Set<RemotingConnection> conns;
+      Set<CoreRemotingConnection> conns;
 
       synchronized (FailoverManagerImpl.debugConns)
       {
@@ -106,7 +107,7 @@
 
          if (conns == null)
          {
-            conns = new HashSet<RemotingConnection>();
+            conns = new HashSet<CoreRemotingConnection>();
 
             FailoverManagerImpl.debugConns.put(connectorConfig, conns);
          }
@@ -117,7 +118,7 @@
 
    public static void failAllConnectionsForConnector(final TransportConfiguration config)
    {
-      Set<RemotingConnection> conns;
+      Set<CoreRemotingConnection> conns;
 
       synchronized (FailoverManagerImpl.debugConns)
       {
@@ -125,13 +126,13 @@
 
          if (conns != null)
          {
-            conns = new HashSet<RemotingConnection>(FailoverManagerImpl.debugConns.get(config));
+            conns = new HashSet<CoreRemotingConnection>(FailoverManagerImpl.debugConns.get(config));
          }
       }
 
       if (conns != null)
       {
-         for (RemotingConnection conn : conns)
+         for (CoreRemotingConnection conn : conns)
          {
             conn.fail(new HornetQException(HornetQException.INTERNAL_ERROR, "simulated connection failure"));
          }
@@ -175,7 +176,7 @@
 
    private final Executor closeExecutor;
 
-   private RemotingConnection connection;
+   private CoreRemotingConnection connection;
 
    private final long retryInterval;
 
@@ -272,7 +273,7 @@
 
    // ConnectionLifeCycleListener implementation --------------------------------------------------
 
-   public void connectionCreated(final Connection connection)
+   public void connectionCreated(final Connection connection, final ProtocolType protocol)
    {
    }
 
@@ -319,7 +320,7 @@
          {
             Version clientVersion = VersionLoader.getVersion();
 
-            RemotingConnection theConnection = null;
+            CoreRemotingConnection theConnection = null;
 
             Lock lock = null;
 
@@ -651,7 +652,7 @@
 
             // So.. do failover / reconnection
 
-            RemotingConnection oldConnection = connection;
+            CoreRemotingConnection oldConnection = connection;
 
             connection = null;
 
@@ -753,9 +754,9 @@
    /*
     * Re-attach sessions all pre-existing sessions to the new remoting connection
     */
-   private void reconnectSessions(final RemotingConnection oldConnection, final int reconnectAttempts)
+   private void reconnectSessions(final CoreRemotingConnection oldConnection, final int reconnectAttempts)
    {
-      RemotingConnection newConnection = getConnectionWithRetry(reconnectAttempts);
+      CoreRemotingConnection newConnection = getConnectionWithRetry(reconnectAttempts);
 
       if (newConnection == null)
       {
@@ -786,7 +787,7 @@
       }
    }
 
-   private RemotingConnection getConnectionWithRetry(final int reconnectAttempts)
+   private CoreRemotingConnection getConnectionWithRetry(final int reconnectAttempts)
    {
       long interval = retryInterval;
 
@@ -799,7 +800,7 @@
             return null;
          }
 
-         RemotingConnection theConnection = getConnection();
+         CoreRemotingConnection theConnection = getConnection();
 
          if (theConnection == null)
          {
@@ -897,7 +898,7 @@
       }
    }
 
-   public RemotingConnection getConnection()
+   public CoreRemotingConnection getConnection()
    {
       if (connection == null)
       {
@@ -1057,9 +1058,9 @@
 
    private class Channel0Handler implements ChannelHandler
    {
-      private final RemotingConnection conn;
+      private final CoreRemotingConnection conn;
 
-      private Channel0Handler(final RemotingConnection conn)
+      private Channel0Handler(final CoreRemotingConnection conn)
       {
          this.conn = conn;
       }
@@ -1088,7 +1089,7 @@
    {
       public void bufferReceived(final Object connectionID, final HornetQBuffer buffer)
       {
-         RemotingConnection theConn = connection;
+         CoreRemotingConnection theConn = connection;
 
          if (theConn != null && connectionID == theConn.getID())
          {

Modified: trunk/src/main/org/hornetq/core/remoting/Channel.java
===================================================================
--- trunk/src/main/org/hornetq/core/remoting/Channel.java	2010-01-20 17:11:18 UTC (rev 8817)
+++ trunk/src/main/org/hornetq/core/remoting/Channel.java	2010-01-20 19:45:20 UTC (rev 8818)
@@ -17,10 +17,10 @@
 import org.hornetq.api.core.HornetQException;
 
 /**
- * A channel is a way of interleaving data meant for different endpoints over the same {@link org.hornetq.core.remoting.RemotingConnection}.
+ * A channel is a way of interleaving data meant for different endpoints over the same {@link org.hornetq.core.remoting.CoreRemotingConnection}.
  * <p/>
  * Any packet sent will have its channel id set to the specific channel sending so it can be routed to its correct channel
- * when received by the {@link org.hornetq.core.remoting.RemotingConnection}. see {@link org.hornetq.core.remoting.Packet#setChannelID(long)}.
+ * when received by the {@link org.hornetq.core.remoting.CoreRemotingConnection}. see {@link org.hornetq.core.remoting.Packet#setChannelID(long)}.
  * <p/>
  * Each Channel should will forward any packets received to its {@link org.hornetq.core.remoting.ChannelHandler}.
  * <p/>
@@ -81,7 +81,7 @@
     *
     * @param newConnection the new connection
     */
-   void transferConnection(RemotingConnection newConnection);
+   void transferConnection(CoreRemotingConnection newConnection);
 
    /**
     * resends any packets that have not received confirmations yet.
@@ -126,7 +126,7 @@
    /**
     * returns the Remoting Connection being used by the channel
     */
-   RemotingConnection getConnection();
+   CoreRemotingConnection getConnection();
 
    /**
     * sends a confirmation of a packet being received.
@@ -148,7 +148,7 @@
    void flushConfirmations();
 
    /**
-    * Called by {@link org.hornetq.core.remoting.RemotingConnection} when a packet is received.
+    * Called by {@link org.hornetq.core.remoting.CoreRemotingConnection} when a packet is received.
     * <p/>
     * This method should then call its {@link org.hornetq.core.remoting.ChannelHandler} after appropriate processing of
     * the packet

Added: trunk/src/main/org/hornetq/core/remoting/CoreRemotingConnection.java
===================================================================
--- trunk/src/main/org/hornetq/core/remoting/CoreRemotingConnection.java	                        (rev 0)
+++ trunk/src/main/org/hornetq/core/remoting/CoreRemotingConnection.java	2010-01-20 19:45:20 UTC (rev 8818)
@@ -0,0 +1,87 @@
+/*
+ * Copyright 2010 Red Hat, Inc.
+ * Red Hat licenses this file to you under the Apache License, version
+ * 2.0 (the "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+ * implied.  See the License for the specific language governing
+ * permissions and limitations under the License.
+ */
+
+package org.hornetq.core.remoting;
+
+
+/**
+ * Extension of RemotingConnection for the HornetQ core protocol
+ *
+ * @author Tim Fox
+ *
+ *
+ */
+public interface CoreRemotingConnection extends RemotingConnection
+{  
+   /**
+    * return the channel with the channel id specified.
+    * <p/>
+    * If it does not exist create it with the confirmation window size.
+    *
+    * @param channelID      the channel id
+    * @param confWindowSize the confirmation window size
+    * @return the channel
+    */
+   Channel getChannel(long channelID, int confWindowSize);
+
+   /**
+    * add the channel with the specified channel id
+    *
+    * @param channelID the channel id
+    * @param channel   the channel
+    */
+   void putChannel(long channelID, Channel channel);
+
+   /**
+    * remove the channel with the specified channel id
+    *
+    * @param channelID the channel id
+    * @return true if removed
+    */
+   boolean removeChannel(long channelID);
+
+   /**
+    * generate a unique (within this connection) channel id
+    *
+    * @return the id
+    */
+   long generateChannelID();
+
+   /**
+    * resets the id generator used when generating ids
+    *
+    * @param id the first id to set it to
+    */
+   void syncIDGeneratorSequence(long id);
+
+   /**
+    * return the next id that will be chosen.
+    *
+    * @return the id
+    */
+   long getIDGeneratorSequence();
+
+   /**
+    * return the current timeout for blocking calls
+    *
+    * @return the timeout in milliseconds
+    */
+   long getBlockingCallTimeout();
+
+   /**
+    * return the transfer lock used when transferring connections.
+    *
+    * @return the lock
+    */
+   Object getTransferLock();
+}

Added: trunk/src/main/org/hornetq/core/remoting/ProtocolType.java
===================================================================
--- trunk/src/main/org/hornetq/core/remoting/ProtocolType.java	                        (rev 0)
+++ trunk/src/main/org/hornetq/core/remoting/ProtocolType.java	2010-01-20 19:45:20 UTC (rev 8818)
@@ -0,0 +1,26 @@
+/*
+ * Copyright 2010 Red Hat, Inc.
+ * Red Hat licenses this file to you under the Apache License, version
+ * 2.0 (the "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+ * implied.  See the License for the specific language governing
+ * permissions and limitations under the License.
+ */
+
+package org.hornetq.core.remoting;
+
+/**
+ * A ProtocolType
+ *
+ * @author Tim Fox
+ *
+ *
+ */
+public enum ProtocolType
+{
+   CORE, STOMP, AMQP;
+}

Modified: trunk/src/main/org/hornetq/core/remoting/RemotingConnection.java
===================================================================
--- trunk/src/main/org/hornetq/core/remoting/RemotingConnection.java	2010-01-20 17:11:18 UTC (rev 8817)
+++ trunk/src/main/org/hornetq/core/remoting/RemotingConnection.java	2010-01-20 19:45:20 UTC (rev 8818)
@@ -46,40 +46,6 @@
    String getRemoteAddress();
 
    /**
-    * return the channel with the channel id specified.
-    * <p/>
-    * If it does not exist create it with the confirmation window size.
-    *
-    * @param channelID      the channel id
-    * @param confWindowSize the confirmation window size
-    * @return the channel
-    */
-   Channel getChannel(long channelID, int confWindowSize);
-
-   /**
-    * add the channel with the specified channel id
-    *
-    * @param channelID the channel id
-    * @param channel   the channel
-    */
-   void putChannel(long channelID, Channel channel);
-
-   /**
-    * remove the channel with the specified channel id
-    *
-    * @param channelID the channel id
-    * @return true if removed
-    */
-   boolean removeChannel(long channelID);
-
-   /**
-    * generate a unique (within this connection) channel id
-    *
-    * @return the id
-    */
-   long generateChannelID();
-
-   /**
     * add a failure listener.
     * <p/>
     * The listener will be called in the event of connection failure.
@@ -150,20 +116,6 @@
    void destroy();
 
    /**
-    * resets the id generator used to when generating id's
-    *
-    * @param id the first id to set it to
-    */
-   void syncIDGeneratorSequence(long id);
-
-   /**
-    * return the next id that will be chosen.
-    *
-    * @return the id
-    */
-   long getIDGeneratorSequence();
-
-   /**
     * return the underlying Connection.
     *
     * @return the connection
@@ -182,36 +134,23 @@
     *
     * @return true if destroyed, otherwise false
     */
-   boolean isDestroyed();
-
+   boolean isDestroyed();    
+   
    /**
-    * return the current tomeout for blocking calls
-    *
-    * @return the timeout in milliseconds
+    * Disconnect the connection, closing all channels
     */
-   long getBlockingCallTimeout();
-
+   void disconnect();
+   
    /**
-    * return the transfer lock used when transferring connections.
-    *
-    * @return the lock
-    */
-   Object getTransferLock();
-
-   /**
     * returns true if any data has been received since the last time this method was called.
     *
     * @return true if data has been received.
     */
    boolean checkDataReceived();
-
+   
    /**
-    * remove all channels from the remoting connection
+    * flush all outstanding data from the connection.
     */
-   void removeAllChannels();
+   void flush();
 
-   /**
-    * flush all outstanding confirmations onto the connection.
-    */
-   void flushConfirmations();
 }

Modified: trunk/src/main/org/hornetq/core/remoting/impl/ChannelImpl.java
===================================================================
--- trunk/src/main/org/hornetq/core/remoting/impl/ChannelImpl.java	2010-01-20 17:11:18 UTC (rev 8817)
+++ trunk/src/main/org/hornetq/core/remoting/impl/ChannelImpl.java	2010-01-20 19:45:20 UTC (rev 8818)
@@ -25,6 +25,7 @@
 import org.hornetq.core.remoting.Channel;
 import org.hornetq.core.remoting.ChannelHandler;
 import org.hornetq.core.remoting.CommandConfirmationHandler;
+import org.hornetq.core.remoting.CoreRemotingConnection;
 import org.hornetq.core.remoting.Packet;
 import org.hornetq.core.remoting.RemotingConnection;
 import org.hornetq.core.remoting.impl.wireformat.HornetQExceptionMessage;
@@ -52,7 +53,7 @@
 
    private volatile int lastConfirmedCommandID = -1;
 
-   private volatile RemotingConnection connection;
+   private volatile CoreRemotingConnection connection;
 
    private volatile boolean closed;
 
@@ -76,7 +77,7 @@
       
    private volatile boolean transferring;
  
-   public ChannelImpl(final RemotingConnection connection, final long id, final int confWindowSize)
+   public ChannelImpl(final CoreRemotingConnection connection, final long id, final int confWindowSize)
    {
       this.connection = connection;
 
@@ -316,7 +317,7 @@
       closed = true;
    }
    
-   public void transferConnection(final RemotingConnection newConnection)
+   public void transferConnection(final CoreRemotingConnection newConnection)
    {
       // Needs to synchronize on the connection to make sure no packets from
       // the old connection get processed after transfer has occurred
@@ -326,7 +327,7 @@
 
          // And switch it
 
-         final RemotingConnectionImpl rnewConnection = (RemotingConnectionImpl)newConnection;
+         final CoreRemotingConnection rnewConnection = (CoreRemotingConnection)newConnection;
 
          rnewConnection.putChannel(id, this);
 
@@ -369,7 +370,7 @@
       lock.unlock();
    }
 
-   public RemotingConnection getConnection()
+   public CoreRemotingConnection getConnection()
    {
       return connection;
    }

Modified: trunk/src/main/org/hornetq/core/remoting/impl/RemotingConnectionImpl.java
===================================================================
--- trunk/src/main/org/hornetq/core/remoting/impl/RemotingConnectionImpl.java	2010-01-20 17:11:18 UTC (rev 8817)
+++ trunk/src/main/org/hornetq/core/remoting/impl/RemotingConnectionImpl.java	2010-01-20 19:45:20 UTC (rev 8818)
@@ -26,9 +26,9 @@
 import org.hornetq.core.logging.Logger;
 import org.hornetq.core.remoting.Channel;
 import org.hornetq.core.remoting.CloseListener;
+import org.hornetq.core.remoting.CoreRemotingConnection;
 import org.hornetq.core.remoting.FailureListener;
 import org.hornetq.core.remoting.Packet;
-import org.hornetq.core.remoting.RemotingConnection;
 import org.hornetq.core.remoting.impl.wireformat.PacketImpl;
 import org.hornetq.spi.core.remoting.Connection;
 import org.hornetq.utils.SimpleIDGenerator;
@@ -38,7 +38,7 @@
  * @author <a href="mailto:jmesnil at redhat.com">Jeff Mesnil</a>
  * @version <tt>$Revision$</tt> $Id$
  */
-public class RemotingConnectionImpl extends AbstractBufferHandler implements RemotingConnection
+public class RemotingConnectionImpl extends AbstractBufferHandler implements CoreRemotingConnection
 {
    // Constants
    // ------------------------------------------------------------------------------------
@@ -84,6 +84,8 @@
    private volatile boolean dataReceived;
 
    private final Executor executor;
+   
+   private volatile boolean executing;
 
    // Constructors
    // ---------------------------------------------------------------------------------
@@ -275,7 +277,22 @@
 
       callClosingListeners();
    }
+   
+   public void disconnect()
+   {
+      Channel channel0 = getChannel(0, -1);
 
+      // And we remove all channels from the connection, this ensures no more packets will be processed after this
+      // method is
+      // complete
+
+      removeAllChannels();
+
+      // Now we are 100% sure that no more packets will be processed we can send the disconnect
+
+      channel0.sendAndFlush(new PacketImpl(PacketImpl.DISCONNECT));
+   }
+
    public long generateChannelID()
    {
       return idGenerator.generateID();
@@ -325,20 +342,12 @@
       return res;
    }
 
-   public void removeAllChannels()
+   //We flush any confirmations on the connection - this prevents idle bridges for example
+   //sitting there with many unacked messages
+   public void flush()
    {
-      // We get the transfer lock first - this ensures no packets are being processed AND
-      // it's guaranteed no more packets will be processed once this method is complete
       synchronized (transferLock)
       {
-         channels.clear();
-      }
-   }
-
-   public void flushConfirmations()
-   {
-      synchronized (transferLock)
-      {
          for (Channel channel : channels.values())
          {
             channel.flushConfirmations();
@@ -349,8 +358,6 @@
    // Buffer Handler implementation
    // ----------------------------------------------------
 
-   private volatile boolean executing;
-
    public void bufferReceived(final Object connectionID, final HornetQBuffer buffer)
    {
       final Packet packet = decoder.decode(buffer);
@@ -434,6 +441,15 @@
    // Private
    // --------------------------------------------------------------------------------------
 
+   private void removeAllChannels()
+   {
+      // We get the transfer lock first - this ensures no packets are being processed AND
+      // it's guaranteed no more packets will be processed once this method is complete
+      synchronized (transferLock)
+      {
+         channels.clear();
+      }
+   }  
    private void callFailureListeners(final HornetQException me)
    {
       final List<FailureListener> listenersClone = new ArrayList<FailureListener>(failureListeners);

Modified: trunk/src/main/org/hornetq/core/remoting/impl/invm/InVMAcceptor.java
===================================================================
--- trunk/src/main/org/hornetq/core/remoting/impl/invm/InVMAcceptor.java	2010-01-20 17:11:18 UTC (rev 8817)
+++ trunk/src/main/org/hornetq/core/remoting/impl/invm/InVMAcceptor.java	2010-01-20 19:45:20 UTC (rev 8818)
@@ -21,6 +21,7 @@
 import org.hornetq.api.core.SimpleString;
 import org.hornetq.api.core.management.NotificationType;
 import org.hornetq.core.logging.Logger;
+import org.hornetq.core.remoting.ProtocolType;
 import org.hornetq.core.server.management.Notification;
 import org.hornetq.core.server.management.NotificationService;
 import org.hornetq.spi.core.remoting.Acceptor;
@@ -215,14 +216,14 @@
          this.connector = connector;
       }
 
-      public void connectionCreated(final Connection connection)
+      public void connectionCreated(final Connection connection, final ProtocolType protocol)
       {
          if (connections.putIfAbsent((String)connection.getID(), connection) != null)
          {
             throw new IllegalArgumentException("Connection already exists with id " + connection.getID());
          }
 
-         listener.connectionCreated(connection);
+         listener.connectionCreated(connection, protocol);
       }
 
       public void connectionDestroyed(final Object connectionID)

Modified: trunk/src/main/org/hornetq/core/remoting/impl/invm/InVMConnection.java
===================================================================
--- trunk/src/main/org/hornetq/core/remoting/impl/invm/InVMConnection.java	2010-01-20 17:11:18 UTC (rev 8817)
+++ trunk/src/main/org/hornetq/core/remoting/impl/invm/InVMConnection.java	2010-01-20 19:45:20 UTC (rev 8818)
@@ -17,8 +17,8 @@
 
 import org.hornetq.api.core.HornetQBuffer;
 import org.hornetq.api.core.HornetQBuffers;
-import org.hornetq.api.core.HornetQException;
 import org.hornetq.core.logging.Logger;
+import org.hornetq.core.remoting.ProtocolType;
 import org.hornetq.spi.core.remoting.BufferHandler;
 import org.hornetq.spi.core.remoting.Connection;
 import org.hornetq.spi.core.remoting.ConnectionLifeCycleListener;
@@ -70,7 +70,7 @@
 
       this.executor = executor;
 
-      listener.connectionCreated(this);
+      listener.connectionCreated(this, ProtocolType.CORE);
    }
 
    private volatile boolean closing;

Modified: trunk/src/main/org/hornetq/core/remoting/impl/invm/InVMConnector.java
===================================================================
--- trunk/src/main/org/hornetq/core/remoting/impl/invm/InVMConnector.java	2010-01-20 17:11:18 UTC (rev 8817)
+++ trunk/src/main/org/hornetq/core/remoting/impl/invm/InVMConnector.java	2010-01-20 19:45:20 UTC (rev 8818)
@@ -19,6 +19,7 @@
 
 import org.hornetq.api.core.HornetQException;
 import org.hornetq.core.logging.Logger;
+import org.hornetq.core.remoting.ProtocolType;
 import org.hornetq.spi.core.remoting.Acceptor;
 import org.hornetq.spi.core.remoting.BufferHandler;
 import org.hornetq.spi.core.remoting.Connection;
@@ -174,14 +175,14 @@
 
    private class Listener implements ConnectionLifeCycleListener
    {
-      public void connectionCreated(final Connection connection)
+      public void connectionCreated(final Connection connection, final ProtocolType protocol)
       {
          if (connections.putIfAbsent((String)connection.getID(), connection) != null)
          {
             throw new IllegalArgumentException("Connection already exists with id " + connection.getID());
          }
 
-         listener.connectionCreated(connection);
+         listener.connectionCreated(connection, protocol);
       }
 
       public void connectionDestroyed(final Object connectionID)

Added: trunk/src/main/org/hornetq/core/remoting/server/ConnectionEntry.java
===================================================================
--- trunk/src/main/org/hornetq/core/remoting/server/ConnectionEntry.java	                        (rev 0)
+++ trunk/src/main/org/hornetq/core/remoting/server/ConnectionEntry.java	2010-01-20 19:45:20 UTC (rev 8818)
@@ -0,0 +1,41 @@
+/*
+ * Copyright 2010 Red Hat, Inc.
+ * Red Hat licenses this file to you under the Apache License, version
+ * 2.0 (the "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+ * implied.  See the License for the specific language governing
+ * permissions and limitations under the License.
+ */
+
+package org.hornetq.core.remoting.server;
+
+import org.hornetq.core.remoting.RemotingConnection;
+
+/**
+ * Pairs a RemotingConnection with its mutable lastCheck and ttl values.
+ *
+ * @author Tim Fox
+ *
+ *
+ */
+public class ConnectionEntry
+{
+   public final RemotingConnection connection;
+
+   public volatile long lastCheck;
+
+   public volatile long ttl;
+
+   public ConnectionEntry(final RemotingConnection connection, final long lastCheck, final long ttl)
+   {
+      this.connection = connection;
+
+      this.lastCheck = lastCheck;
+
+      this.ttl = ttl;
+   }
+}

Added: trunk/src/main/org/hornetq/core/remoting/server/ProtocolManager.java
===================================================================
--- trunk/src/main/org/hornetq/core/remoting/server/ProtocolManager.java	                        (rev 0)
+++ trunk/src/main/org/hornetq/core/remoting/server/ProtocolManager.java	2010-01-20 19:45:20 UTC (rev 8818)
@@ -0,0 +1,29 @@
+/*
+ * Copyright 2010 Red Hat, Inc.
+ * Red Hat licenses this file to you under the Apache License, version
+ * 2.0 (the "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+ * implied.  See the License for the specific language governing
+ * permissions and limitations under the License.
+ */
+
+package org.hornetq.core.remoting.server;
+
+import org.hornetq.spi.core.remoting.Connection;
+
+/**
+ * Creates the ConnectionEntry for a given transport Connection.
+ *
+ * @author Tim Fox
+ *
+ *
+ */
+public interface ProtocolManager
+{
+   ConnectionEntry createConnectionEntry(Connection connection);
+
+}

Added: trunk/src/main/org/hornetq/core/remoting/server/impl/CoreProtocolManager.java
===================================================================
--- trunk/src/main/org/hornetq/core/remoting/server/impl/CoreProtocolManager.java	                        (rev 0)
+++ trunk/src/main/org/hornetq/core/remoting/server/impl/CoreProtocolManager.java	2010-01-20 19:45:20 UTC (rev 8818)
@@ -0,0 +1,103 @@
+/*
+ * Copyright 2010 Red Hat, Inc.
+ * Red Hat licenses this file to you under the Apache License, version
+ * 2.0 (the "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+ * implied.  See the License for the specific language governing
+ * permissions and limitations under the License.
+ */
+
+package org.hornetq.core.remoting.server.impl;
+
+import java.util.List;
+
+import org.hornetq.api.core.Interceptor;
+import org.hornetq.api.core.client.HornetQClient;
+import org.hornetq.core.config.Configuration;
+import org.hornetq.core.remoting.Channel;
+import org.hornetq.core.remoting.ChannelHandler;
+import org.hornetq.core.remoting.CoreRemotingConnection;
+import org.hornetq.core.remoting.Packet;
+import org.hornetq.core.remoting.impl.RemotingConnectionImpl;
+import org.hornetq.core.remoting.impl.wireformat.PacketImpl;
+import org.hornetq.core.remoting.impl.wireformat.Ping;
+import org.hornetq.core.remoting.server.ConnectionEntry;
+import org.hornetq.core.remoting.server.ProtocolManager;
+import org.hornetq.core.server.HornetQServer;
+import org.hornetq.core.server.impl.HornetQPacketHandler;
+import org.hornetq.spi.core.remoting.Connection;
+
+/**
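+ * ProtocolManager for the core protocol: wraps a Connection in a
+ * RemotingConnectionImpl and installs handlers on channels 0 and 1.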
+ * @author Tim Fox
+ *
+ *
+ */
+public class CoreProtocolManager implements ProtocolManager
+{
+   private final HornetQServer server;
+   
+   private final List<Interceptor> interceptors;
+
+   public CoreProtocolManager(final HornetQServer server,
+                              final List<Interceptor> interceptors)
+   {
+      this.server = server;
+
+      this.interceptors = interceptors;
+   }
+
+   public ConnectionEntry createConnectionEntry(final Connection connection)
+   {
+      final Configuration config = server.getConfiguration();
+      
+      CoreRemotingConnection rc = new RemotingConnectionImpl(connection,
+                                                             interceptors,
+                                                             config.isAsyncConnectionExecutionEnabled() ? server.getExecutorFactory().getExecutor()
+                                                                                                       : null);
+
+      Channel channel1 = rc.getChannel(1, -1);
+
+      ChannelHandler handler = new HornetQPacketHandler(server, channel1, rc);
+
+      channel1.setHandler(handler);
+
+      long ttl = HornetQClient.DEFAULT_CONNECTION_TTL;
+
+      if (config.getConnectionTTLOverride() != -1)
+      {
+         ttl = config.getConnectionTTLOverride();
+      }
+
+      final ConnectionEntry entry = new ConnectionEntry(rc, System.currentTimeMillis(), ttl);
+
+      final Channel channel0 = rc.getChannel(0, -1);
+
+      channel0.setHandler(new ChannelHandler()
+      {
+         public void handlePacket(final Packet packet)
+         {
+            if (packet.getType() == PacketImpl.PING)
+            {
+               Ping ping = (Ping)packet;
+
+               if (config.getConnectionTTLOverride() == -1)
+               {
+                  // Allow clients to specify connection ttl
+                  entry.ttl = ping.getConnectionTTL();
+               }
+
+               // Just send a ping back
+               channel0.send(packet);
+            }
+         }
+      });
+
+      return entry;
+   }
+}

Modified: trunk/src/main/org/hornetq/core/remoting/server/impl/RemotingServiceImpl.java
===================================================================
--- trunk/src/main/org/hornetq/core/remoting/server/impl/RemotingServiceImpl.java	2010-01-20 17:11:18 UTC (rev 8817)
+++ trunk/src/main/org/hornetq/core/remoting/server/impl/RemotingServiceImpl.java	2010-01-20 19:45:20 UTC (rev 8818)
@@ -26,20 +26,15 @@
 import org.hornetq.api.core.HornetQException;
 import org.hornetq.api.core.Interceptor;
 import org.hornetq.api.core.TransportConfiguration;
-import org.hornetq.api.core.client.HornetQClient;
 import org.hornetq.core.config.Configuration;
 import org.hornetq.core.logging.Logger;
-import org.hornetq.core.remoting.Channel;
-import org.hornetq.core.remoting.ChannelHandler;
-import org.hornetq.core.remoting.Packet;
+import org.hornetq.core.remoting.ProtocolType;
 import org.hornetq.core.remoting.RemotingConnection;
 import org.hornetq.core.remoting.impl.AbstractBufferHandler;
-import org.hornetq.core.remoting.impl.RemotingConnectionImpl;
-import org.hornetq.core.remoting.impl.wireformat.PacketImpl;
-import org.hornetq.core.remoting.impl.wireformat.Ping;
+import org.hornetq.core.remoting.server.ConnectionEntry;
+import org.hornetq.core.remoting.server.ProtocolManager;
 import org.hornetq.core.remoting.server.RemotingService;
 import org.hornetq.core.server.HornetQServer;
-import org.hornetq.core.server.impl.HornetQPacketHandler;
 import org.hornetq.core.server.management.ManagementService;
 import org.hornetq.spi.core.remoting.Acceptor;
 import org.hornetq.spi.core.remoting.AcceptorFactory;
@@ -89,7 +84,10 @@
    private final ScheduledExecutorService scheduledThreadPool;
 
    private FailureCheckAndFlushThread failureCheckAndFlushThread;
-
+   
+   private Map<ProtocolType, ProtocolManager> protocolMap = 
+      new ConcurrentHashMap<ProtocolType, ProtocolManager>();
+   
    // Static --------------------------------------------------------
 
    // Constructors --------------------------------------------------
@@ -122,6 +120,8 @@
       this.managementService = managementService;
       this.threadPool = threadPool;
       this.scheduledThreadPool = scheduledThreadPool;
+      
+      this.protocolMap.put(ProtocolType.CORE, new CoreProtocolManager(server, interceptors));
    }
 
    // RemotingService implementation -------------------------------
@@ -170,6 +170,7 @@
             if (managementService != null)
             {
                acceptor.setNotificationService(managementService);
+               
                managementService.registerAcceptor(acceptor, info);
             }
          }
@@ -235,17 +236,7 @@
       {
          RemotingConnection conn = entry.connection;
 
-         Channel channel0 = conn.getChannel(0, -1);
-
-         // And we remove all channels from the connection, this ensures no more packets will be processed after this
-         // method is
-         // complete
-
-         conn.removeAllChannels();
-
-         // Now we are 100% sure that no more packets will be processed we can send the disconnect
-
-         channel0.sendAndFlush(new PacketImpl(PacketImpl.DISCONNECT));
+         conn.disconnect();
       }
 
       for (Acceptor acceptor : acceptors)
@@ -304,61 +295,32 @@
 
    // ConnectionLifeCycleListener implementation -----------------------------------
 
-   public void connectionCreated(final Connection connection)
+   private ProtocolManager getProtocolManager(ProtocolType protocol)
    {
+      return protocolMap.get(protocol);
+   }
+   
+   public void connectionCreated(final Connection connection, final ProtocolType protocol)
+   {
       if (server == null)
       {
          throw new IllegalStateException("Unable to create connection, server hasn't finished starting up");
       }
-
-      RemotingConnection rc = new RemotingConnectionImpl(connection,
-                                                         interceptors,
-                                                         config.isAsyncConnectionExecutionEnabled() ? server.getExecutorFactory()
-                                                                                                            .getExecutor()
-                                                                                                   : null);
-
-      Channel channel1 = rc.getChannel(1, -1);
-
-      ChannelHandler handler = createHandler(rc, channel1);
-
-      channel1.setHandler(handler);
-
-      long ttl = HornetQClient.DEFAULT_CONNECTION_TTL;
-
-      if (config.getConnectionTTLOverride() != -1)
+      
+      ProtocolManager pmgr = this.getProtocolManager(protocol);
+      
+      if (pmgr == null)
       {
-         ttl = config.getConnectionTTLOverride();
+         throw new IllegalArgumentException("Unknown protocol " + protocol);
       }
 
-      final ConnectionEntry entry = new ConnectionEntry(rc, System.currentTimeMillis(), ttl);
-
+      ConnectionEntry entry = pmgr.createConnectionEntry(connection);
+      
       connections.put(connection.getID(), entry);
 
-      final Channel channel0 = rc.getChannel(0, -1);
-
-      channel0.setHandler(new ChannelHandler()
-      {
-         public void handlePacket(final Packet packet)
-         {
-            if (packet.getType() == PacketImpl.PING)
-            {
-               Ping ping = (Ping)packet;
-
-               if (config.getConnectionTTLOverride() == -1)
-               {
-                  // Allow clients to specify connection ttl
-                  entry.ttl = ping.getConnectionTTL();
-               }
-
-               // Just send a ping back
-               channel0.send(packet);
-            }
-         }
-      });
-
       if (config.isBackup())
       {
-         serverSideReplicatingConnection = rc;
+         serverSideReplicatingConnection = entry.connection;
       }
    }
 
@@ -409,14 +371,6 @@
 
    // Protected -----------------------------------------------------
 
-   /**
-    * Subclasses (on tests) may use this to create a different channel.
-    */
-   protected ChannelHandler createHandler(final RemotingConnection rc, final Channel channel)
-   {
-      return new HornetQPacketHandler(server, channel, rc);
-   }
-
    // Private -------------------------------------------------------
 
    // Inner classes -------------------------------------------------
@@ -434,24 +388,6 @@
       }
    }
 
-   private static final class ConnectionEntry
-   {
-      final RemotingConnection connection;
-
-      volatile long lastCheck;
-
-      volatile long ttl;
-
-      ConnectionEntry(final RemotingConnection connection, final long lastCheck, final long ttl)
-      {
-         this.connection = connection;
-
-         this.lastCheck = lastCheck;
-
-         this.ttl = ttl;
-      }
-   }
-
    private final class FailureCheckAndFlushThread extends Thread
    {
       private final long pauseInterval;
@@ -516,11 +452,8 @@
                }
                
                if (flush)
-               {
-                  //We flush any confirmations on the connection - this prevents idle bridges for example
-                  //sitting there with many unacked messages
-                                    
-                  conn.flushConfirmations();
+               {                                   
+                  conn.flush();
                }
             }
 

Modified: trunk/src/main/org/hornetq/core/replication/impl/ReplicationManagerImpl.java
===================================================================
--- trunk/src/main/org/hornetq/core/replication/impl/ReplicationManagerImpl.java	2010-01-20 17:11:18 UTC (rev 8817)
+++ trunk/src/main/org/hornetq/core/replication/impl/ReplicationManagerImpl.java	2010-01-20 19:45:20 UTC (rev 8818)
@@ -31,8 +31,8 @@
 import org.hornetq.core.persistence.impl.journal.OperationContextImpl;
 import org.hornetq.core.remoting.Channel;
 import org.hornetq.core.remoting.ChannelHandler;
+import org.hornetq.core.remoting.CoreRemotingConnection;
 import org.hornetq.core.remoting.Packet;
-import org.hornetq.core.remoting.RemotingConnection;
 import org.hornetq.core.remoting.impl.wireformat.CreateReplicationSessionMessage;
 import org.hornetq.core.remoting.impl.wireformat.PacketImpl;
 import org.hornetq.core.remoting.impl.wireformat.ReplicationAddMessage;
@@ -69,7 +69,7 @@
 
    private final FailoverManager failoverManager;
 
-   private RemotingConnection replicatingConnection;
+   private CoreRemotingConnection replicatingConnection;
 
    private Channel replicatingChannel;
 

Modified: trunk/src/main/org/hornetq/core/server/HornetQServer.java
===================================================================
--- trunk/src/main/org/hornetq/core/server/HornetQServer.java	2010-01-20 17:11:18 UTC (rev 8817)
+++ trunk/src/main/org/hornetq/core/server/HornetQServer.java	2010-01-20 19:45:20 UTC (rev 8818)
@@ -24,7 +24,7 @@
 import org.hornetq.core.persistence.StorageManager;
 import org.hornetq.core.postoffice.PostOffice;
 import org.hornetq.core.remoting.Channel;
-import org.hornetq.core.remoting.RemotingConnection;
+import org.hornetq.core.remoting.CoreRemotingConnection;
 import org.hornetq.core.remoting.impl.wireformat.CreateSessionResponseMessage;
 import org.hornetq.core.remoting.impl.wireformat.ReattachSessionResponseMessage;
 import org.hornetq.core.remoting.server.RemotingService;
@@ -70,7 +70,7 @@
 
    void unregisterActivateCallback(ActivateCallback callback);
 
-   ReattachSessionResponseMessage reattachSession(RemotingConnection connection, String name, int lastConfirmedCommandID) throws Exception;
+   ReattachSessionResponseMessage reattachSession(CoreRemotingConnection connection, String name, int lastConfirmedCommandID) throws Exception;
 
    /** The journal at the backup server has to be equivalent as the journal used on the live node. 
     *  Or else the backup node is out of sync. */
@@ -82,7 +82,7 @@
                                               String password,
                                               int minLargeMessageSize,
                                               int incrementingVersion,
-                                              RemotingConnection remotingConnection,
+                                              CoreRemotingConnection remotingConnection,
                                               boolean autoCommitSends,
                                               boolean autoCommitAcks,
                                               boolean preAcknowledge,

Modified: trunk/src/main/org/hornetq/core/server/ServerSession.java
===================================================================
--- trunk/src/main/org/hornetq/core/server/ServerSession.java	2010-01-20 17:11:18 UTC (rev 8817)
+++ trunk/src/main/org/hornetq/core/server/ServerSession.java	2010-01-20 19:45:20 UTC (rev 8818)
@@ -19,7 +19,7 @@
 
 import org.hornetq.api.core.SimpleString;
 import org.hornetq.core.remoting.Channel;
-import org.hornetq.core.remoting.RemotingConnection;
+import org.hornetq.core.remoting.CoreRemotingConnection;
 
 /**
  *
@@ -109,7 +109,7 @@
 
    void close() throws Exception;
 
-   int transferConnection(RemotingConnection newConnection, int lastReceivedCommandID);
+   int transferConnection(CoreRemotingConnection newConnection, int lastReceivedCommandID);
 
    Channel getChannel();
 

Modified: trunk/src/main/org/hornetq/core/server/impl/HornetQPacketHandler.java
===================================================================
--- trunk/src/main/org/hornetq/core/server/impl/HornetQPacketHandler.java	2010-01-20 17:11:18 UTC (rev 8817)
+++ trunk/src/main/org/hornetq/core/server/impl/HornetQPacketHandler.java	2010-01-20 19:45:20 UTC (rev 8818)
@@ -22,8 +22,8 @@
 import org.hornetq.core.logging.Logger;
 import org.hornetq.core.remoting.Channel;
 import org.hornetq.core.remoting.ChannelHandler;
+import org.hornetq.core.remoting.CoreRemotingConnection;
 import org.hornetq.core.remoting.Packet;
-import org.hornetq.core.remoting.RemotingConnection;
 import org.hornetq.core.remoting.impl.wireformat.CreateQueueMessage;
 import org.hornetq.core.remoting.impl.wireformat.CreateReplicationSessionMessage;
 import org.hornetq.core.remoting.impl.wireformat.CreateSessionMessage;
@@ -48,9 +48,9 @@
 
    private final Channel channel1;
 
-   private final RemotingConnection connection;
+   private final CoreRemotingConnection connection;
 
-   public HornetQPacketHandler(final HornetQServer server, final Channel channel1, final RemotingConnection connection)
+   public HornetQPacketHandler(final HornetQServer server, final Channel channel1, final CoreRemotingConnection connection)
    {
       this.server = server;
 

Modified: trunk/src/main/org/hornetq/core/server/impl/HornetQServerImpl.java
===================================================================
--- trunk/src/main/org/hornetq/core/server/impl/HornetQServerImpl.java	2010-01-20 17:11:18 UTC (rev 8817)
+++ trunk/src/main/org/hornetq/core/server/impl/HornetQServerImpl.java	2010-01-20 19:45:20 UTC (rev 8818)
@@ -69,7 +69,7 @@
 import org.hornetq.core.postoffice.impl.LocalQueueBinding;
 import org.hornetq.core.postoffice.impl.PostOfficeImpl;
 import org.hornetq.core.remoting.Channel;
-import org.hornetq.core.remoting.RemotingConnection;
+import org.hornetq.core.remoting.CoreRemotingConnection;
 import org.hornetq.core.remoting.impl.wireformat.CreateSessionResponseMessage;
 import org.hornetq.core.remoting.impl.wireformat.ReattachSessionResponseMessage;
 import org.hornetq.core.remoting.server.RemotingService;
@@ -536,7 +536,7 @@
       return clusterManager;
    }
 
-   public ReattachSessionResponseMessage reattachSession(final RemotingConnection connection,
+   public ReattachSessionResponseMessage reattachSession(final CoreRemotingConnection connection,
                                                          final String name,
                                                          final int lastConfirmedCommandID) throws Exception
    {
@@ -593,7 +593,7 @@
                                                      final String password,
                                                      final int minLargeMessageSize,
                                                      final int incrementingVersion,
-                                                     final RemotingConnection connection,
+                                                     final CoreRemotingConnection connection,
                                                      final boolean autoCommitSends,
                                                      final boolean autoCommitAcks,
                                                      final boolean preAcknowledge,

Modified: trunk/src/main/org/hornetq/core/server/impl/ServerSessionImpl.java
===================================================================
--- trunk/src/main/org/hornetq/core/server/impl/ServerSessionImpl.java	2010-01-20 17:11:18 UTC (rev 8817)
+++ trunk/src/main/org/hornetq/core/server/impl/ServerSessionImpl.java	2010-01-20 19:45:20 UTC (rev 8818)
@@ -44,8 +44,8 @@
 import org.hornetq.core.postoffice.QueueBinding;
 import org.hornetq.core.remoting.Channel;
 import org.hornetq.core.remoting.CloseListener;
+import org.hornetq.core.remoting.CoreRemotingConnection;
 import org.hornetq.core.remoting.FailureListener;
-import org.hornetq.core.remoting.RemotingConnection;
 import org.hornetq.core.security.CheckType;
 import org.hornetq.core.security.SecurityStore;
 import org.hornetq.core.server.BindingQueryResult;
@@ -100,7 +100,7 @@
 
    private final boolean strictUpdateDeliveryCount;
 
-   private RemotingConnection remotingConnection;
+   private CoreRemotingConnection remotingConnection;
 
    private Channel channel;
 
@@ -152,7 +152,7 @@
                             final boolean preAcknowledge,
                             final boolean strictUpdateDeliveryCount,
                             final boolean xa,
-                            final RemotingConnection remotingConnection,
+                            final CoreRemotingConnection remotingConnection,
                             final Channel channel,
                             final StorageManager storageManager,
                             final PostOffice postOffice,
@@ -1037,7 +1037,7 @@
       }
    }
 
-   public int transferConnection(final RemotingConnection newConnection, final int lastReceivedCommandID)
+   public int transferConnection(final CoreRemotingConnection newConnection, final int lastReceivedCommandID)
    {
       // We need to disable delivery on all the consumers while the transfer is occurring- otherwise packets might get
       // delivered

Modified: trunk/src/main/org/hornetq/core/server/impl/ServerSessionPacketHandler.java
===================================================================
--- trunk/src/main/org/hornetq/core/server/impl/ServerSessionPacketHandler.java	2010-01-20 17:11:18 UTC (rev 8817)
+++ trunk/src/main/org/hornetq/core/server/impl/ServerSessionPacketHandler.java	2010-01-20 19:45:20 UTC (rev 8818)
@@ -59,9 +59,9 @@
 import org.hornetq.core.remoting.Channel;
 import org.hornetq.core.remoting.ChannelHandler;
 import org.hornetq.core.remoting.CloseListener;
+import org.hornetq.core.remoting.CoreRemotingConnection;
 import org.hornetq.core.remoting.FailureListener;
 import org.hornetq.core.remoting.Packet;
-import org.hornetq.core.remoting.RemotingConnection;
 import org.hornetq.core.remoting.impl.wireformat.CreateQueueMessage;
 import org.hornetq.core.remoting.impl.wireformat.HornetQExceptionMessage;
 import org.hornetq.core.remoting.impl.wireformat.NullResponseMessage;
@@ -126,7 +126,7 @@
 
    private final Channel channel;
 
-   private volatile RemotingConnection remotingConnection;
+   private volatile CoreRemotingConnection remotingConnection;
 
    public ServerSessionPacketHandler(final ServerSession session,
                                      final OperationContext sessionContext,
@@ -205,45 +205,8 @@
       return channel;
    }
 
-   public int transferConnection(final RemotingConnection newConnection, final int lastReceivedCommandID)
-   {
-      // We need to disable delivery on all the consumers while the transfer is occurring- otherwise packets might get
-      // delivered
-      // after the channel has transferred but *before* packets have been replayed - this will give the client the wrong
-      // sequence of packets.
-      // It is not sufficient to just stop the session, since right after stopping the session, another session start
-      // might be executed
-      // before we have transferred the connection, leaving it in a started state
-      session.setTransferring(true);
+  
 
-      removeConnectionListeners();
-
-      // Note. We do not destroy the replicating connection here. In the case the live server has really crashed
-      // then the connection will get cleaned up anyway when the server ping timeout kicks in.
-      // In the case the live server is really still up, i.e. a split brain situation (or in tests), then closing
-      // the replicating connection will cause the outstanding responses to be be replayed on the live server,
-      // if these reach the client who then subsequently fails over, on reconnection to backup, it will have
-      // received responses that the backup did not know about.
-
-      channel.transferConnection(newConnection);
-
-      newConnection.syncIDGeneratorSequence(remotingConnection.getIDGeneratorSequence());
-
-      remotingConnection = newConnection;
-
-      addConnectionListeners();
-
-      int serverLastReceivedCommandID = channel.getLastConfirmedCommandID();
-
-      channel.replayCommands(lastReceivedCommandID);
-
-      channel.setTransferring(false);
-
-      session.setTransferring(false);
-
-      return serverLastReceivedCommandID;
-   }
-
    public void handlePacket(final Packet packet)
    {
       byte type = packet.getType();

Modified: trunk/src/main/org/hornetq/integration/transports/netty/NettyAcceptor.java
===================================================================
--- trunk/src/main/org/hornetq/integration/transports/netty/NettyAcceptor.java	2010-01-20 17:11:18 UTC (rev 8817)
+++ trunk/src/main/org/hornetq/integration/transports/netty/NettyAcceptor.java	2010-01-20 19:45:20 UTC (rev 8818)
@@ -31,6 +31,7 @@
 import org.hornetq.api.core.TransportConfiguration;
 import org.hornetq.api.core.management.NotificationType;
 import org.hornetq.core.logging.Logger;
+import org.hornetq.core.remoting.ProtocolType;
 import org.hornetq.core.remoting.impl.ssl.SSLSupport;
 import org.hornetq.core.server.management.Notification;
 import org.hornetq.core.server.management.NotificationService;
@@ -513,14 +514,14 @@
 
    private class Listener implements ConnectionLifeCycleListener
    {
-      public void connectionCreated(final Connection connection)
+      public void connectionCreated(final Connection connection, final ProtocolType protocol)
       {
          if (connections.putIfAbsent(connection.getID(), connection) != null)
          {
             throw new IllegalArgumentException("Connection already exists with id " + connection.getID());
          }
 
-         listener.connectionCreated(connection);
+         listener.connectionCreated(connection, protocol);
       }
 
       public void connectionDestroyed(final Object connectionID)

Modified: trunk/src/main/org/hornetq/integration/transports/netty/NettyConnection.java
===================================================================
--- trunk/src/main/org/hornetq/integration/transports/netty/NettyConnection.java	2010-01-20 17:11:18 UTC (rev 8817)
+++ trunk/src/main/org/hornetq/integration/transports/netty/NettyConnection.java	2010-01-20 19:45:20 UTC (rev 8818)
@@ -17,6 +17,7 @@
 import org.hornetq.api.core.HornetQException;
 import org.hornetq.core.buffers.impl.ChannelBufferWrapper;
 import org.hornetq.core.logging.Logger;
+import org.hornetq.core.remoting.ProtocolType;
 import org.hornetq.spi.core.remoting.Connection;
 import org.hornetq.spi.core.remoting.ConnectionLifeCycleListener;
 import org.jboss.netty.buffer.ChannelBuffers;
@@ -55,7 +56,7 @@
 
       this.listener = listener;
 
-      listener.connectionCreated(this);
+      listener.connectionCreated(this, ProtocolType.CORE);
    }
 
    // Public --------------------------------------------------------

Modified: trunk/src/main/org/hornetq/integration/transports/netty/NettyConnector.java
===================================================================
--- trunk/src/main/org/hornetq/integration/transports/netty/NettyConnector.java	2010-01-20 17:11:18 UTC (rev 8817)
+++ trunk/src/main/org/hornetq/integration/transports/netty/NettyConnector.java	2010-01-20 19:45:20 UTC (rev 8818)
@@ -30,6 +30,7 @@
 
 import org.hornetq.api.core.HornetQException;
 import org.hornetq.core.logging.Logger;
+import org.hornetq.core.remoting.ProtocolType;
 import org.hornetq.core.remoting.impl.ssl.SSLSupport;
 import org.hornetq.spi.core.remoting.BufferHandler;
 import org.hornetq.spi.core.remoting.Connection;
@@ -598,7 +599,7 @@
 
    private class Listener implements ConnectionLifeCycleListener
    {
-      public void connectionCreated(final Connection connection)
+      public void connectionCreated(final Connection connection, final ProtocolType protocol)
       {
          if (connections.putIfAbsent(connection.getID(), connection) != null)
          {

Modified: trunk/src/main/org/hornetq/spi/core/remoting/ConnectionLifeCycleListener.java
===================================================================
--- trunk/src/main/org/hornetq/spi/core/remoting/ConnectionLifeCycleListener.java	2010-01-20 17:11:18 UTC (rev 8817)
+++ trunk/src/main/org/hornetq/spi/core/remoting/ConnectionLifeCycleListener.java	2010-01-20 19:45:20 UTC (rev 8818)
@@ -13,6 +13,7 @@
 package org.hornetq.spi.core.remoting;
 
 import org.hornetq.api.core.HornetQException;
+import org.hornetq.core.remoting.ProtocolType;
 
 /**
  * A ConnectionLifeCycleListener is called by the remoting implementation to notify of connection events.
@@ -26,7 +27,7 @@
     *
     * @param connection the connection that has been created
     */
-   void connectionCreated(Connection connection);
+   void connectionCreated(Connection connection, ProtocolType protocol);
 
    /**
     * called when a connection is destroyed.

Modified: trunk/tests/src/org/hornetq/tests/integration/client/IncompatibleVersionTest.java
===================================================================
--- trunk/tests/src/org/hornetq/tests/integration/client/IncompatibleVersionTest.java	2010-01-20 17:11:18 UTC (rev 8817)
+++ trunk/tests/src/org/hornetq/tests/integration/client/IncompatibleVersionTest.java	2010-01-20 19:45:20 UTC (rev 8818)
@@ -25,8 +25,8 @@
 import org.hornetq.api.core.client.HornetQClient;
 import org.hornetq.core.client.impl.FailoverManagerImpl;
 import org.hornetq.core.remoting.Channel;
+import org.hornetq.core.remoting.CoreRemotingConnection;
 import org.hornetq.core.remoting.Packet;
-import org.hornetq.core.remoting.RemotingConnection;
 import org.hornetq.core.remoting.impl.invm.InVMConnectorFactory;
 import org.hornetq.core.remoting.impl.wireformat.CreateSessionMessage;
 import org.hornetq.core.remoting.impl.wireformat.CreateSessionResponseMessage;
@@ -51,7 +51,7 @@
 
    private HornetQServer server;
 
-   private RemotingConnection connection;
+   private CoreRemotingConnection connection;
 
    // Static --------------------------------------------------------
 

Modified: trunk/tests/src/org/hornetq/tests/integration/http/NettyHttpTest.java
===================================================================
--- trunk/tests/src/org/hornetq/tests/integration/http/NettyHttpTest.java	2010-01-20 17:11:18 UTC (rev 8817)
+++ trunk/tests/src/org/hornetq/tests/integration/http/NettyHttpTest.java	2010-01-20 19:45:20 UTC (rev 8818)
@@ -25,6 +25,7 @@
 import org.hornetq.api.core.HornetQBuffer;
 import org.hornetq.api.core.HornetQException;
 import org.hornetq.core.config.impl.ConfigurationImpl;
+import org.hornetq.core.remoting.ProtocolType;
 import org.hornetq.core.remoting.impl.AbstractBufferHandler;
 import org.hornetq.integration.transports.netty.NettyAcceptor;
 import org.hornetq.integration.transports.netty.NettyConnector;
@@ -549,7 +550,7 @@
          latch = connCreatedLatch;
       }
 
-      public void connectionCreated(final Connection connection)
+      public void connectionCreated(final Connection connection, final ProtocolType protocol)
       {
          this.connection = connection;
          if (latch != null)

Modified: trunk/tests/src/org/hornetq/tests/integration/remoting/PingTest.java
===================================================================
--- trunk/tests/src/org/hornetq/tests/integration/remoting/PingTest.java	2010-01-20 17:11:18 UTC (rev 8817)
+++ trunk/tests/src/org/hornetq/tests/integration/remoting/PingTest.java	2010-01-20 19:45:20 UTC (rev 8818)
@@ -22,13 +22,17 @@
 import org.hornetq.api.core.HornetQException;
 import org.hornetq.api.core.Interceptor;
 import org.hornetq.api.core.TransportConfiguration;
-import org.hornetq.api.core.client.*;
+import org.hornetq.api.core.client.ClientSession;
+import org.hornetq.api.core.client.ClientSessionFactory;
+import org.hornetq.api.core.client.HornetQClient;
+import org.hornetq.api.core.client.SessionFailureListener;
 import org.hornetq.core.client.impl.ClientSessionFactoryImpl;
 import org.hornetq.core.client.impl.ClientSessionFactoryInternal;
 import org.hornetq.core.client.impl.FailoverManagerImpl;
 import org.hornetq.core.config.Configuration;
 import org.hornetq.core.logging.Logger;
 import org.hornetq.core.remoting.CloseListener;
+import org.hornetq.core.remoting.CoreRemotingConnection;
 import org.hornetq.core.remoting.Packet;
 import org.hornetq.core.remoting.RemotingConnection;
 import org.hornetq.core.remoting.impl.wireformat.PacketImpl;
@@ -345,14 +349,14 @@
 
       session.addFailureListener(clientListener);
 
-      RemotingConnection serverConn = null;
+      CoreRemotingConnection serverConn = null;
       while (serverConn == null)
       {
          Set<RemotingConnection> conns = server.getRemotingService().getConnections();
 
          if (!conns.isEmpty())
          {
-            serverConn = server.getRemotingService().getConnections().iterator().next();
+            serverConn = (CoreRemotingConnection)server.getRemotingService().getConnections().iterator().next();
          }
          else
          {

Modified: trunk/tests/src/org/hornetq/tests/unit/core/remoting/impl/netty/NettyAcceptorFactoryTest.java
===================================================================
--- trunk/tests/src/org/hornetq/tests/unit/core/remoting/impl/netty/NettyAcceptorFactoryTest.java	2010-01-20 17:11:18 UTC (rev 8817)
+++ trunk/tests/src/org/hornetq/tests/unit/core/remoting/impl/netty/NettyAcceptorFactoryTest.java	2010-01-20 19:45:20 UTC (rev 8818)
@@ -21,6 +21,7 @@
 import org.hornetq.api.core.HornetQBuffer;
 import org.hornetq.api.core.HornetQException;
 import org.hornetq.core.config.impl.ConfigurationImpl;
+import org.hornetq.core.remoting.ProtocolType;
 import org.hornetq.core.remoting.impl.AbstractBufferHandler;
 import org.hornetq.integration.transports.netty.NettyAcceptor;
 import org.hornetq.integration.transports.netty.NettyAcceptorFactory;
@@ -63,7 +64,7 @@
          {
          }
 
-         public void connectionCreated(final Connection connection)
+         public void connectionCreated(final Connection connection, final ProtocolType protocol)
          {
          }
       };

Modified: trunk/tests/src/org/hornetq/tests/unit/core/remoting/impl/netty/NettyAcceptorTest.java
===================================================================
--- trunk/tests/src/org/hornetq/tests/unit/core/remoting/impl/netty/NettyAcceptorTest.java	2010-01-20 17:11:18 UTC (rev 8817)
+++ trunk/tests/src/org/hornetq/tests/unit/core/remoting/impl/netty/NettyAcceptorTest.java	2010-01-20 19:45:20 UTC (rev 8818)
@@ -22,6 +22,7 @@
 import org.hornetq.api.core.HornetQBuffer;
 import org.hornetq.api.core.HornetQException;
 import org.hornetq.core.config.impl.ConfigurationImpl;
+import org.hornetq.core.remoting.ProtocolType;
 import org.hornetq.core.remoting.impl.AbstractBufferHandler;
 import org.hornetq.integration.transports.netty.NettyAcceptor;
 import org.hornetq.integration.transports.netty.TransportConstants;
@@ -77,7 +78,7 @@
          {
          }
 
-         public void connectionCreated(final Connection connection)
+         public void connectionCreated(final Connection connection, final ProtocolType protocol)
          {
          }
       };

Modified: trunk/tests/src/org/hornetq/tests/unit/core/remoting/impl/netty/NettyConnectionTest.java
===================================================================
--- trunk/tests/src/org/hornetq/tests/unit/core/remoting/impl/netty/NettyConnectionTest.java	2010-01-20 17:11:18 UTC (rev 8817)
+++ trunk/tests/src/org/hornetq/tests/unit/core/remoting/impl/netty/NettyConnectionTest.java	2010-01-20 19:45:20 UTC (rev 8818)
@@ -22,6 +22,7 @@
 import org.hornetq.api.core.HornetQBuffer;
 import org.hornetq.api.core.HornetQBuffers;
 import org.hornetq.api.core.HornetQException;
+import org.hornetq.core.remoting.ProtocolType;
 import org.hornetq.integration.transports.netty.NettyConnection;
 import org.hornetq.spi.core.remoting.Connection;
 import org.hornetq.spi.core.remoting.ConnectionLifeCycleListener;
@@ -219,7 +220,7 @@
    class MyListener implements ConnectionLifeCycleListener
    {
 
-      public void connectionCreated(final Connection connection)
+      public void connectionCreated(final Connection connection, final ProtocolType protocol)
       {
 
       }

Modified: trunk/tests/src/org/hornetq/tests/unit/core/remoting/impl/netty/NettyConnectorTest.java
===================================================================
--- trunk/tests/src/org/hornetq/tests/unit/core/remoting/impl/netty/NettyConnectorTest.java	2010-01-20 17:11:18 UTC (rev 8817)
+++ trunk/tests/src/org/hornetq/tests/unit/core/remoting/impl/netty/NettyConnectorTest.java	2010-01-20 19:45:20 UTC (rev 8818)
@@ -21,6 +21,7 @@
 
 import org.hornetq.api.core.HornetQBuffer;
 import org.hornetq.api.core.HornetQException;
+import org.hornetq.core.remoting.ProtocolType;
 import org.hornetq.core.remoting.impl.AbstractBufferHandler;
 import org.hornetq.integration.transports.netty.NettyConnector;
 import org.hornetq.spi.core.remoting.BufferHandler;
@@ -66,7 +67,7 @@
          {
          }
 
-         public void connectionCreated(final Connection connection)
+         public void connectionCreated(final Connection connection, final ProtocolType protocol)
          {
          }
       };
@@ -103,7 +104,7 @@
          {
          }
 
-         public void connectionCreated(final Connection connection)
+         public void connectionCreated(final Connection connection, final ProtocolType protocol)
          {
          }
       };



More information about the hornetq-commits mailing list