[hornetq-commits] JBoss hornetq SVN: r11371 - in branches/Branch_2_2_EAP: src/main/org/hornetq/core/protocol/core/impl and 15 other directories.

do-not-reply at jboss.org do-not-reply at jboss.org
Mon Sep 19 20:50:32 EDT 2011


Author: clebert.suconic at jboss.com
Date: 2011-09-19 20:50:32 -0400 (Mon, 19 Sep 2011)
New Revision: 11371

Modified:
   branches/Branch_2_2_EAP/src/main/org/hornetq/core/client/impl/ClientSessionFactoryImpl.java
   branches/Branch_2_2_EAP/src/main/org/hornetq/core/client/impl/ServerLocatorImpl.java
   branches/Branch_2_2_EAP/src/main/org/hornetq/core/client/impl/ServerLocatorInternal.java
   branches/Branch_2_2_EAP/src/main/org/hornetq/core/client/impl/Topology.java
   branches/Branch_2_2_EAP/src/main/org/hornetq/core/protocol/core/impl/CoreProtocolManager.java
   branches/Branch_2_2_EAP/src/main/org/hornetq/core/protocol/stomp/StompProtocolManager.java
   branches/Branch_2_2_EAP/src/main/org/hornetq/core/remoting/impl/invm/InVMAcceptor.java
   branches/Branch_2_2_EAP/src/main/org/hornetq/core/remoting/impl/invm/InVMAcceptorFactory.java
   branches/Branch_2_2_EAP/src/main/org/hornetq/core/remoting/impl/invm/InVMConnection.java
   branches/Branch_2_2_EAP/src/main/org/hornetq/core/remoting/impl/invm/InVMConnector.java
   branches/Branch_2_2_EAP/src/main/org/hornetq/core/remoting/impl/netty/NettyAcceptor.java
   branches/Branch_2_2_EAP/src/main/org/hornetq/core/remoting/impl/netty/NettyAcceptorFactory.java
   branches/Branch_2_2_EAP/src/main/org/hornetq/core/remoting/impl/netty/NettyConnection.java
   branches/Branch_2_2_EAP/src/main/org/hornetq/core/remoting/impl/netty/NettyConnector.java
   branches/Branch_2_2_EAP/src/main/org/hornetq/core/remoting/server/impl/RemotingServiceImpl.java
   branches/Branch_2_2_EAP/src/main/org/hornetq/core/server/cluster/ClusterConnection.java
   branches/Branch_2_2_EAP/src/main/org/hornetq/core/server/cluster/ClusterManager.java
   branches/Branch_2_2_EAP/src/main/org/hornetq/core/server/cluster/impl/ClusterConnectionImpl.java
   branches/Branch_2_2_EAP/src/main/org/hornetq/core/server/cluster/impl/ClusterManagerImpl.java
   branches/Branch_2_2_EAP/src/main/org/hornetq/core/server/cluster/impl/ClusterManagerInternal.java
   branches/Branch_2_2_EAP/src/main/org/hornetq/core/server/impl/HornetQServerImpl.java
   branches/Branch_2_2_EAP/src/main/org/hornetq/core/server/impl/ServerSessionImpl.java
   branches/Branch_2_2_EAP/src/main/org/hornetq/spi/core/protocol/ProtocolManager.java
   branches/Branch_2_2_EAP/src/main/org/hornetq/spi/core/remoting/Acceptor.java
   branches/Branch_2_2_EAP/src/main/org/hornetq/spi/core/remoting/AcceptorFactory.java
   branches/Branch_2_2_EAP/src/main/org/hornetq/spi/core/remoting/ConnectionLifeCycleListener.java
   branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/integration/cluster/distribution/ClusterTestBase.java
   branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/integration/cluster/distribution/OnewayTwoNodeClusterTest.java
   branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/integration/cluster/distribution/SimpleSymmetricClusterTest.java
   branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/integration/cluster/distribution/TwoWayTwoNodeClusterTest.java
   branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/integration/cluster/failover/FailBackAutoTest.java
   branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/integration/cluster/failover/GroupingFailoverTestBase.java
   branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/integration/cluster/failover/MultipleBackupsFailoverTestBase.java
   branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/integration/cluster/failover/MultipleLivesMultipleBackupsFailoverTest.java
   branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/integration/http/NettyHttpTest.java
   branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/integration/largemessage/mock/MockConnector.java
   branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/unit/core/remoting/impl/netty/NettyAcceptorFactoryTest.java
   branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/unit/core/remoting/impl/netty/NettyAcceptorTest.java
   branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/unit/core/remoting/impl/netty/NettyConnectionTest.java
   branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/unit/core/remoting/impl/netty/NettyConnectorTest.java
   branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/util/ServiceTestBase.java
Log:
Refactoring clustering manager / cluster connection

Modified: branches/Branch_2_2_EAP/src/main/org/hornetq/core/client/impl/ClientSessionFactoryImpl.java
===================================================================
--- branches/Branch_2_2_EAP/src/main/org/hornetq/core/client/impl/ClientSessionFactoryImpl.java	2011-09-19 23:47:54 UTC (rev 11370)
+++ branches/Branch_2_2_EAP/src/main/org/hornetq/core/client/impl/ClientSessionFactoryImpl.java	2011-09-20 00:50:32 UTC (rev 11371)
@@ -53,6 +53,7 @@
 import org.hornetq.core.remoting.FailureListener;
 import org.hornetq.core.version.Version;
 import org.hornetq.spi.core.protocol.ProtocolType;
+import org.hornetq.spi.core.remoting.Acceptor;
 import org.hornetq.spi.core.remoting.BufferHandler;
 import org.hornetq.spi.core.remoting.Connection;
 import org.hornetq.spi.core.remoting.ConnectionLifeCycleListener;
@@ -367,7 +368,7 @@
 
    // ConnectionLifeCycleListener implementation --------------------------------------------------
 
-   public void connectionCreated(final Connection connection, final ProtocolType protocol)
+   public void connectionCreated(final Acceptor acceptor, final Connection connection, final ProtocolType protocol)
    {
    }
 

Modified: branches/Branch_2_2_EAP/src/main/org/hornetq/core/client/impl/ServerLocatorImpl.java
===================================================================
--- branches/Branch_2_2_EAP/src/main/org/hornetq/core/client/impl/ServerLocatorImpl.java	2011-09-19 23:47:54 UTC (rev 11370)
+++ branches/Branch_2_2_EAP/src/main/org/hornetq/core/client/impl/ServerLocatorImpl.java	2011-09-20 00:50:32 UTC (rev 11371)
@@ -1111,6 +1111,11 @@
       }
    }
 
+   public String getIdentity()
+   {
+      return identity;
+   }
+   
    public void setIdentity(String identity)
    {
       this.identity = identity;
@@ -1282,8 +1287,8 @@
       {
          log.debug("nodeDown " + this + " nodeID=" + nodeID + " as being down", new Exception("trace"));
       }
-
-      if (topology.removeMember(eventTime, nodeID))
+      
+      if (!(isClusterConnection() && nodeID.equals(this.getNodeID())) && topology.removeMember(eventTime, nodeID))
       {
          if (topology.isEmpty())
          {

Modified: branches/Branch_2_2_EAP/src/main/org/hornetq/core/client/impl/ServerLocatorInternal.java
===================================================================
--- branches/Branch_2_2_EAP/src/main/org/hornetq/core/client/impl/ServerLocatorInternal.java	2011-09-19 23:47:54 UTC (rev 11370)
+++ branches/Branch_2_2_EAP/src/main/org/hornetq/core/client/impl/ServerLocatorInternal.java	2011-09-20 00:50:32 UTC (rev 11371)
@@ -39,8 +39,12 @@
    
    void setAfterConnectionInternalListener(AfterConnectInternalListener listener);
    
-   /** Used to better identify Cluster Connection Locators on logs while debugging logs */
+   /** Used to better identify Cluster Connection Locators on logs. To facilitate eventual debugging.
+    * 
+    *  This method used to be on tests interface, but I'm now making it part of the public interface since*/
    void setIdentity(String identity);
+   
+   String getIdentity();
 
    void setNodeID(String nodeID);
 

Modified: branches/Branch_2_2_EAP/src/main/org/hornetq/core/client/impl/Topology.java
===================================================================
--- branches/Branch_2_2_EAP/src/main/org/hornetq/core/client/impl/Topology.java	2011-09-19 23:47:54 UTC (rev 11370)
+++ branches/Branch_2_2_EAP/src/main/org/hornetq/core/client/impl/Topology.java	2011-09-20 00:50:32 UTC (rev 11371)
@@ -108,7 +108,7 @@
       {
          if (log.isDebugEnabled())
          {
-            log.info(this + "::Live node " + nodeId + "=" + memberInput);
+            log.debug(this + "::node " + nodeId + "=" + memberInput);
          }
          memberInput.setUniqueEventID(System.currentTimeMillis());
          mapTopology.remove(nodeId);
@@ -212,7 +212,7 @@
                             currentMember +
                             ", memberInput=" +
                             memberInput +
-                            "newMember=" + newMember);
+                            "newMember=" + newMember, new Exception ("trace"));
                }
 
 
@@ -301,7 +301,7 @@
          {
             if (member.getUniqueEventID() > uniqueEventID)
             {
-               log.info("The removeMember was issued before the node " + nodeId + " was started, ignoring call");
+               log.debug("The removeMember was issued before the node " + nodeId + " was started, ignoring call");
                member = null;
             }
             else
@@ -482,22 +482,17 @@
    public synchronized String describe(final String text)
    {
 
-      String desc = text + "\n";
+      String desc = text + "topology on " + this + ":\n";
       for (Entry<String, TopologyMember> entry : new HashMap<String, TopologyMember>(mapTopology).entrySet())
       {
          desc += "\t" + entry.getKey() + " => " + entry.getValue() + "\n";
       }
       desc += "\t" + "nodes=" + nodes() + "\t" + "members=" + members();
-      return desc;
-   }
-
-   public void clear()
-   {
-      if (Topology.log.isDebugEnabled())
+      if (mapTopology.isEmpty())
       {
-         Topology.log.debug(this + "::clear", new Exception("trace"));
+         desc += "\tEmpty";
       }
-      mapTopology.clear();
+      return desc;
    }
 
    public int members()

Modified: branches/Branch_2_2_EAP/src/main/org/hornetq/core/protocol/core/impl/CoreProtocolManager.java
===================================================================
--- branches/Branch_2_2_EAP/src/main/org/hornetq/core/protocol/core/impl/CoreProtocolManager.java	2011-09-19 23:47:54 UTC (rev 11370)
+++ branches/Branch_2_2_EAP/src/main/org/hornetq/core/protocol/core/impl/CoreProtocolManager.java	2011-09-20 00:50:32 UTC (rev 11371)
@@ -42,6 +42,7 @@
 import org.hornetq.spi.core.protocol.ConnectionEntry;
 import org.hornetq.spi.core.protocol.ProtocolManager;
 import org.hornetq.spi.core.protocol.RemotingConnection;
+import org.hornetq.spi.core.remoting.Acceptor;
 import org.hornetq.spi.core.remoting.Connection;
 
 /**
@@ -68,7 +69,7 @@
       this.interceptors = interceptors;
    }
 
-   public ConnectionEntry createConnectionEntry(final Connection connection)
+   public ConnectionEntry createConnectionEntry(final Acceptor acceptorUsed, final Connection connection)
    {
       final Configuration config = server.getConfiguration();
       
@@ -177,16 +178,18 @@
                };
                
                final boolean isCC = msg.isClusterConnection();
-               
-               server.getClusterManager().addClusterTopologyListener(listener, isCC);
-               
-               rc.addCloseListener(new CloseListener()
+               if (acceptorUsed.getClusterConnection() != null)
                {
-                  public void connectionClosed()
+                  acceptorUsed.getClusterConnection().addClusterTopologyListener(listener, isCC);
+                  
+                  rc.addCloseListener(new CloseListener()
                   {
-                     server.getClusterManager().removeClusterTopologyListener(listener, isCC);
-                  }
-               });
+                     public void connectionClosed()
+                     {
+                        acceptorUsed.getClusterConnection().removeClusterTopologyListener(listener, isCC);
+                     }
+                  });
+               }
             }
             else if (packet.getType() == PacketImpl.NODE_ANNOUNCE)
             {
@@ -205,7 +208,8 @@
                {
                   log.trace("Server " + server + " receiving nodeUp from NodeID=" + msg.getNodeID() + ", pair=" + pair);
                }
-               server.getClusterManager().nodeAnnounced(msg.getCurrentEventID(), msg.getNodeID(), pair, msg.isBackup());
+
+               acceptorUsed.getClusterConnection().nodeAnnounced(msg.getCurrentEventID(), msg.getNodeID(), pair, msg.isBackup());
             }
          }
       });

Modified: branches/Branch_2_2_EAP/src/main/org/hornetq/core/protocol/stomp/StompProtocolManager.java
===================================================================
--- branches/Branch_2_2_EAP/src/main/org/hornetq/core/protocol/stomp/StompProtocolManager.java	2011-09-19 23:47:54 UTC (rev 11370)
+++ branches/Branch_2_2_EAP/src/main/org/hornetq/core/protocol/stomp/StompProtocolManager.java	2011-09-20 00:50:32 UTC (rev 11371)
@@ -38,6 +38,7 @@
 import org.hornetq.spi.core.protocol.ConnectionEntry;
 import org.hornetq.spi.core.protocol.ProtocolManager;
 import org.hornetq.spi.core.protocol.RemotingConnection;
+import org.hornetq.spi.core.remoting.Acceptor;
 import org.hornetq.spi.core.remoting.Connection;
 import org.hornetq.spi.core.security.HornetQSecurityManager;
 import org.hornetq.utils.UUIDGenerator;
@@ -109,7 +110,7 @@
 
    // ProtocolManager implementation --------------------------------
 
-   public ConnectionEntry createConnectionEntry(final Connection connection)
+   public ConnectionEntry createConnectionEntry(final Acceptor acceptorUsed, final Connection connection)
    {
       StompConnection conn = new StompConnection(connection, this);
 

Modified: branches/Branch_2_2_EAP/src/main/org/hornetq/core/remoting/impl/invm/InVMAcceptor.java
===================================================================
--- branches/Branch_2_2_EAP/src/main/org/hornetq/core/remoting/impl/invm/InVMAcceptor.java	2011-09-19 23:47:54 UTC (rev 11370)
+++ branches/Branch_2_2_EAP/src/main/org/hornetq/core/remoting/impl/invm/InVMAcceptor.java	2011-09-20 00:50:32 UTC (rev 11371)
@@ -21,6 +21,7 @@
 import org.hornetq.api.core.SimpleString;
 import org.hornetq.api.core.management.NotificationType;
 import org.hornetq.core.logging.Logger;
+import org.hornetq.core.server.cluster.ClusterConnection;
 import org.hornetq.core.server.management.Notification;
 import org.hornetq.core.server.management.NotificationService;
 import org.hornetq.spi.core.protocol.ProtocolType;
@@ -54,16 +55,21 @@
    private volatile boolean started;
 
    private final ExecutorFactory executorFactory;
+   
+   private final ClusterConnection clusterConnection;
 
    private boolean paused;
 
    private NotificationService notificationService;
 
-   public InVMAcceptor(final Map<String, Object> configuration,
+   public InVMAcceptor(final ClusterConnection clusterConnection,
+                       final Map<String, Object> configuration,
                        final BufferHandler handler,                       
                        final ConnectionLifeCycleListener listener,
                        final Executor threadPool)
    {
+      this.clusterConnection = clusterConnection;
+      
       this.handler = handler;
       
       this.listener = listener;
@@ -73,6 +79,11 @@
       executorFactory = new OrderedExecutorFactory(threadPool);
    }
 
+   public ClusterConnection getClusterConnection()
+   {
+      return clusterConnection;
+   }
+   
    public synchronized void start() throws Exception
    {
       if (started)
@@ -189,7 +200,7 @@
          throw new IllegalStateException("Acceptor is not started");
       }
 
-      new InVMConnection(id, connectionID, remoteHandler, new Listener(connector), clientExecutor);
+      new InVMConnection(this, id, connectionID, remoteHandler, new Listener(connector), clientExecutor);
    }
 
    public void disconnect(final String connectionID)
@@ -209,6 +220,8 @@
 
    private class Listener implements ConnectionLifeCycleListener
    {
+      //private static Listener instance = new Listener();
+      
       private final InVMConnector connector;
 
       Listener(final InVMConnector connector)
@@ -216,14 +229,14 @@
          this.connector = connector;
       }
 
-      public void connectionCreated(final Connection connection, final ProtocolType protocol)
+      public void connectionCreated(final Acceptor acceptor, final Connection connection, final ProtocolType protocol)
       {
          if (connections.putIfAbsent((String)connection.getID(), connection) != null)
          {
             throw new IllegalArgumentException("Connection already exists with id " + connection.getID());
          }
 
-         listener.connectionCreated(connection, protocol);
+         listener.connectionCreated(acceptor, connection, protocol);
       }
 
       public void connectionDestroyed(final Object connectionID)

Modified: branches/Branch_2_2_EAP/src/main/org/hornetq/core/remoting/impl/invm/InVMAcceptorFactory.java
===================================================================
--- branches/Branch_2_2_EAP/src/main/org/hornetq/core/remoting/impl/invm/InVMAcceptorFactory.java	2011-09-19 23:47:54 UTC (rev 11370)
+++ branches/Branch_2_2_EAP/src/main/org/hornetq/core/remoting/impl/invm/InVMAcceptorFactory.java	2011-09-20 00:50:32 UTC (rev 11371)
@@ -17,6 +17,7 @@
 import java.util.concurrent.Executor;
 import java.util.concurrent.ScheduledExecutorService;
 
+import org.hornetq.core.server.cluster.ClusterConnection;
 import org.hornetq.spi.core.remoting.Acceptor;
 import org.hornetq.spi.core.remoting.AcceptorFactory;
 import org.hornetq.spi.core.remoting.BufferDecoder;
@@ -31,14 +32,15 @@
  */
 public class InVMAcceptorFactory implements AcceptorFactory
 {
-   public Acceptor createAcceptor(final Map<String, Object> configuration,
+   public Acceptor createAcceptor(final ClusterConnection clusterConnection,
+                                  final Map<String, Object> configuration,
                                   final BufferHandler handler,
                                   final BufferDecoder decoder,
                                   final ConnectionLifeCycleListener listener,
                                   final Executor threadPool,
                                   final ScheduledExecutorService scheduledThreadPool)
    {
-      return new InVMAcceptor(configuration, handler, listener, threadPool);
+      return new InVMAcceptor(clusterConnection, configuration, handler, listener, threadPool);
    }
 
    public Set<String> getAllowableProperties()

Modified: branches/Branch_2_2_EAP/src/main/org/hornetq/core/remoting/impl/invm/InVMConnection.java
===================================================================
--- branches/Branch_2_2_EAP/src/main/org/hornetq/core/remoting/impl/invm/InVMConnection.java	2011-09-19 23:47:54 UTC (rev 11370)
+++ branches/Branch_2_2_EAP/src/main/org/hornetq/core/remoting/impl/invm/InVMConnection.java	2011-09-20 00:50:32 UTC (rev 11371)
@@ -21,6 +21,7 @@
 import org.hornetq.api.core.HornetQBuffers;
 import org.hornetq.core.logging.Logger;
 import org.hornetq.spi.core.protocol.ProtocolType;
+import org.hornetq.spi.core.remoting.Acceptor;
 import org.hornetq.spi.core.remoting.BufferHandler;
 import org.hornetq.spi.core.remoting.Connection;
 import org.hornetq.spi.core.remoting.ConnectionLifeCycleListener;
@@ -57,15 +58,17 @@
    
    private volatile boolean closing;
 
-   public InVMConnection(final int serverID,
+   public InVMConnection(final Acceptor acceptor, 
+                         final int serverID,
                          final BufferHandler handler,
                          final ConnectionLifeCycleListener listener,
                          final Executor executor)
    {
-      this(serverID, UUIDGenerator.getInstance().generateSimpleStringUUID().toString(), handler, listener, executor);
+      this(acceptor, serverID, UUIDGenerator.getInstance().generateSimpleStringUUID().toString(), handler, listener, executor);
    }
 
-   public InVMConnection(final int serverID,
+   public InVMConnection(final Acceptor acceptor, 
+                         final int serverID,
                          final String id,
                          final BufferHandler handler,
                          final ConnectionLifeCycleListener listener,
@@ -81,7 +84,7 @@
 
       this.executor = executor;
 
-      listener.connectionCreated(this, ProtocolType.CORE);
+      listener.connectionCreated(acceptor, this, ProtocolType.CORE);
    }
 
    public void close()

Modified: branches/Branch_2_2_EAP/src/main/org/hornetq/core/remoting/impl/invm/InVMConnector.java
===================================================================
--- branches/Branch_2_2_EAP/src/main/org/hornetq/core/remoting/impl/invm/InVMConnector.java	2011-09-19 23:47:54 UTC (rev 11370)
+++ branches/Branch_2_2_EAP/src/main/org/hornetq/core/remoting/impl/invm/InVMConnector.java	2011-09-20 00:50:32 UTC (rev 11371)
@@ -172,19 +172,20 @@
                                                  final ConnectionLifeCycleListener listener,
                                                  final Executor serverExecutor)
    {
-      return new InVMConnection(id, handler, listener, serverExecutor);
+      // No acceptor on a client connection
+      return new InVMConnection(null, id, handler, listener, serverExecutor);
    }
 
    private class Listener implements ConnectionLifeCycleListener
    {
-      public void connectionCreated(final Connection connection, final ProtocolType protocol)
+      public void connectionCreated(final Acceptor acceptor, final Connection connection, final ProtocolType protocol)
       {
          if (connections.putIfAbsent((String)connection.getID(), connection) != null)
          {
             throw new IllegalArgumentException("Connection already exists with id " + connection.getID());
          }
 
-         listener.connectionCreated(connection, protocol);
+         listener.connectionCreated(acceptor, connection, protocol);
       }
 
       public void connectionDestroyed(final Object connectionID)

Modified: branches/Branch_2_2_EAP/src/main/org/hornetq/core/remoting/impl/netty/NettyAcceptor.java
===================================================================
--- branches/Branch_2_2_EAP/src/main/org/hornetq/core/remoting/impl/netty/NettyAcceptor.java	2011-09-19 23:47:54 UTC (rev 11370)
+++ branches/Branch_2_2_EAP/src/main/org/hornetq/core/remoting/impl/netty/NettyAcceptor.java	2011-09-20 00:50:32 UTC (rev 11371)
@@ -37,6 +37,7 @@
 import org.hornetq.core.logging.Logger;
 import org.hornetq.core.protocol.stomp.WebSocketServerHandler;
 import org.hornetq.core.remoting.impl.ssl.SSLSupport;
+import org.hornetq.core.server.cluster.ClusterConnection;
 import org.hornetq.core.server.management.Notification;
 import org.hornetq.core.server.management.NotificationService;
 import org.hornetq.spi.core.protocol.ProtocolType;
@@ -87,6 +88,8 @@
 {
    static final Logger log = Logger.getLogger(NettyAcceptor.class);
 
+   private ClusterConnection clusterConnection;
+   
    private ChannelFactory channelFactory;
 
    private volatile ChannelGroup serverChannelGroup;
@@ -158,6 +161,7 @@
    private final long batchDelay;
 
    private final boolean directDeliver;
+   
 
    public NettyAcceptor(final Map<String, Object> configuration,
                         final BufferHandler handler,
@@ -166,6 +170,21 @@
                         final Executor threadPool,
                         final ScheduledExecutorService scheduledThreadPool)
    {
+      this(null, configuration, handler, decoder, listener, threadPool, scheduledThreadPool);
+   }
+
+
+   public NettyAcceptor(final ClusterConnection clusterConnection,
+                        final Map<String, Object> configuration,
+                        final BufferHandler handler,
+                        final BufferDecoder decoder,
+                        final ConnectionLifeCycleListener listener,
+                        final Executor threadPool,
+                        final ScheduledExecutorService scheduledThreadPool)
+   {
+      
+      this.clusterConnection = clusterConnection;
+      
       this.handler = handler;
 
       this.decoder = decoder;
@@ -618,6 +637,14 @@
    {
       this.notificationService = notificationService;
    }
+   
+   /* (non-Javadoc)
+    * @see org.hornetq.spi.core.remoting.Acceptor#getClusterConnection()
+    */
+   public ClusterConnection getClusterConnection()
+   {
+      return clusterConnection;
+   }
 
    // Inner classes -----------------------------------------------------------------------------
 
@@ -633,7 +660,7 @@
       @Override
       public void channelConnected(final ChannelHandlerContext ctx, final ChannelStateEvent e) throws Exception
       {
-         new NettyConnection(e.getChannel(), new Listener(), !httpEnabled && batchDelay > 0, directDeliver);
+         new NettyConnection(NettyAcceptor.this, e.getChannel(), new Listener(), !httpEnabled && batchDelay > 0, directDeliver);
 
          SslHandler sslHandler = ctx.getPipeline().get(SslHandler.class);
          if (sslHandler != null)
@@ -662,14 +689,14 @@
 
    private class Listener implements ConnectionLifeCycleListener
    {
-      public void connectionCreated(final Connection connection, final ProtocolType protocol)
+      public void connectionCreated(final Acceptor acceptor, final Connection connection, final ProtocolType protocol)
       {
          if (connections.putIfAbsent(connection.getID(), (NettyConnection)connection) != null)
          {
             throw new IllegalArgumentException("Connection already exists with id " + connection.getID());
          }
 
-         listener.connectionCreated(connection, NettyAcceptor.this.protocol);
+         listener.connectionCreated(acceptor, connection, NettyAcceptor.this.protocol);
       }
 
       public void connectionDestroyed(final Object connectionID)

Modified: branches/Branch_2_2_EAP/src/main/org/hornetq/core/remoting/impl/netty/NettyAcceptorFactory.java
===================================================================
--- branches/Branch_2_2_EAP/src/main/org/hornetq/core/remoting/impl/netty/NettyAcceptorFactory.java	2011-09-19 23:47:54 UTC (rev 11370)
+++ branches/Branch_2_2_EAP/src/main/org/hornetq/core/remoting/impl/netty/NettyAcceptorFactory.java	2011-09-20 00:50:32 UTC (rev 11371)
@@ -18,6 +18,7 @@
 import java.util.concurrent.Executor;
 import java.util.concurrent.ScheduledExecutorService;
 
+import org.hornetq.core.server.cluster.ClusterConnection;
 import org.hornetq.spi.core.remoting.Acceptor;
 import org.hornetq.spi.core.remoting.AcceptorFactory;
 import org.hornetq.spi.core.remoting.BufferDecoder;
@@ -31,14 +32,15 @@
  */
 public class NettyAcceptorFactory implements AcceptorFactory
 {
-   public Acceptor createAcceptor(final Map<String, Object> configuration,
+   public Acceptor createAcceptor(final ClusterConnection connection,
+                                  final Map<String, Object> configuration,
                                   final BufferHandler handler,
                                   final BufferDecoder decoder,
                                   final ConnectionLifeCycleListener listener,
                                   final Executor threadPool,
                                   final ScheduledExecutorService scheduledThreadPool)
    {
-      return new NettyAcceptor(configuration, handler, decoder, listener, threadPool, scheduledThreadPool);
+      return new NettyAcceptor(connection, configuration, handler, decoder, listener, threadPool, scheduledThreadPool);
    }
 
    public Set<String> getAllowableProperties()

Modified: branches/Branch_2_2_EAP/src/main/org/hornetq/core/remoting/impl/netty/NettyConnection.java
===================================================================
--- branches/Branch_2_2_EAP/src/main/org/hornetq/core/remoting/impl/netty/NettyConnection.java	2011-09-19 23:47:54 UTC (rev 11370)
+++ branches/Branch_2_2_EAP/src/main/org/hornetq/core/remoting/impl/netty/NettyConnection.java	2011-09-20 00:50:32 UTC (rev 11371)
@@ -21,6 +21,7 @@
 import org.hornetq.core.buffers.impl.ChannelBufferWrapper;
 import org.hornetq.core.logging.Logger;
 import org.hornetq.spi.core.protocol.ProtocolType;
+import org.hornetq.spi.core.remoting.Acceptor;
 import org.hornetq.spi.core.remoting.Connection;
 import org.hornetq.spi.core.remoting.ConnectionLifeCycleListener;
 import org.hornetq.spi.core.remoting.ReadyListener;
@@ -72,6 +73,15 @@
                           boolean batchingEnabled,
                           boolean directDeliver)
    {
+      this(null, channel, listener, batchingEnabled, directDeliver);
+   }
+   
+   public NettyConnection(final Acceptor acceptor,
+                          final Channel channel,
+                          final ConnectionLifeCycleListener listener,
+                          boolean batchingEnabled,
+                          boolean directDeliver)
+   {
       this.channel = channel;
 
       this.listener = listener;
@@ -80,7 +90,7 @@
 
       this.directDeliver = directDeliver;
 
-      listener.connectionCreated(this, ProtocolType.CORE);
+      listener.connectionCreated(acceptor, this, ProtocolType.CORE);
    }
 
    // Public --------------------------------------------------------

Modified: branches/Branch_2_2_EAP/src/main/org/hornetq/core/remoting/impl/netty/NettyConnector.java
===================================================================
--- branches/Branch_2_2_EAP/src/main/org/hornetq/core/remoting/impl/netty/NettyConnector.java	2011-09-19 23:47:54 UTC (rev 11370)
+++ branches/Branch_2_2_EAP/src/main/org/hornetq/core/remoting/impl/netty/NettyConnector.java	2011-09-20 00:50:32 UTC (rev 11371)
@@ -35,6 +35,7 @@
 import org.hornetq.core.logging.Logger;
 import org.hornetq.core.remoting.impl.ssl.SSLSupport;
 import org.hornetq.spi.core.protocol.ProtocolType;
+import org.hornetq.spi.core.remoting.Acceptor;
 import org.hornetq.spi.core.remoting.BufferHandler;
 import org.hornetq.spi.core.remoting.Connection;
 import org.hornetq.spi.core.remoting.ConnectionLifeCycleListener;
@@ -495,7 +496,8 @@
             ch.getPipeline().get(HornetQChannelHandler.class).active = true;
          }
 
-         NettyConnection conn = new NettyConnection(ch, new Listener(), !httpEnabled && batchDelay > 0, false);
+         // No acceptor on a client connection
+         NettyConnection conn = new NettyConnection(null, ch, new Listener(), !httpEnabled && batchDelay > 0, false);
 
          return conn;
       }
@@ -689,7 +691,7 @@
 
    private class Listener implements ConnectionLifeCycleListener
    {
-      public void connectionCreated(final Connection connection, final ProtocolType protocol)
+      public void connectionCreated(final Acceptor acceptor, final Connection connection, final ProtocolType protocol)
       {
          if (connections.putIfAbsent(connection.getID(), connection) != null)
          {

Modified: branches/Branch_2_2_EAP/src/main/org/hornetq/core/remoting/server/impl/RemotingServiceImpl.java
===================================================================
--- branches/Branch_2_2_EAP/src/main/org/hornetq/core/remoting/server/impl/RemotingServiceImpl.java	2011-09-19 23:47:54 UTC (rev 11370)
+++ branches/Branch_2_2_EAP/src/main/org/hornetq/core/remoting/server/impl/RemotingServiceImpl.java	2011-09-20 00:50:32 UTC (rev 11371)
@@ -39,6 +39,7 @@
 import org.hornetq.core.remoting.impl.netty.TransportConstants;
 import org.hornetq.core.remoting.server.RemotingService;
 import org.hornetq.core.server.HornetQServer;
+import org.hornetq.core.server.cluster.ClusterManager;
 import org.hornetq.core.server.impl.ServerSessionImpl;
 import org.hornetq.core.server.management.ManagementService;
 import org.hornetq.spi.core.protocol.ConnectionEntry;
@@ -94,6 +95,8 @@
    private final ScheduledExecutorService scheduledThreadPool;
 
    private FailureCheckAndFlushThread failureCheckAndFlushThread;
+   
+   private final ClusterManager clusterManager;
 
    private Map<ProtocolType, ProtocolManager> protocolMap = new ConcurrentHashMap<ProtocolType, ProtocolManager>();
 
@@ -101,7 +104,8 @@
 
    // Constructors --------------------------------------------------
 
-   public RemotingServiceImpl(final Configuration config,
+   public RemotingServiceImpl(final ClusterManager clusterManager,
+                              final Configuration config,
                               final HornetQServer server,
                               final ManagementService managementService,
                               final ScheduledExecutorService scheduledThreadPool)
@@ -109,6 +113,8 @@
       transportConfigs = config.getAcceptorConfigurations();
 
       this.server = server;
+      
+      this.clusterManager = clusterManager;
 
       ClassLoader loader = Thread.currentThread().getContextClassLoader();
       for (String interceptorClass : config.getInterceptorClassNames())
@@ -202,7 +208,9 @@
 
             ProtocolManager manager = protocolMap.get(protocol);
 
-            Acceptor acceptor = factory.createAcceptor(info.getParams(),
+            // TODO: parameterize the cluster connection
+            Acceptor acceptor = factory.createAcceptor(clusterManager.getDefaultConnection(),
+                                                       info.getParams(),
                                                        new DelegatingBufferHandler(),
                                                        manager,
                                                        this,
@@ -370,7 +378,7 @@
       return protocolMap.get(protocol);
    }
 
-   public void connectionCreated(final Connection connection, final ProtocolType protocol)
+   public void connectionCreated(final Acceptor acceptor, final Connection connection, final ProtocolType protocol)
    {
       if (server == null)
       {
@@ -384,7 +392,7 @@
          throw new IllegalArgumentException("Unknown protocol " + protocol);
       }
 
-      ConnectionEntry entry = pmgr.createConnectionEntry(connection);
+      ConnectionEntry entry = pmgr.createConnectionEntry(acceptor, connection);
 
       if (isTrace)
       {

Modified: branches/Branch_2_2_EAP/src/main/org/hornetq/core/server/cluster/ClusterConnection.java
===================================================================
--- branches/Branch_2_2_EAP/src/main/org/hornetq/core/server/cluster/ClusterConnection.java	2011-09-19 23:47:54 UTC (rev 11370)
+++ branches/Branch_2_2_EAP/src/main/org/hornetq/core/server/cluster/ClusterConnection.java	2011-09-20 00:50:32 UTC (rev 11371)
@@ -15,9 +15,11 @@
 
 import java.util.Map;
 
+import org.hornetq.api.core.Pair;
 import org.hornetq.api.core.SimpleString;
 import org.hornetq.api.core.TransportConfiguration;
 import org.hornetq.api.core.client.ClusterTopologyListener;
+import org.hornetq.core.client.impl.Topology;
 import org.hornetq.core.server.HornetQComponent;
 import org.hornetq.core.server.HornetQServer;
 
@@ -37,7 +39,13 @@
    String getNodeID();
    
    HornetQServer getServer();
+   
+   void nodeAnnounced(long eventUID, String nodeID, Pair<TransportConfiguration, TransportConfiguration> connectorPair, boolean backup);
 
+   void addClusterTopologyListener(ClusterTopologyListener listener, boolean clusterConnection);
+   
+   void removeClusterTopologyListener(ClusterTopologyListener listener, boolean clusterConnection);
+   
    /**
     * @return a Map of node ID and addresses
     */
@@ -47,8 +55,14 @@
    
    TransportConfiguration getConnector();
    
+   Topology getTopology();
+   
    void flushExecutor();
 
    // for debug
    String describe();
+
+   void informTopology();
+   
+   void announceBackup();
 }

Modified: branches/Branch_2_2_EAP/src/main/org/hornetq/core/server/cluster/ClusterManager.java
===================================================================
--- branches/Branch_2_2_EAP/src/main/org/hornetq/core/server/cluster/ClusterManager.java	2011-09-19 23:47:54 UTC (rev 11370)
+++ branches/Branch_2_2_EAP/src/main/org/hornetq/core/server/cluster/ClusterManager.java	2011-09-20 00:50:32 UTC (rev 11371)
@@ -16,11 +16,7 @@
 import java.util.Map;
 import java.util.Set;
 
-import org.hornetq.api.core.Pair;
 import org.hornetq.api.core.SimpleString;
-import org.hornetq.api.core.TransportConfiguration;
-import org.hornetq.api.core.client.ClusterTopologyListener;
-import org.hornetq.core.client.impl.Topology;
 import org.hornetq.core.config.BridgeConfiguration;
 import org.hornetq.core.server.HornetQComponent;
 
@@ -37,26 +33,26 @@
    Map<String, Bridge> getBridges();
 
    Set<ClusterConnection> getClusterConnections();
+   
+   /**
+    * Return the default ClusterConnection to be used case it's not defined by the acceptor
+    * @return
+    */
+   ClusterConnection getDefaultConnection();
 
    ClusterConnection getClusterConnection(SimpleString name);
 
    Set<BroadcastGroup> getBroadcastGroups();
-
-   void addClusterTopologyListener(ClusterTopologyListener listener, boolean clusterConnection);
    
-   void removeClusterTopologyListener(ClusterTopologyListener listener, boolean clusterConnection);
-   
    void activate();
 
-   void nodeAnnounced(long eventUID, String nodeID, Pair<TransportConfiguration, TransportConfiguration> connectorPair, boolean backup);
-
-   Topology getTopology();
-   
    void flushExecutor();
 
    void announceBackup() throws Exception;
+   
+   void deploy() throws Exception;
 
-   void deployBridge(BridgeConfiguration config) throws Exception;
+   void deployBridge(BridgeConfiguration config, boolean start) throws Exception;
 
    void destroyBridge(String name) throws Exception;
 

Modified: branches/Branch_2_2_EAP/src/main/org/hornetq/core/server/cluster/impl/ClusterConnectionImpl.java
===================================================================
--- branches/Branch_2_2_EAP/src/main/org/hornetq/core/server/cluster/impl/ClusterConnectionImpl.java	2011-09-19 23:47:54 UTC (rev 11370)
+++ branches/Branch_2_2_EAP/src/main/org/hornetq/core/server/cluster/impl/ClusterConnectionImpl.java	2011-09-20 00:50:32 UTC (rev 11371)
@@ -33,6 +33,8 @@
 import org.hornetq.api.core.SimpleString;
 import org.hornetq.api.core.TransportConfiguration;
 import org.hornetq.api.core.client.ClientMessage;
+import org.hornetq.api.core.client.ClientSessionFactory;
+import org.hornetq.api.core.client.ClusterTopologyListener;
 import org.hornetq.api.core.management.ManagementHelper;
 import org.hornetq.api.core.management.NotificationType;
 import org.hornetq.core.client.impl.AfterConnectInternalListener;
@@ -46,6 +48,7 @@
 import org.hornetq.core.postoffice.Bindings;
 import org.hornetq.core.postoffice.PostOffice;
 import org.hornetq.core.postoffice.impl.PostOfficeImpl;
+import org.hornetq.core.protocol.core.impl.wireformat.NodeAnnounceMessage;
 import org.hornetq.core.server.HornetQServer;
 import org.hornetq.core.server.Queue;
 import org.hornetq.core.server.cluster.Bridge;
@@ -79,8 +82,6 @@
 
    private final ExecutorFactory executorFactory;
 
-   private final Topology clusterManagerTopology;
-
    private final Executor executor;
 
    private final HornetQServer server;
@@ -140,9 +141,17 @@
    private final Set<TransportConfiguration> allowableConnections = new HashSet<TransportConfiguration>();
 
    private final ClusterManagerInternal manager;
+   
+   
+   // Stuff that used to be on the ClusterManager
 
+   
+   private final Topology topology = new Topology(this);
+
+   private volatile ServerLocatorInternal backupServerLocator;
+
+
    public ClusterConnectionImpl(final ClusterManagerInternal manager,
-                                final Topology clusterManagerTopology,
                                 final TransportConfiguration[] tcConfigs,
                                 final TransportConfiguration connector,
                                 final SimpleString name,
@@ -204,6 +213,8 @@
       this.executorFactory = executorFactory;
 
       this.executor = executorFactory.getExecutor();
+      
+      this.topology.setExecutor(executor);
 
       this.server = server;
 
@@ -227,8 +238,6 @@
 
       this.callTimeout = callTimeout;
 
-      this.clusterManagerTopology = clusterManagerTopology;
-
       clusterConnector = new StaticClusterConnector(tcConfigs);
 
       if (tcConfigs != null && tcConfigs.length > 0)
@@ -244,7 +253,6 @@
    }
 
    public ClusterConnectionImpl(final ClusterManagerImpl manager,
-                                final Topology clusterManagerTopology,
                                 DiscoveryGroupConfiguration dg,
                                 final TransportConfiguration connector,
                                 final SimpleString name,
@@ -308,6 +316,8 @@
       this.executorFactory = executorFactory;
 
       this.executor = executorFactory.getExecutor();
+      
+      this.topology.setExecutor(executor);
 
       this.server = server;
 
@@ -330,11 +340,9 @@
       clusterConnector = new DiscoveryClusterConnector(dg);
 
       this.manager = manager;
-
-      this.clusterManagerTopology = clusterManagerTopology;
    }
 
-   public void start() throws Exception
+       public void start() throws Exception
    {
       synchronized (this)
       {
@@ -410,13 +418,21 @@
                                                       props);
          managementService.sendNotification(notification);
       }
+      
 
+
       executor.execute(new Runnable()
       {
          public void run()
          {
             synchronized (ClusterConnectionImpl.this)
             {
+               if (backupServerLocator != null)
+               {
+                  backupServerLocator.close();
+                  backupServerLocator = null;
+               }
+
                if (serverLocator != null)
                {
                   serverLocator.close();
@@ -430,12 +446,97 @@
       started = false;
    }
 
+   
+   public void announceBackup()
+   {
+      this.backupServerLocator = clusterConnector.createServerLocator(false);
+      
+      backupServerLocator.setReconnectAttempts(-1);
+      backupServerLocator.setInitialConnectAttempts(-1);
+
+       
+      executor.execute(new Runnable()
+      {
+         public void run()
+         {
+            try
+            {
+               if (log.isDebugEnabled())
+               {
+                  log.debug(ClusterConnectionImpl.this + ":: announcing " + connector + " to " + backupServerLocator);
+               }
+               ClientSessionFactory backupSessionFactory = backupServerLocator.connect();
+               if (backupSessionFactory != null)
+               {
+                  backupSessionFactory.getConnection()
+                                      .getChannel(0, -1)
+                                      .send(new NodeAnnounceMessage(System.currentTimeMillis(),
+                                                                    nodeUUID.toString(),
+                                                                    true,
+                                                                    connector,
+                                                                    null));
+                  log.info("backup announced");
+               }
+            }
+            catch (Exception e)
+            {
+               log.warn("Unable to announce backup, retrying", e);
+            }
+         }
+      });
+   }
+
+   private TopologyMember getLocalMember()
+   {
+      return topology.getMember(manager.getNodeId());
+   }
+   
+   public void addClusterTopologyListener(final ClusterTopologyListener listener, final boolean clusterConnection)
+   {
+      topology.addClusterTopologyListener(listener);
+
+      // no need to use an executor here since the Topology is already using one
+      topology.sendTopology(listener);
+   }
+
+   public void removeClusterTopologyListener(final ClusterTopologyListener listener, final boolean clusterConnection)
+   {
+      topology.removeClusterTopologyListener(listener);
+   }
+
+   public Topology getTopology()
+   {
+      return topology;
+   }
+   
+   public void nodeAnnounced(final long uniqueEventID,
+                             final String nodeID,
+                             final Pair<TransportConfiguration, TransportConfiguration> connectorPair,
+                             final boolean backup)
+   {
+      if (log.isDebugEnabled())
+      {
+         log.debug(this + "::NodeAnnounced, backup=" + backup + nodeID + connectorPair);
+      }
+
+      TopologyMember newMember = new TopologyMember(connectorPair.a, connectorPair.b);
+      newMember.setUniqueEventID(uniqueEventID);
+      if (backup)
+      {
+         topology.updateBackup(nodeID, new TopologyMember(connectorPair.a, connectorPair.b));
+      }
+      else
+      {
+         topology.updateMember(uniqueEventID, nodeID, newMember);
+      }
+   }
+
    /* (non-Javadoc)
     * @see org.hornetq.core.client.impl.AfterConnectInternalListener#onConnection(org.hornetq.core.client.impl.ClientSessionFactoryInternal)
     */
    public void onConnection(ClientSessionFactoryInternal sf)
    {
-      TopologyMember localMember = manager.getLocalMember();
+      TopologyMember localMember = getLocalMember();
       sf.sendNodeAnnounce(localMember.getUniqueEventID(),
                           manager.getNodeId(),
                           false,
@@ -498,9 +599,27 @@
       }
 
       backup = false;
+      
+      topology.updateAsLive(manager.getNodeId(), new TopologyMember(connector, null));
 
-      serverLocator = clusterConnector.createServerLocator();
+      if (backupServerLocator != null)
+      {
+         // todo we could use the topology of this to preempt it arriving from the cc
+         try
+         {
+            backupServerLocator.close();
+         }
+         catch (Exception e)
+         {
+            log.warn("problem closing backup session factory", e);
+         }
+         backupServerLocator = null;
+      }
 
+
+
+      serverLocator = clusterConnector.createServerLocator(true);
+
       if (serverLocator != null)
       {
 
@@ -509,7 +628,7 @@
             log.debug("DuplicateDetection is disabled, sending clustered messages blocked");
          }
 
-         final TopologyMember currentMember = clusterManagerTopology.getMember(nodeUUID.toString());
+         final TopologyMember currentMember = topology.getMember(manager.getNodeId());
 
          if (currentMember == null)
          {
@@ -554,6 +673,7 @@
          log.debug("sending notification: " + notification);
          managementService.sendNotification(notification);
       }
+
    }
 
    public TransportConfiguration getConnector()
@@ -660,7 +780,6 @@
                {
                   log.debug(this + "::Creating record for nodeID=" + nodeID + ", connectorPair=" + connectorPair);
                }
-               log.info(this + "::Creating record for nodeID=" + nodeID + ", connectorPair=" + connectorPair);
 
                // New node - create a new flow record
 
@@ -701,7 +820,26 @@
          }
       }
    }
+   
+   public synchronized void informTopology()
+   {
+      String nodeID = server.getNodeID().toString();
+      
+      TopologyMember localMember;
+      
+      if (backup)
+      {
+         localMember = new TopologyMember(null, connector);
+      }
+      else
+      {
+         localMember = new TopologyMember(connector, null);
+      }
 
+      topology.updateAsLive(nodeID, localMember);
+   }
+
+
    private void createNewRecord(final long eventUID,
                                 final String targetNodeID,
                                 final TransportConfiguration connector,
@@ -709,7 +847,7 @@
                                 final Queue queue,
                                 final boolean start) throws Exception
    {
-      final ServerLocatorInternal targetLocator = new ServerLocatorImpl(clusterManagerTopology, false, connector);
+      final ServerLocatorInternal targetLocator = new ServerLocatorImpl(topology, false, connector);
       
       String nodeId;
       
@@ -1365,7 +1503,8 @@
    @Override
    public String toString()
    {
-      return "ClusterConnectionImpl [nodeUUID=" + nodeUUID +
+      return "ClusterConnectionImpl@" + System.identityHashCode(this)  + 
+             "[nodeUUID=" + nodeUUID +
              ", connector=" +
              connector +
              ", address=" +
@@ -1395,7 +1534,7 @@
 
    interface ClusterConnector
    {
-      ServerLocatorInternal createServerLocator();
+      ServerLocatorInternal createServerLocator(boolean includeTopology);
    }
 
    private class StaticClusterConnector implements ClusterConnector
@@ -1407,7 +1546,7 @@
          this.tcConfigs = tcConfigs;
       }
 
-      public ServerLocatorInternal createServerLocator()
+      public ServerLocatorInternal createServerLocator(boolean includeTopology)
       {
          if (tcConfigs != null && tcConfigs.length > 0)
          {
@@ -1415,7 +1554,9 @@
             {
                log.debug(ClusterConnectionImpl.this + "Creating a serverLocator for " + Arrays.toString(tcConfigs));
             }
-            return new ServerLocatorImpl(clusterManagerTopology, true, tcConfigs);
+            ServerLocatorImpl locator = new ServerLocatorImpl(includeTopology?topology:null, true, tcConfigs);
+            locator.setClusterConnection(true);
+            return locator;
          }
          else
          {
@@ -1443,9 +1584,11 @@
          this.dg = dg;
       }
 
-      public ServerLocatorInternal createServerLocator()
+      public ServerLocatorInternal createServerLocator(boolean includeTopology)
       {
-         return new ServerLocatorImpl(clusterManagerTopology, true, dg);
+         ServerLocatorImpl locator = new ServerLocatorImpl(includeTopology?topology:null, true, dg);
+         return locator;
+
       }
    }
 }

Modified: branches/Branch_2_2_EAP/src/main/org/hornetq/core/server/cluster/impl/ClusterManagerImpl.java
===================================================================
--- branches/Branch_2_2_EAP/src/main/org/hornetq/core/server/cluster/impl/ClusterManagerImpl.java	2011-09-19 23:47:54 UTC (rev 11370)
+++ branches/Branch_2_2_EAP/src/main/org/hornetq/core/server/cluster/impl/ClusterManagerImpl.java	2011-09-20 00:50:32 UTC (rev 11371)
@@ -30,15 +30,10 @@
 import java.util.concurrent.ScheduledFuture;
 
 import org.hornetq.api.core.DiscoveryGroupConfiguration;
-import org.hornetq.api.core.Pair;
 import org.hornetq.api.core.SimpleString;
 import org.hornetq.api.core.TransportConfiguration;
-import org.hornetq.api.core.client.ClientSessionFactory;
-import org.hornetq.api.core.client.ClusterTopologyListener;
 import org.hornetq.api.core.client.HornetQClient;
 import org.hornetq.core.client.impl.ServerLocatorInternal;
-import org.hornetq.core.client.impl.Topology;
-import org.hornetq.core.client.impl.TopologyMember;
 import org.hornetq.core.config.BridgeConfiguration;
 import org.hornetq.core.config.BroadcastGroupConfiguration;
 import org.hornetq.core.config.ClusterConnectionConfiguration;
@@ -46,7 +41,6 @@
 import org.hornetq.core.logging.Logger;
 import org.hornetq.core.postoffice.Binding;
 import org.hornetq.core.postoffice.PostOffice;
-import org.hornetq.core.protocol.core.impl.wireformat.NodeAnnounceMessage;
 import org.hornetq.core.server.HornetQServer;
 import org.hornetq.core.server.Queue;
 import org.hornetq.core.server.cluster.Bridge;
@@ -83,6 +77,8 @@
    private final PostOffice postOffice;
 
    private final ScheduledExecutorService scheduledExecutor;
+   
+   private ClusterConnection defaultClusterConnection;
 
    private final ManagementService managementService;
 
@@ -99,10 +95,6 @@
    // the cluster connections which links this node to other cluster nodes
    private final Map<String, ClusterConnection> clusterConnections = new HashMap<String, ClusterConnection>();
 
-   private final Topology topology = new Topology(this);
-
-   private volatile ServerLocatorInternal backupServerLocator;
-
    private final Set<ServerLocatorInternal> clusterLocators = new ConcurrentHashSet<ServerLocatorInternal>();
 
    private final Executor executor;
@@ -126,8 +118,6 @@
 
       executor = executorFactory.getExecutor();;
 
-      topology.setExecutor(executor);
-
       this.server = server;
 
       this.postOffice = postOffice;
@@ -152,7 +142,6 @@
 
       out.println("Information on " + this);
       out.println("*******************************************************");
-      out.println("Topology: " + topology.describe("Toopology on " + this));
 
       for (ClusterConnection conn : this.clusterConnections.values())
       {
@@ -163,29 +152,24 @@
 
       return str.toString();
    }
+   
+   public ClusterConnection getDefaultConnection()
+   {
+      return defaultClusterConnection;
+   }
 
    public String toString()
    {
       return "ClusterManagerImpl[server=" + server + "]@" + System.identityHashCode(this);
    }
    
-   public TopologyMember getLocalMember()
-   {
-      return topology.getMember(nodeUUID.toString());
-   }
-   
    public String getNodeId()
    {
       return nodeUUID.toString();
    }
 
-   public synchronized void start() throws Exception
+   public synchronized void deploy() throws Exception
    {
-      if (started)
-      {
-         return;
-      }
-
       if (clustered)
       {
          for (BroadcastGroupConfiguration config : configuration.getBroadcastGroupConfigurations())
@@ -193,43 +177,44 @@
             deployBroadcastGroup(config);
          }
 
-         String connectorName = null;
-
          for (ClusterConnectionConfiguration config : configuration.getClusterConfigurations())
          {
-            if (connectorName == null)
-            {
-               connectorName = config.getConnectorName();
-               break;
-            }
-          }
+            deployClusterConnection(config);
+         }
+      }
+   }
 
-         if (connectorName != null)
+   public synchronized void start() throws Exception
+   {
+      if (started)
+      {
+         return;
+      }
+
+      for (BroadcastGroup group: broadcastGroups.values())
+      {
+         if (!backup)
          {
-            TransportConfiguration nodeConnector = configuration.getConnectorConfigurations().get(connectorName);
-            if (nodeConnector == null)
-            {
-               log.warn("No connecor with name '" + connectorName +
-                        "'. The cluster connection will not be deployed.");
-               return;
-            }
-   
-            // Now announce presence
-            announceNode(nodeConnector);
-   
-            for (ClusterConnectionConfiguration config : configuration.getClusterConfigurations())
-            {
-               deployClusterConnection(config);
-            }
+            group.start();
          }
-
       }
+      
+      for (ClusterConnection conn : clusterConnections.values())
+      {
+         conn.start();
+         if (backup)
+         {
+            conn.informTopology();
+            conn.announceBackup();
+         }
+      }
 
       for (BridgeConfiguration config : configuration.getBridgeConfigurations())
       {
-         deployBridge(config);
+         deployBridge(config, !backup);
       }
 
+
       started = true;
    }
 
@@ -267,12 +252,6 @@
          }
 
          bridges.clear();
-
-         if (backupServerLocator != null)
-         {
-            backupServerLocator.close();
-            backupServerLocator = null;
-         }
       }
 
       for (ServerLocatorInternal clusterLocator : clusterLocators)
@@ -289,31 +268,9 @@
       clusterLocators.clear();
       started = false;
 
-      clusterConnections.clear();
+      clearClusterConnections();
    }
 
-   public void nodeAnnounced(final long uniqueEventID,
-                             final String nodeID,
-                             final Pair<TransportConfiguration, TransportConfiguration> connectorPair,
-                             final boolean backup)
-   {
-      if (log.isDebugEnabled())
-      {
-         log.debug(this + "::NodeAnnounced, backup=" + backup + nodeID + connectorPair);
-      }
-
-      TopologyMember newMember = new TopologyMember(connectorPair.a, connectorPair.b);
-      newMember.setUniqueEventID(uniqueEventID);
-      if (backup)
-      {
-         topology.updateBackup(nodeID, new TopologyMember(connectorPair.a, connectorPair.b));
-      }
-      else
-      {
-         topology.updateMember(uniqueEventID, nodeID, newMember);
-      }
-   }
-
    public void flushExecutor()
    {
       Future future = new Future();
@@ -350,24 +307,6 @@
       return clusterConnections.get(name.toString());
    }
 
-   public void addClusterTopologyListener(final ClusterTopologyListener listener, final boolean clusterConnection)
-   {
-      topology.addClusterTopologyListener(listener);
-
-      // no need to use an executor here since the Topology is already using one
-      topology.sendTopology(listener);
-   }
-
-   public void removeClusterTopologyListener(final ClusterTopologyListener listener, final boolean clusterConnection)
-   {
-      topology.removeClusterTopologyListener(listener);
-   }
-
-   public Topology getTopology()
-   {
-      return topology;
-   }
-
    // backup node becomes live
    public synchronized void activate()
    {
@@ -375,27 +314,6 @@
       {
          backup = false;
 
-         String nodeID = server.getNodeID().toString();
-
-         TopologyMember member = topology.getMember(nodeID);
-         // swap backup as live and send it to everybody
-         member = new TopologyMember(member.getConnector().b, null);
-         topology.updateAsLive(nodeID, member);
-
-         if (backupServerLocator != null)
-         {
-            // todo we could use the topology of this to preempt it arriving from the cc
-            try
-            {
-               backupServerLocator.close();
-            }
-            catch (Exception e)
-            {
-               log.warn("problem closing backup session factory", e);
-            }
-            backupServerLocator = null;
-         }
-
          for (BroadcastGroup broadcastGroup : broadcastGroups.values())
          {
             try
@@ -432,31 +350,15 @@
                log.warn("unable to start bridge " + bridge.getName(), e);
             }
          }
-
-         topology.sendMember(nodeID);
       }
    }
 
    public void announceBackup() throws Exception
    {
-      List<ClusterConnectionConfiguration> configs = this.configuration.getClusterConfigurations();
-      if (!configs.isEmpty())
+      for (ClusterConnection conn : this.clusterConnections.values())
       {
-         ClusterConnectionConfiguration config = configs.get(0);
-
-         TransportConfiguration connector = configuration.getConnectorConfigurations().get(config.getConnectorName());
-
-         if (connector == null)
-         {
-            log.warn("No connecor with name '" + config.getConnectorName() + "'. backup cannot be announced.");
-            return;
-         }
-         announceBackup(config, connector);
+         conn.announceBackup();
       }
-      else
-      {
-         log.warn("no cluster connections defined, unable to announce backup");
-      }
    }
 
    public void addClusterLocator(final ServerLocatorInternal serverLocator)
@@ -468,114 +370,9 @@
    {
       this.clusterLocators.remove(serverLocator);
    }
-
-   private synchronized void announceNode(final TransportConfiguration nodeConnector)
+   
+   public synchronized void deployBridge(final BridgeConfiguration config, final boolean start) throws Exception
    {
-      String nodeID = server.getNodeID().toString();
-      
-      TopologyMember localMember;
-      if (backup)
-      {
-         localMember = new TopologyMember(null, nodeConnector);
-      }
-      else
-      {
-         localMember = new TopologyMember(nodeConnector, null);
-      }
-
-      topology.updateAsLive(nodeID, localMember);
-   }
-
-   private synchronized void deployBroadcastGroup(final BroadcastGroupConfiguration config) throws Exception
-   {
-      if (broadcastGroups.containsKey(config.getName()))
-      {
-         ClusterManagerImpl.log.warn("There is already a broadcast-group with name " + config.getName() +
-                                     " deployed. This one will not be deployed.");
-
-         return;
-      }
-
-      InetAddress localAddress = null;
-      if (config.getLocalBindAddress() != null)
-      {
-         localAddress = InetAddress.getByName(config.getLocalBindAddress());
-      }
-
-      InetAddress groupAddress = InetAddress.getByName(config.getGroupAddress());
-
-      BroadcastGroupImpl group = new BroadcastGroupImpl(nodeUUID.toString(),
-                                                        config.getName(),
-                                                        localAddress,
-                                                        config.getLocalBindPort(),
-                                                        groupAddress,
-                                                        config.getGroupPort(),
-                                                        !backup);
-
-      for (String connectorInfo : config.getConnectorInfos())
-      {
-         TransportConfiguration connector = configuration.getConnectorConfigurations().get(connectorInfo);
-
-         if (connector == null)
-         {
-            logWarnNoConnector(config.getName(), connectorInfo);
-
-            return;
-         }
-
-         group.addConnector(connector);
-      }
-
-      ScheduledFuture<?> future = scheduledExecutor.scheduleWithFixedDelay(group,
-                                                                           0L,
-                                                                           config.getBroadcastPeriod(),
-                                                                           MILLISECONDS);
-
-      group.setScheduledFuture(future);
-
-      broadcastGroups.put(config.getName(), group);
-
-      managementService.registerBroadcastGroup(group, config);
-
-      if (!backup)
-      {
-         group.start();
-      }
-   }
-
-   private void logWarnNoConnector(final String connectorName, final String bgName)
-   {
-      ClusterManagerImpl.log.warn("There is no connector deployed with name '" + connectorName +
-                                  "'. The broadcast group with name '" +
-                                  bgName +
-                                  "' will not be deployed.");
-   }
-
-   private TransportConfiguration[] connectorNameListToArray(final List<String> connectorNames)
-   {
-      TransportConfiguration[] tcConfigs = (TransportConfiguration[])Array.newInstance(TransportConfiguration.class,
-                                                                                       connectorNames.size());
-      int count = 0;
-      for (String connectorName : connectorNames)
-      {
-         TransportConfiguration connector = configuration.getConnectorConfigurations().get(connectorName);
-
-         if (connector == null)
-         {
-            ClusterManagerImpl.log.warn("No connector defined with name '" + connectorName +
-                                        "'. The bridge will not be deployed.");
-
-            return null;
-         }
-
-         tcConfigs[count++] = connector;
-      }
-
-      return tcConfigs;
-   }
-
-   public synchronized void deployBridge(final BridgeConfiguration config) throws Exception
-   {
       if (config.getName() == null)
       {
          ClusterManagerImpl.log.warn("Must specify a unique name for each bridge. This one will not be deployed.");
@@ -702,11 +499,12 @@
       bridges.put(config.getName(), bridge);
 
       managementService.registerBridge(bridge, config);
-
-      if (!backup)
+      
+      if (start)
       {
          bridge.start();
       }
+
    }
 
    public void destroyBridge(final String name) throws Exception
@@ -726,11 +524,49 @@
       bridge.flushExecutor();
    }
 
-   private synchronized void deployClusterConnection(final ClusterConnectionConfiguration config) throws Exception
+   // for testing
+   public void clear()
    {
+      for (Bridge bridge : bridges.values())
+      {
+         try
+         {
+            bridge.stop();
+         }
+         catch (Exception e)
+         {
+            log.warn(e.getMessage(), e);
+         }
+      }
+      bridges.clear();
+      for (ClusterConnection clusterConnection : clusterConnections.values())
+      {
+         try
+         {
+            clusterConnection.stop();
+         }
+         catch (Exception e)
+         {
+            e.printStackTrace();
+         }
+      }
+      clearClusterConnections();
+   }
+
+   // Private methods ----------------------------------------------------------------------------------------------------
+   
+   
+   private void clearClusterConnections()
+   {
+      clusterConnections.clear();
+      this.defaultClusterConnection = null;
+   }
+   
+   private void deployClusterConnection(final ClusterConnectionConfiguration config) throws Exception
+   {
       if (config.getName() == null)
       {
-         ClusterManagerImpl.log.warn("Must specify a unique name for each cluster. This one will not be deployed.");
+         ClusterManagerImpl.log.warn("Must specify a unique name for each cluster connection. This one will not be deployed.");
 
          return;
       }
@@ -781,7 +617,6 @@
          }
 
          clusterConnection = new ClusterConnectionImpl(this,
-                                                       topology,
                                                        dg,
                                                        connector,
                                                        new SimpleString(config.getName()),
@@ -819,7 +654,6 @@
          }
 
          clusterConnection = new ClusterConnectionImpl(this,
-                                                       topology,
                                                        tcConfigs,
                                                        connector,
                                                        new SimpleString(config.getName()),
@@ -847,6 +681,11 @@
                                                        config.isAllowDirectConnectionsOnly());
       }
 
+      if (defaultClusterConnection == null)
+      {
+         defaultClusterConnection = clusterConnection;
+      }
+      
       managementService.registerCluster(clusterConnection, config);
 
       clusterConnections.put(config.getName(), clusterConnection);
@@ -855,75 +694,8 @@
       {
          log.debug("ClusterConnection.start at " + clusterConnection, new Exception("trace"));
       }
-      clusterConnection.start();
-
-      if (backup)
-      {
-         announceBackup(config, connector);
-      }
    }
-
-   private void announceBackup(final ClusterConnectionConfiguration config, final TransportConfiguration connector) throws Exception
-   {
-      if (config.getStaticConnectors() != null)
-      {
-         TransportConfiguration[] tcConfigs = connectorNameListToArray(config.getStaticConnectors());
-
-         backupServerLocator = (ServerLocatorInternal)HornetQClient.createServerLocatorWithoutHA(tcConfigs);
-         backupServerLocator.setReconnectAttempts(-1);
-         backupServerLocator.setInitialConnectAttempts(-1);
-      }
-      else if (config.getDiscoveryGroupName() != null)
-      {
-         DiscoveryGroupConfiguration dg = configuration.getDiscoveryGroupConfigurations()
-                                                       .get(config.getDiscoveryGroupName());
-
-         if (dg == null)
-         {
-            ClusterManagerImpl.log.warn("No discovery group with name '" + config.getDiscoveryGroupName() +
-                                        "'. The cluster connection will not be deployed.");
-         }
-
-         backupServerLocator = (ServerLocatorInternal)HornetQClient.createServerLocatorWithoutHA(dg);
-         backupServerLocator.setReconnectAttempts(-1);
-         backupServerLocator.setInitialConnectAttempts(-1);
-      }
-      else
-      {
-         return;
-      }
-      log.info("announcing backup");
-      executor.execute(new Runnable()
-      {
-         public void run()
-         {
-            try
-            {
-               if (log.isDebugEnabled())
-               {
-                  log.debug(ClusterManagerImpl.this + ":: announcing " + connector + " to " + backupServerLocator);
-               }
-               ClientSessionFactory backupSessionFactory = backupServerLocator.connect();
-               if (backupSessionFactory != null)
-               {
-                  backupSessionFactory.getConnection()
-                                      .getChannel(0, -1)
-                                      .send(new NodeAnnounceMessage(System.currentTimeMillis(),
-                                                                    nodeUUID.toString(),
-                                                                    true,
-                                                                    connector,
-                                                                    null));
-                  log.info("backup announced");
-               }
-            }
-            catch (Exception e)
-            {
-               log.warn("Unable to announce backup, retrying", e);
-            }
-         }
-      });
-   }
-
+   
    private Transformer instantiateTransformer(final String transformerClassName)
    {
       Transformer transformer = null;
@@ -945,32 +717,89 @@
       return transformer;
    }
 
-   // for testing
-   public void clear()
+
+   private synchronized void deployBroadcastGroup(final BroadcastGroupConfiguration config) throws Exception
    {
-      for (Bridge bridge : bridges.values())
+      if (broadcastGroups.containsKey(config.getName()))
       {
-         try
+         ClusterManagerImpl.log.warn("There is already a broadcast-group with name " + config.getName() +
+                                     " deployed. This one will not be deployed.");
+
+         return;
+      }
+
+      InetAddress localAddress = null;
+      if (config.getLocalBindAddress() != null)
+      {
+         localAddress = InetAddress.getByName(config.getLocalBindAddress());
+      }
+
+      InetAddress groupAddress = InetAddress.getByName(config.getGroupAddress());
+
+      BroadcastGroupImpl group = new BroadcastGroupImpl(nodeUUID.toString(),
+                                                        config.getName(),
+                                                        localAddress,
+                                                        config.getLocalBindPort(),
+                                                        groupAddress,
+                                                        config.getGroupPort(),
+                                                        !backup);
+
+      for (String connectorInfo : config.getConnectorInfos())
+      {
+         TransportConfiguration connector = configuration.getConnectorConfigurations().get(connectorInfo);
+
+         if (connector == null)
          {
-            bridge.stop();
+            logWarnNoConnector(config.getName(), connectorInfo);
+
+            return;
          }
-         catch (Exception e)
-         {
-            log.warn(e.getMessage(), e);
-         }
+
+         group.addConnector(connector);
       }
-      bridges.clear();
-      for (ClusterConnection clusterConnection : clusterConnections.values())
+
+      ScheduledFuture<?> future = scheduledExecutor.scheduleWithFixedDelay(group,
+                                                                           0L,
+                                                                           config.getBroadcastPeriod(),
+                                                                           MILLISECONDS);
+
+      group.setScheduledFuture(future);
+
+      broadcastGroups.put(config.getName(), group);
+
+      managementService.registerBroadcastGroup(group, config);
+   }
+
+   private void logWarnNoConnector(final String connectorName, final String bgName)
+   {
+      ClusterManagerImpl.log.warn("There is no connector deployed with name '" + connectorName +
+                                  "'. The broadcast group with name '" +
+                                  bgName +
+                                  "' will not be deployed.");
+   }
+
+   private TransportConfiguration[] connectorNameListToArray(final List<String> connectorNames)
+   {
+      TransportConfiguration[] tcConfigs = (TransportConfiguration[])Array.newInstance(TransportConfiguration.class,
+                                                                                       connectorNames.size());
+      int count = 0;
+      for (String connectorName : connectorNames)
       {
-         try
+         TransportConfiguration connector = configuration.getConnectorConfigurations().get(connectorName);
+
+         if (connector == null)
          {
-            clusterConnection.stop();
+            ClusterManagerImpl.log.warn("No connector defined with name '" + connectorName +
+                                        "'. The bridge will not be deployed.");
+
+            return null;
          }
-         catch (Exception e)
-         {
-            e.printStackTrace();
-         }
+
+         tcConfigs[count++] = connector;
       }
-      clusterConnections.clear();
+
+      return tcConfigs;
    }
+
+
 }

Modified: branches/Branch_2_2_EAP/src/main/org/hornetq/core/server/cluster/impl/ClusterManagerInternal.java
===================================================================
--- branches/Branch_2_2_EAP/src/main/org/hornetq/core/server/cluster/impl/ClusterManagerInternal.java	2011-09-19 23:47:54 UTC (rev 11370)
+++ branches/Branch_2_2_EAP/src/main/org/hornetq/core/server/cluster/impl/ClusterManagerInternal.java	2011-09-20 00:50:32 UTC (rev 11371)
@@ -14,7 +14,6 @@
 package org.hornetq.core.server.cluster.impl;
 
 import org.hornetq.core.client.impl.ServerLocatorInternal;
-import org.hornetq.core.client.impl.TopologyMember;
 import org.hornetq.core.server.cluster.ClusterManager;
 
 /**
@@ -30,8 +29,6 @@
    
    void removeClusterLocator(ServerLocatorInternal locator);
    
-   TopologyMember getLocalMember();
-   
    String getNodeId();
 
 }

Modified: branches/Branch_2_2_EAP/src/main/org/hornetq/core/server/impl/HornetQServerImpl.java
===================================================================
--- branches/Branch_2_2_EAP/src/main/org/hornetq/core/server/impl/HornetQServerImpl.java	2011-09-19 23:47:54 UTC (rev 11370)
+++ branches/Branch_2_2_EAP/src/main/org/hornetq/core/server/impl/HornetQServerImpl.java	2011-09-20 00:50:32 UTC (rev 11371)
@@ -227,7 +227,12 @@
    
    // Used to identify the server on tests... useful on debugging testcases
    private String identity;
+   
+   private Thread backupActivationThread;
 
+   private Activation activation;
+
+
    // Constructors
    // ---------------------------------------------------------------------------------
 
@@ -289,11 +294,6 @@
    // lifecycle methods
    // ----------------------------------------------------------------
 
-   private interface Activation extends Runnable
-   {
-      void close(boolean permanently) throws Exception;
-   }
-
    /*
     * Can be overridden for tests
     */
@@ -309,259 +309,6 @@
       }
    }
 
-   private class NoSharedStoreLiveActivation implements Activation
-   {
-      public void run()
-      {
-         try
-         {
-            initialisePart1();
-
-            initialisePart2();
-
-            if (identity != null)
-            {
-               log.info("Server " + identity + " is now live");
-            }
-            else
-            {
-               log.info("Server is now live");
-            }
-         }
-         catch (Exception e)
-         {
-            log.error("Failure in initialisation", e);
-         }
-      }
-
-      public void close(boolean permanently) throws Exception
-      {
-
-      }
-   }
-
-   private class SharedStoreLiveActivation implements Activation
-   {
-      public void run()
-      {
-         try
-         {
-            log.info("Waiting to obtain live lock");
-
-            checkJournalDirectory();
-
-            initialisePart1();
-
-            if(nodeManager.isBackupLive())
-            {
-               //looks like we've failed over at some point need to inform that we are the backup so when the current live
-               // goes down they failover to us
-               clusterManager.announceBackup();
-               Thread.sleep(configuration.getFailbackDelay());
-            }
-
-            nodeManager.startLiveNode();
-
-            if (stopped)
-            {
-               return;
-            }
-            
-            initialisePart2();
-            
-            log.info("Server is now live");
-         }
-         catch (Exception e)
-         {
-            log.error("Failure in initialisation", e);
-         }
-      }
-
-      public void close(boolean permanently) throws Exception
-      {
-         if(permanently)
-         {
-            nodeManager.crashLiveServer();
-         }
-         else
-         {
-            nodeManager.pauseLiveServer();
-         }
-      }
-   }
-
-
-   private class SharedStoreBackupActivation implements Activation
-   {
-      
-      volatile boolean closed = false;
-      public void run()
-      {
-         try
-         {
-            nodeManager.startBackup();
-
-            initialisePart1();
-
-            clusterManager.start();
-
-            started = true;
-
-            log.info("HornetQ Backup Server version " + getVersion().getFullVersion() + " [" + nodeManager.getNodeId() + "] started, waiting live to fail before it gets active");
-
-            nodeManager.awaitLiveNode();
-            
-            configuration.setBackup(false);
-            
-            if (stopped)
-            {
-               return;
-            }
-            
-            initialisePart2();
-            
-            clusterManager.activate();
-
-            log.info("Backup Server is now live");
-
-            nodeManager.releaseBackup();
-            if(configuration.isAllowAutoFailBack())
-            {
-               class FailbackChecker implements Runnable
-               {
-                  boolean restarting = false;
-                  public void run()
-                  {
-                     try
-                     {
-                        if(!restarting && nodeManager.isAwaitingFailback())
-                        {
-                           log.info("live server wants to restart, restarting server in backup");
-                           restarting = true;
-                           Thread t = new Thread(new Runnable()
-                           {
-                              public void run()
-                              {
-                                 try
-                                 {
-                                    log.debug(HornetQServerImpl.this + "::Stopping live node in favor of failback");
-                                    stop(true);
-                                    // We need to wait some time before we start the backup again
-                                    // otherwise we may eventually start before the live had a chance to get it
-                                    Thread.sleep(configuration.getFailbackDelay());
-                                    configuration.setBackup(true);
-                                    log.debug(HornetQServerImpl.this + "::Starting backup node now after failback");
-                                    start();
-                                 }
-                                 catch (Exception e)
-                                 {
-                                    log.warn("unable to restart server, please kill and restart manually", e);
-                                 }
-                              }
-                           });
-                           t.start();
-                        }
-                     }
-                     catch (Exception e)
-                     {
-                        log.debug(e.getMessage(), e);
-                        //hopefully it will work next call
-                     }
-                  }
-               }
-               scheduledPool.scheduleAtFixedRate(new FailbackChecker(),  1000l, 1000l, TimeUnit.MILLISECONDS);
-            }
-         }
-         catch (InterruptedException e)
-         {
-            //this is ok, we are being stopped
-         }
-         catch (ClosedChannelException e)
-         {
-            //this is ok too, we are being stopped
-         }
-         catch (Exception e)
-         {
-            if(!(e.getCause() instanceof InterruptedException))
-            {
-               log.error("Failure in initialisation", e);
-            }
-         }
-         catch(Throwable e)
-         {
-            log.error("Failure in initialisation", e);
-         }
-      }
-
-      public void close(boolean permanently) throws Exception
-      {
-         if (configuration.isBackup())
-         {
-            long timeout = 30000;
-
-            long start = System.currentTimeMillis();
-
-            while (backupActivationThread.isAlive() && System.currentTimeMillis() - start < timeout)
-            {
-               nodeManager.interrupt();
-
-               backupActivationThread.interrupt();
-               
-               backupActivationThread.join(1000);
-
-            }
-
-            if (System.currentTimeMillis() - start >= timeout)
-            {
-               threadDump("Timed out waiting for backup activation to exit");
-            }
-
-            nodeManager.stopBackup();
-         }
-         else
-         {
-            //if we are now live, behave as live
-            // We need to delete the file too, otherwise the backup will failover when we shutdown or if the backup is
-            // started before the live
-            if(permanently)
-            {
-               nodeManager.crashLiveServer();
-            }
-            else
-            {
-               nodeManager.pauseLiveServer();
-            }
-         }
-      }
-   }
-
-   private class SharedNothingBackupActivation implements Activation
-   {
-      public void run()
-      {
-         try
-         {
-            // TODO
-
-            // Try-Connect to live server using live-connector-ref
-
-            // sit in loop and try and connect, if server is not live then it will return NOT_LIVE
-         }
-         catch (Exception e)
-         {
-            log.error("Failure in initialisation", e);
-         }
-      }
-
-      public void close(boolean permanently) throws Exception
-      {
-      }
-   }
-
-   private Thread backupActivationThread;
-
-   private Activation activation;
-
    public synchronized void start() throws Exception
    {
       stopped = false;
@@ -611,6 +358,7 @@
       }
 
 
+      // The activation on fail-back may change the value of isBackup, for that reason we are not using else here
       if (configuration.isBackup())
       {
          if (configuration.isSharedStore())
@@ -1069,7 +817,6 @@
       return new HashSet<ServerSession>(sessions.values());
    }
 
-   // TODO - should this really be here?? It's only used in tests
    public boolean isInitialised()
    {
       synchronized (initialiseLock)
@@ -1232,9 +979,145 @@
       return connectorsService;
    }
 
-   // Public
-   // ---------------------------------------------------------------------------------------
+    
+   public synchronized boolean checkActivate() throws Exception
+   {
+      if (configuration.isBackup())
+      {
+         // Handle backup server activation
 
+         if (!configuration.isSharedStore())
+         {
+            if (replicationEndpoint == null)
+            {
+               HornetQServerImpl.log.warn("There is no replication endpoint, can't activate this backup server");
+
+               throw new HornetQException(HornetQException.INTERNAL_ERROR, "Can't activate the server");
+            }
+
+            replicationEndpoint.stop();
+         }
+
+         // Complete the startup procedure
+
+         HornetQServerImpl.log.info("Activating backup server");
+
+         configuration.setBackup(false);
+
+         initialisePart2();
+      }
+
+      return true;
+   }
+
+   public void deployDivert(DivertConfiguration config) throws Exception
+   {
+      if (config.getName() == null)
+      {
+         HornetQServerImpl.log.warn("Must specify a name for each divert. This one will not be deployed.");
+
+         return;
+      }
+
+      if (config.getAddress() == null)
+      {
+         HornetQServerImpl.log.warn("Must specify an address for each divert. This one will not be deployed.");
+
+         return;
+      }
+
+      if (config.getForwardingAddress() == null)
+      {
+         HornetQServerImpl.log.warn("Must specify an forwarding address for each divert. This one will not be deployed.");
+
+         return;
+      }
+
+      SimpleString sName = new SimpleString(config.getName());
+
+      if (postOffice.getBinding(sName) != null)
+      {
+         HornetQServerImpl.log.warn("Binding already exists with name " + sName + ", divert will not be deployed");
+
+         return;
+      }
+
+      SimpleString sAddress = new SimpleString(config.getAddress());
+
+      Transformer transformer = instantiateTransformer(config.getTransformerClassName());
+
+      Filter filter = FilterImpl.createFilter(config.getFilterString());
+
+      Divert divert = new DivertImpl(new SimpleString(config.getForwardingAddress()),
+                                     sName,
+                                     new SimpleString(config.getRoutingName()),
+                                     config.isExclusive(),
+                                     filter,
+                                     transformer,
+                                     postOffice,
+                                     storageManager);
+
+      Binding binding = new DivertBinding(storageManager.generateUniqueID(), sAddress, divert);
+
+      postOffice.addBinding(binding);
+
+      managementService.registerDivert(divert, config);
+   }
+   
+   public void destroyDivert(SimpleString name) throws Exception
+   {
+      Binding binding = postOffice.getBinding(name);
+      if (binding == null)
+      {
+         throw new HornetQException(HornetQException.INTERNAL_ERROR, "No binding for divert " + name);
+      }
+      if (!(binding instanceof DivertBinding))
+      {
+         throw new HornetQException(HornetQException.INTERNAL_ERROR, "Binding " + name + " is not a divert");
+      }
+
+      postOffice.removeBinding(name);
+   }
+
+
+
+   public void deployBridge(BridgeConfiguration config) throws Exception
+   {
+      if (clusterManager != null)
+      {
+         clusterManager.deployBridge(config, true);
+      }
+   }
+   
+   public void destroyBridge(String name) throws Exception
+   {
+      if (clusterManager != null)
+      {
+         clusterManager.destroyBridge(name);
+      }
+   }
+   
+   public ServerSession getSessionByID(String sessionName)
+   {
+      return sessions.get(sessionName);
+   }
+   
+   // PUBLIC -------
+   
+   public String toString()
+   {
+      if (identity != null)
+      {
+         return "HornetQServerImpl::" + identity;
+      }
+      else
+      {
+         return "HornetQServerImpl::" + (nodeManager != null ? "serverUUID=" + nodeManager.getUUID() : "");
+      }
+   }
+
+
+
    // Package protected
    // ----------------------------------------------------------------------------
 
@@ -1296,34 +1179,6 @@
    // Private
    // --------------------------------------------------------------------------------------
 
-   // private boolean startReplication() throws Exception
-   // {
-   // String backupConnectorName = configuration.getBackupConnectorName();
-   //
-   // if (!configuration.isSharedStore() && backupConnectorName != null)
-   // {
-   // TransportConfiguration backupConnector = configuration.getConnectorConfigurations().get(backupConnectorName);
-   //
-   // if (backupConnector == null)
-   // {
-   // HornetQServerImpl.log.warn("connector with name '" + backupConnectorName +
-   // "' is not defined in the configuration.");
-   // }
-   // else
-   // {
-   //
-   // replicationFailoverManager = createBackupConnectionFailoverManager(backupConnector,
-   // threadPool,
-   // scheduledPool);
-   //
-   // replicationManager = new ReplicationManagerImpl(replicationFailoverManager, executorFactory);
-   // replicationManager.start();
-   // }
-   // }
-   //
-   // return true;
-   // }
-
    private void callActivateCallbacks()
    {
       for (ActivateCallback callback : activateCallbacks)
@@ -1340,44 +1195,6 @@
       }
    }
 
-   public synchronized boolean checkActivate() throws Exception
-   {
-      if (configuration.isBackup())
-      {
-         // Handle backup server activation
-
-         if (!configuration.isSharedStore())
-         {
-            if (replicationEndpoint == null)
-            {
-               HornetQServerImpl.log.warn("There is no replication endpoint, can't activate this backup server");
-
-               throw new HornetQException(HornetQException.INTERNAL_ERROR, "Can't activate the server");
-            }
-
-            replicationEndpoint.stop();
-         }
-
-         // Complete the startup procedure
-
-         HornetQServerImpl.log.info("Activating backup server");
-
-         configuration.setBackup(false);
-
-         initialisePart2();
-      }
-
-      return true;
-   }
-
-   private class FileActivateRunner implements Runnable
-   {
-      public void run()
-      {
-
-      }
-   }
-
    private void initialiseLogging()
    {
       LogDelegateFactory logDelegateFactory = (LogDelegateFactory)instantiateInstance(configuration.getLogDelegateFactoryClassName());
@@ -1414,8 +1231,6 @@
 
       managementService = new ManagementServiceImpl(mbeanServer, configuration);
 
-      remotingService = new RemotingServiceImpl(configuration, this, managementService, scheduledPool);
-
       if (configuration.getMemoryMeasureInterval() != -1)
       {
          memoryManager = new MemoryManagerImpl(configuration.getMemoryWarningThreshold(),
@@ -1470,6 +1285,23 @@
                                       configuration.isPersistIDCache(),
                                       addressSettingsRepository);
 
+      // This can't be created until node id is set
+      clusterManager = new ClusterManagerImpl(executorFactory,
+                                              this,
+                                              postOffice,
+                                              scheduledPool,
+                                              managementService,
+                                              configuration,
+                                              nodeManager.getUUID(),
+                                              configuration.isBackup(),
+                                              configuration.isClustered());
+      
+
+      clusterManager.deploy();
+
+
+      remotingService = new RemotingServiceImpl(clusterManager, configuration, this, managementService, scheduledPool);
+
       messagingServerControl = managementService.registerServer(postOffice,
                                                                 storageManager,
                                                                 configuration,
@@ -1527,18 +1359,6 @@
       deploySecurityFromConfiguration();
 
       deployGroupingHandlerConfiguration(configuration.getGroupingHandlerConfiguration());
-
-      // This can't be created until node id is set
-      clusterManager = new ClusterManagerImpl(executorFactory,
-                                              this,
-                                              postOffice,
-                                              scheduledPool,
-                                              managementService,
-                                              configuration,
-                                              nodeManager.getUUID(),
-                                              configuration.isBackup(),
-                                              configuration.isClustered());
-
    }
 
    /*
@@ -1604,10 +1424,10 @@
       // We do this at the end - we don't want things like MDBs or other connections connecting to a backup server until
       // it is activated
 
-      remotingService.start();
-
       clusterManager.start();
 
+      remotingService.start();
+
       initialised = true;
 
    }
@@ -1826,76 +1646,6 @@
       }
    }
 
-   public void deployDivert(DivertConfiguration config) throws Exception
-   {
-      if (config.getName() == null)
-      {
-         HornetQServerImpl.log.warn("Must specify a name for each divert. This one will not be deployed.");
-
-         return;
-      }
-
-      if (config.getAddress() == null)
-      {
-         HornetQServerImpl.log.warn("Must specify an address for each divert. This one will not be deployed.");
-
-         return;
-      }
-
-      if (config.getForwardingAddress() == null)
-      {
-         HornetQServerImpl.log.warn("Must specify an forwarding address for each divert. This one will not be deployed.");
-
-         return;
-      }
-
-      SimpleString sName = new SimpleString(config.getName());
-
-      if (postOffice.getBinding(sName) != null)
-      {
-         HornetQServerImpl.log.warn("Binding already exists with name " + sName + ", divert will not be deployed");
-
-         return;
-      }
-
-      SimpleString sAddress = new SimpleString(config.getAddress());
-
-      Transformer transformer = instantiateTransformer(config.getTransformerClassName());
-
-      Filter filter = FilterImpl.createFilter(config.getFilterString());
-
-      Divert divert = new DivertImpl(new SimpleString(config.getForwardingAddress()),
-                                     sName,
-                                     new SimpleString(config.getRoutingName()),
-                                     config.isExclusive(),
-                                     filter,
-                                     transformer,
-                                     postOffice,
-                                     storageManager);
-
-      Binding binding = new DivertBinding(storageManager.generateUniqueID(), sAddress, divert);
-
-      postOffice.addBinding(binding);
-
-      managementService.registerDivert(divert, config);
-   }
-   
-   public void destroyDivert(SimpleString name) throws Exception
-   {
-      Binding binding = postOffice.getBinding(name);
-      if (binding == null)
-      {
-         throw new HornetQException(HornetQException.INTERNAL_ERROR, "No binding for divert " + name);
-      }
-      if (!(binding instanceof DivertBinding))
-      {
-         throw new HornetQException(HornetQException.INTERNAL_ERROR, "Binding " + name + " is not a divert");
-      }
-
-      postOffice.removeBinding(name);
-   }
-
-
    private synchronized void deployGroupingHandlerConfiguration(final GroupingHandlerConfiguration config) throws Exception
    {
       if (config != null)
@@ -1922,22 +1672,6 @@
          managementService.addNotificationListener(groupingHandler);
       }
    }
-   
-   public void deployBridge(BridgeConfiguration config) throws Exception
-   {
-      if (clusterManager != null)
-      {
-         clusterManager.deployBridge(config);
-      }
-   }
-   
-   public void destroyBridge(String name) throws Exception
-   {
-      if (clusterManager != null)
-      {
-         clusterManager.destroyBridge(name);
-      }
-   }
 
    private Transformer instantiateTransformer(final String transformerClassName)
    {
@@ -1979,11 +1713,6 @@
 
    }
 
-   public ServerSession getSessionByID(String sessionName)
-   {
-      return sessions.get(sessionName);
-   }
-   
    /**
     * Check if journal directory exists or create it (if configured to do so)
     */
@@ -2005,18 +1734,284 @@
       }
    }
    
-   public String toString()
+   /**
+    * To be called by backup trying to fail back the server
+    */
+   private void startFailbackChecker()
    {
-      if (identity != null)
+      scheduledPool.scheduleAtFixedRate(new FailbackChecker(),  1000l, 1000l, TimeUnit.MILLISECONDS);
+   }
+
+
+   // Inner classes
+   // --------------------------------------------------------------------------------
+   
+   class FailbackChecker implements Runnable
+   {
+      boolean restarting = false;
+      public void run()
       {
-      	return "HornetQServerImpl::" + identity;
+         try
+         {
+            if(!restarting && nodeManager.isAwaitingFailback())
+            {
+               log.info("live server wants to restart, restarting server in backup");
+               restarting = true;
+               Thread t = new Thread(new Runnable()
+               {
+                  public void run()
+                  {
+                     try
+                     {
+                        log.debug(HornetQServerImpl.this + "::Stopping live node in favor of failback");
+                        stop(true);
+                        // We need to wait some time before we start the backup again
+                        // otherwise we may eventually start before the live had a chance to get it
+                        Thread.sleep(configuration.getFailbackDelay());
+                        configuration.setBackup(true);
+                        log.debug(HornetQServerImpl.this + "::Starting backup node now after failback");
+                        start();
+                     }
+                     catch (Exception e)
+                     {
+                        log.warn("unable to restart server, please kill and restart manually", e);
+                     }
+                  }
+               });
+               t.start();
+            }
+         }
+         catch (Exception e)
+         {
+            log.warn(e.getMessage(), e);
+         }
       }
-      else
+   }
+
+   
+
+   private class SharedStoreLiveActivation implements Activation
+   {
+      public void run()
       {
-      	return "HornetQServerImpl::" + (nodeManager != null ? "serverUUID=" + nodeManager.getUUID() : "");
+         try
+         {
+            log.info("Waiting to obtain live lock");
+
+            checkJournalDirectory();
+            
+            if (log.isDebugEnabled())
+            {
+               log.debug("First part initialization on " + this);
+            }
+
+            initialisePart1();
+
+            if(nodeManager.isBackupLive())
+            {
+               //looks like we've failed over at some point need to inform that we are the backup so when the current live
+               // goes down they failover to us
+               if (log.isDebugEnabled())
+               {
+                  log.debug("announcing backup to the former live" + this);
+               }
+
+               clusterManager.announceBackup();
+               Thread.sleep(configuration.getFailbackDelay());
+            }
+
+            nodeManager.startLiveNode();
+
+            if (stopped)
+            {
+               return;
+            }
+            
+            initialisePart2();
+            
+            log.info("Server is now live");
+         }
+         catch (Exception e)
+         {
+            log.error("Failure in initialisation", e);
+         }
       }
+
+      public void close(boolean permanently) throws Exception
+      {
+         if(permanently)
+         {
+            nodeManager.crashLiveServer();
+         }
+         else
+         {
+            nodeManager.pauseLiveServer();
+         }
+      }
    }
 
-   // Inner classes
-   // --------------------------------------------------------------------------------
+
+   private class SharedStoreBackupActivation implements Activation
+   {
+      public void run()
+      {
+         try
+         {
+            nodeManager.startBackup();
+
+            initialisePart1();
+            
+            clusterManager.start();
+
+            started = true;
+
+            log.info("HornetQ Backup Server version " + getVersion().getFullVersion() + " [" + nodeManager.getNodeId() + "] started, waiting live to fail before it gets active");
+
+            nodeManager.awaitLiveNode();
+            
+            configuration.setBackup(false);
+            
+            if (stopped)
+            {
+               return;
+            }
+            
+            initialisePart2();
+            
+            clusterManager.activate();
+
+            log.info("Backup Server is now live");
+
+            nodeManager.releaseBackup();
+            if(configuration.isAllowAutoFailBack())
+            {
+               startFailbackChecker();
+            }
+         }
+         catch (InterruptedException e)
+         {
+            //this is ok, we are being stopped
+         }
+         catch (ClosedChannelException e)
+         {
+            //this is ok too, we are being stopped
+         }
+         catch (Exception e)
+         {
+            if(!(e.getCause() instanceof InterruptedException))
+            {
+               log.error("Failure in initialisation", e);
+            }
+         }
+         catch(Throwable e)
+         {
+            log.error("Failure in initialisation", e);
+         }
+      }
+
+      /**
+       * 
+       */
+      public void close(boolean permanently) throws Exception
+      {
+         if (configuration.isBackup())
+         {
+            long timeout = 30000;
+
+            long start = System.currentTimeMillis();
+
+            while (backupActivationThread.isAlive() && System.currentTimeMillis() - start < timeout)
+            {
+               nodeManager.interrupt();
+
+               backupActivationThread.interrupt();
+               
+               backupActivationThread.join(1000);
+
+            }
+
+            if (System.currentTimeMillis() - start >= timeout)
+            {
+               threadDump("Timed out waiting for backup activation to exit");
+            }
+
+            nodeManager.stopBackup();
+         }
+         else
+         {
+            //if we are now live, behave as live
+            // We need to delete the file too, otherwise the backup will failover when we shutdown or if the backup is
+            // started before the live
+            if(permanently)
+            {
+               nodeManager.crashLiveServer();
+            }
+            else
+            {
+               nodeManager.pauseLiveServer();
+            }
+         }
+      }
+   }
+   
+   private interface Activation extends Runnable
+   {
+      void close(boolean permanently) throws Exception;
+   }
+
+   private class SharedNothingBackupActivation implements Activation
+   {
+      public void run()
+      {
+         try
+         {
+            // TODO
+
+            // Try-Connect to live server using live-connector-ref
+
+            // sit in loop and try and connect, if server is not live then it will return NOT_LIVE
+         }
+         catch (Exception e)
+         {
+            log.error("Failure in initialisation", e);
+         }
+      }
+
+      public void close(boolean permanently) throws Exception
+      {
+      }
+   }
+
+   private class NoSharedStoreLiveActivation implements Activation
+   {
+      public void run()
+      {
+         try
+         {
+            initialisePart1();
+
+            initialisePart2();
+
+            if (identity != null)
+            {
+               log.info("Server " + identity + " is now live");
+            }
+            else
+            {
+               log.info("Server is now live");
+            }
+         }
+         catch (Exception e)
+         {
+            log.error("Failure in initialisation", e);
+         }
+      }
+
+      public void close(boolean permanently) throws Exception
+      {
+
+      }
+   }
+
+
 }

Modified: branches/Branch_2_2_EAP/src/main/org/hornetq/core/server/impl/ServerSessionImpl.java
===================================================================
--- branches/Branch_2_2_EAP/src/main/org/hornetq/core/server/impl/ServerSessionImpl.java	2011-09-19 23:47:54 UTC (rev 11370)
+++ branches/Branch_2_2_EAP/src/main/org/hornetq/core/server/impl/ServerSessionImpl.java	2011-09-20 00:50:32 UTC (rev 11371)
@@ -1062,7 +1062,7 @@
 
          return;
       }
-
+      
       consumer.receiveCredits(credits);
    }
 

Modified: branches/Branch_2_2_EAP/src/main/org/hornetq/spi/core/protocol/ProtocolManager.java
===================================================================
--- branches/Branch_2_2_EAP/src/main/org/hornetq/spi/core/protocol/ProtocolManager.java	2011-09-19 23:47:54 UTC (rev 11370)
+++ branches/Branch_2_2_EAP/src/main/org/hornetq/spi/core/protocol/ProtocolManager.java	2011-09-20 00:50:32 UTC (rev 11371)
@@ -14,6 +14,7 @@
 package org.hornetq.spi.core.protocol;
 
 import org.hornetq.api.core.HornetQBuffer;
+import org.hornetq.spi.core.remoting.Acceptor;
 import org.hornetq.spi.core.remoting.BufferDecoder;
 import org.hornetq.spi.core.remoting.Connection;
 
@@ -26,7 +27,7 @@
  */
 public interface ProtocolManager extends BufferDecoder
 {
-   ConnectionEntry createConnectionEntry(Connection connection);
+   ConnectionEntry createConnectionEntry(Acceptor acceptorUsed, Connection connection);
    
    public void removeHandler(final String name);
 

Modified: branches/Branch_2_2_EAP/src/main/org/hornetq/spi/core/remoting/Acceptor.java
===================================================================
--- branches/Branch_2_2_EAP/src/main/org/hornetq/spi/core/remoting/Acceptor.java	2011-09-19 23:47:54 UTC (rev 11370)
+++ branches/Branch_2_2_EAP/src/main/org/hornetq/spi/core/remoting/Acceptor.java	2011-09-20 00:50:32 UTC (rev 11371)
@@ -14,6 +14,7 @@
 package org.hornetq.spi.core.remoting;
 
 import org.hornetq.core.server.HornetQComponent;
+import org.hornetq.core.server.cluster.ClusterConnection;
 import org.hornetq.core.server.management.NotificationService;
 
 /**
@@ -31,6 +32,11 @@
    void pause();
 
    /**
+    * @return the cluster connection associated with this Acceptor
+    */
+   ClusterConnection getClusterConnection();
+
+   /**
     * Set the notification service for this acceptor to use.
     *
     * @param notificationService the notification service

Modified: branches/Branch_2_2_EAP/src/main/org/hornetq/spi/core/remoting/AcceptorFactory.java
===================================================================
--- branches/Branch_2_2_EAP/src/main/org/hornetq/spi/core/remoting/AcceptorFactory.java	2011-09-19 23:47:54 UTC (rev 11370)
+++ branches/Branch_2_2_EAP/src/main/org/hornetq/spi/core/remoting/AcceptorFactory.java	2011-09-20 00:50:32 UTC (rev 11371)
@@ -18,6 +18,8 @@
 import java.util.concurrent.Executor;
 import java.util.concurrent.ScheduledExecutorService;
 
+import org.hornetq.core.server.cluster.ClusterConnection;
+
 /**
  * A factory for creating acceptors.
  * <p/>
@@ -40,7 +42,8 @@
     * @param scheduledThreadPool a scheduled thread pool
     * @return an acceptor
     */
-   Acceptor createAcceptor(final Map<String, Object> configuration,
+   Acceptor createAcceptor(ClusterConnection clusterConnection,
+                           final Map<String, Object> configuration,
                            BufferHandler handler,
                            BufferDecoder decoder,
                            ConnectionLifeCycleListener listener,

Modified: branches/Branch_2_2_EAP/src/main/org/hornetq/spi/core/remoting/ConnectionLifeCycleListener.java
===================================================================
--- branches/Branch_2_2_EAP/src/main/org/hornetq/spi/core/remoting/ConnectionLifeCycleListener.java	2011-09-19 23:47:54 UTC (rev 11370)
+++ branches/Branch_2_2_EAP/src/main/org/hornetq/spi/core/remoting/ConnectionLifeCycleListener.java	2011-09-20 00:50:32 UTC (rev 11371)
@@ -23,11 +23,13 @@
 public interface ConnectionLifeCycleListener
 {
    /**
-    * called when a connection is created.
+    * This method is used both by client connector creation and server connection creation through acceptors.
+    * the acceptor will be set to null on client operations
     *
+    * @param The acceptor here will be always null on a client connection created event.
     * @param connection the connection that has been created
     */
-   void connectionCreated(Connection connection, ProtocolType protocol);
+   void connectionCreated(Acceptor acceptor, Connection connection, ProtocolType protocol);
 
    /**
     * called when a connection is destroyed.

Modified: branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/integration/cluster/distribution/ClusterTestBase.java
===================================================================
--- branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/integration/cluster/distribution/ClusterTestBase.java	2011-09-19 23:47:54 UTC (rev 11370)
+++ branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/integration/cluster/distribution/ClusterTestBase.java	2011-09-20 00:50:32 UTC (rev 11371)
@@ -940,10 +940,10 @@
          for (ClusterConnection cc : clusterManager.getClusterConnections())
          {
             out += cc.describe() + "\n";
+            out += cc.getTopology().describe();
          }
       }
       out += "\n\nfull topology:";
-      out += clusterManager.getTopology().describe();
       return out + br;
    }
 

Modified: branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/integration/cluster/distribution/OnewayTwoNodeClusterTest.java
===================================================================
--- branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/integration/cluster/distribution/OnewayTwoNodeClusterTest.java	2011-09-19 23:47:54 UTC (rev 11370)
+++ branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/integration/cluster/distribution/OnewayTwoNodeClusterTest.java	2011-09-20 00:50:32 UTC (rev 11371)
@@ -131,10 +131,6 @@
       
       waitForTopology(servers[0], 2);
       waitForTopology(servers[1], 2);
-      
-      System.out.println(servers[0].getClusterManager().getTopology().describe());
-      
-      System.out.println(servers[1].getClusterManager().getTopology().describe());
 
       setupSessionFactory(0,  isNetty(), true);
       setupSessionFactory(1,  isNetty(), true);

Modified: branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/integration/cluster/distribution/SimpleSymmetricClusterTest.java
===================================================================
--- branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/integration/cluster/distribution/SimpleSymmetricClusterTest.java	2011-09-19 23:47:54 UTC (rev 11370)
+++ branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/integration/cluster/distribution/SimpleSymmetricClusterTest.java	2011-09-20 00:50:32 UTC (rev 11371)
@@ -153,17 +153,6 @@
       waitForTopology(servers[1], 3);
       waitForTopology(servers[2], 3);
       
-      for (int i = 0 ; i < 3; i++)
-      {
-         System.out.println("top[" + i + "]=" + servers[i].getClusterManager().getTopology().describe());
-      }
-
-      for (int i = 0; i <= 2; i++)
-      {
-         log.info("*************************************\n " + servers[i] +
-                  " topology:\n" +
-                  servers[i].getClusterManager().getTopology().describe());
-      }
       setupSessionFactory(0, isNetty());
       setupSessionFactory(1, isNetty());
       setupSessionFactory(2, isNetty());
@@ -196,12 +185,6 @@
 
       startServers(0, 1);
 
-      for (int i = 0; i <= 1; i++)
-      {
-         log.info("*************************************\n " + servers[i] +
-                  " topology:\n" +
-                  servers[i].getClusterManager().getTopology().describe());
-      }
       setupSessionFactory(0, isNetty());
       setupSessionFactory(1, isNetty());
 
@@ -266,13 +249,6 @@
 
       for (int i = 0; i <= 4; i++)
       {
-         log.info("*************************************\n " + servers[i] +
-                  " topology:\n" +
-                  servers[i].getClusterManager().getTopology().describe());
-      }
-      
-      for (int i = 0; i <= 4; i++)
-      {
          setupSessionFactory(i, isNetty());
       }
 

Modified: branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/integration/cluster/distribution/TwoWayTwoNodeClusterTest.java
===================================================================
--- branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/integration/cluster/distribution/TwoWayTwoNodeClusterTest.java	2011-09-19 23:47:54 UTC (rev 11370)
+++ branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/integration/cluster/distribution/TwoWayTwoNodeClusterTest.java	2011-09-20 00:50:32 UTC (rev 11371)
@@ -137,8 +137,6 @@
          startServers(0, 1);
          waitForTopology(servers[0], 2);
 
-         System.out.println(servers[0].getClusterManager().getTopology().describe());
-         System.out.println(servers[1].getClusterManager().getTopology().describe());
          waitForTopology(servers[1], 2);
 
          for (int i = 0; i < 10; i++)
@@ -148,7 +146,6 @@
             log.info("#stop #test #" + i);
             stopServers(1);
 
-            System.out.println(servers[0].getClusterManager().getTopology().describe());
             waitForTopology(servers[0], 1, 2000);
             log.info("#start #test #" + i);
             startServers(1);

Modified: branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/integration/cluster/failover/FailBackAutoTest.java
===================================================================
--- branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/integration/cluster/failover/FailBackAutoTest.java	2011-09-19 23:47:54 UTC (rev 11370)
+++ branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/integration/cluster/failover/FailBackAutoTest.java	2011-09-20 00:50:32 UTC (rev 11371)
@@ -28,6 +28,7 @@
 import org.hornetq.api.core.client.ClientProducer;
 import org.hornetq.api.core.client.ClientSession;
 import org.hornetq.api.core.client.ClientSessionFactory;
+import org.hornetq.api.core.client.ServerLocator;
 import org.hornetq.api.core.client.SessionFailureListener;
 import org.hornetq.core.client.impl.ClientSessionFactoryInternal;
 import org.hornetq.core.client.impl.ServerLocatorInternal;
@@ -76,18 +77,28 @@
       locator.setBlockOnDurableSend(true);
       locator.setFailoverOnInitialConnection(true);
       locator.setReconnectAttempts(-1);
+      ((ServerLocatorInternal)locator).setIdentity("testAutoFailback");
+       
       ClientSessionFactoryInternal sf = createSessionFactoryAndWaitForTopology(locator, 2);
       final CountDownLatch latch = new CountDownLatch(1);
 
       ClientSession session = sendAndConsume(sf, true);
+      
+      System.out.println(locator.getTopology().describe());
 
       MyListener listener = new MyListener(latch);
 
       session.addFailureListener(listener);
+      
+      System.out.println(locator.getTopology().describe());
 
       liveServer.crash();
-
+      
       assertTrue(latch.await(5, TimeUnit.SECONDS));
+      
+      log.info("backup (nowLive) topology = " + backupServer.getServer().getClusterManager().getDefaultConnection().getTopology().describe());
+      
+      log.info("Server Crash!!!");
 
       ClientProducer producer = session.createProducer(FailoverTestBase.ADDRESS);
 
@@ -97,6 +108,11 @@
 
       producer.send(message);
 
+      verifyMessageOnServer(1, 1);
+
+      System.out.println(locator.getTopology().describe());
+      
+
       session.removeFailureListener(listener);
 
       final CountDownLatch latch2 = new CountDownLatch(1);
@@ -107,6 +123,10 @@
 
       log.info("******* starting live server back");
       liveServer.start();
+      
+      Thread.sleep(1000);
+      
+      System.out.println("After failback: " + locator.getTopology().describe());
 
       assertTrue(latch2.await(5, TimeUnit.SECONDS));
 
@@ -118,6 +138,8 @@
 
       session.close();
 
+      verifyMessageOnServer(0, 1);
+
       sf.close();
 
       Assert.assertEquals(0, sf.numSessions());
@@ -125,6 +147,29 @@
       Assert.assertEquals(0, sf.numConnections());
    }
 
+   /**
+    * @throws Exception
+    * @throws HornetQException
+    */
+   private void verifyMessageOnServer(final int server, final int numberOfMessages) throws Exception, HornetQException
+   {
+      ServerLocator backupLocator = createInVMLocator(server);
+      ClientSessionFactory factorybkp = backupLocator.createSessionFactory();
+      ClientSession sessionbkp = factorybkp.createSession(false, false);
+      sessionbkp.start();
+      ClientConsumer consumerbkp = sessionbkp.createConsumer(ADDRESS);
+      for (int i = 0 ; i < numberOfMessages; i++)
+      {
+         ClientMessage msg = consumerbkp.receive(1000);
+         assertNotNull(msg);
+         msg.acknowledge();
+         sessionbkp.commit();
+      }
+      sessionbkp.close();
+      factorybkp.close();
+      backupLocator.close();
+   }
+
    public void testAutoFailbackThenFailover() throws Exception
    {
       locator.setBlockOnNonDurableSend(true);
@@ -253,7 +298,7 @@
 
       if (createQueue)
       {
-         session.createQueue(FailoverTestBase.ADDRESS, FailoverTestBase.ADDRESS, null, false);
+         session.createQueue(FailoverTestBase.ADDRESS, FailoverTestBase.ADDRESS, null, true);
       }
 
       ClientProducer producer = session.createProducer(FailoverTestBase.ADDRESS);
@@ -288,6 +333,8 @@
       }
 
       ClientMessage message3 = consumer.receiveImmediate();
+      
+      consumer.close();
 
       Assert.assertNull(message3);
 
@@ -315,6 +362,7 @@
 
       public void connectionFailed(final HornetQException me, boolean failedOver)
       {
+         System.out.println("Failed, me");
          latch.countDown();
       }
 

Modified: branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/integration/cluster/failover/GroupingFailoverTestBase.java
===================================================================
--- branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/integration/cluster/failover/GroupingFailoverTestBase.java	2011-09-19 23:47:54 UTC (rev 11370)
+++ branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/integration/cluster/failover/GroupingFailoverTestBase.java	2011-09-20 00:50:32 UTC (rev 11371)
@@ -12,16 +12,9 @@
  */
 package org.hornetq.tests.integration.cluster.failover;
 
-import java.util.concurrent.CountDownLatch;
-import java.util.concurrent.TimeUnit;
-
-import junit.framework.Assert;
-
-import org.hornetq.api.core.HornetQException;
 import org.hornetq.api.core.Message;
 import org.hornetq.api.core.SimpleString;
 import org.hornetq.core.server.group.impl.GroupingHandlerConfiguration;
-import org.hornetq.spi.core.protocol.RemotingConnection;
 import org.hornetq.tests.integration.cluster.distribution.ClusterTestBase;
 
 /**
@@ -167,6 +160,7 @@
          closeSessionFactory(0);
 
          Thread.sleep(1000);
+         
          servers[0].stop(true);
 
          waitForServerRestart(2);
@@ -213,16 +207,4 @@
 
    abstract boolean isSharedServer();
 
-   private void fail(final RemotingConnection conn, final CountDownLatch latch) throws InterruptedException
-   {
-      // Simulate failure on connection
-      conn.fail(new HornetQException(HornetQException.NOT_CONNECTED));
-
-      // Wait to be informed of failure
-
-      boolean ok = latch.await(1000, TimeUnit.MILLISECONDS);
-
-      Assert.assertTrue(ok);
-   }
-
 }

Modified: branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/integration/cluster/failover/MultipleBackupsFailoverTestBase.java
===================================================================
--- branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/integration/cluster/failover/MultipleBackupsFailoverTestBase.java	2011-09-19 23:47:54 UTC (rev 11370)
+++ branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/integration/cluster/failover/MultipleBackupsFailoverTestBase.java	2011-09-20 00:50:32 UTC (rev 11371)
@@ -193,7 +193,7 @@
       {
          if (server != null)
          {
-            log.info("failed topology, Topology on server = " + server.getClusterManager().getTopology().describe());
+            log.info("failed topology, Topology on server = " + server.getClusterManager().describe());
          }
       }
       assertTrue("expected " + topologyMembers + " members", ok);

Modified: branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/integration/cluster/failover/MultipleLivesMultipleBackupsFailoverTest.java
===================================================================
--- branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/integration/cluster/failover/MultipleLivesMultipleBackupsFailoverTest.java	2011-09-19 23:47:54 UTC (rev 11370)
+++ branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/integration/cluster/failover/MultipleLivesMultipleBackupsFailoverTest.java	2011-09-20 00:50:32 UTC (rev 11371)
@@ -98,8 +98,6 @@
       Thread.sleep(500);
       servers.get(0).crash(session);
 
-      System.out.println("server3 " + servers.get(3).getServer().getClusterManager().getTopology().describe());
-
       int liveAfter0 = waitForNewLive(10000, true, servers, 1, 2);
 
       ServerLocator locator2 = getServerLocator(3);

Modified: branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/integration/http/NettyHttpTest.java
===================================================================
--- branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/integration/http/NettyHttpTest.java	2011-09-19 23:47:54 UTC (rev 11370)
+++ branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/integration/http/NettyHttpTest.java	2011-09-20 00:50:32 UTC (rev 11371)
@@ -29,6 +29,7 @@
 import org.hornetq.core.remoting.impl.netty.NettyConnector;
 import org.hornetq.core.remoting.impl.netty.TransportConstants;
 import org.hornetq.spi.core.protocol.ProtocolType;
+import org.hornetq.spi.core.remoting.Acceptor;
 import org.hornetq.spi.core.remoting.BufferHandler;
 import org.hornetq.spi.core.remoting.Connection;
 import org.hornetq.spi.core.remoting.ConnectionLifeCycleListener;
@@ -544,7 +545,7 @@
          latch = connCreatedLatch;
       }
 
-      public void connectionCreated(final Connection connection, final ProtocolType protocol)
+      public void connectionCreated(final Acceptor acceptor, final Connection connection, final ProtocolType protocol)
       {
          this.connection = connection;
          if (latch != null)

Modified: branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/integration/largemessage/mock/MockConnector.java
===================================================================
--- branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/integration/largemessage/mock/MockConnector.java	2011-09-19 23:47:54 UTC (rev 11370)
+++ branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/integration/largemessage/mock/MockConnector.java	2011-09-20 00:50:32 UTC (rev 11371)
@@ -89,7 +89,7 @@
        */
       public MockConnection(final int serverID, final BufferHandler handler, final ConnectionLifeCycleListener listener)
       {
-         super(serverID, handler, listener, Executors.newSingleThreadExecutor());
+         super(null, serverID, handler, listener, Executors.newSingleThreadExecutor());
       }
 
       @Override

Modified: branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/unit/core/remoting/impl/netty/NettyAcceptorFactoryTest.java
===================================================================
--- branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/unit/core/remoting/impl/netty/NettyAcceptorFactoryTest.java	2011-09-19 23:47:54 UTC (rev 11370)
+++ branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/unit/core/remoting/impl/netty/NettyAcceptorFactoryTest.java	2011-09-20 00:50:32 UTC (rev 11371)
@@ -63,7 +63,7 @@
          {
          }
 
-         public void connectionCreated(final Connection connection, final ProtocolType protocol)
+         public void connectionCreated(final Acceptor acceptor, final Connection connection, final ProtocolType protocol)
          {
          }
 
@@ -74,7 +74,8 @@
          
       };
 
-      Acceptor acceptor = factory.createAcceptor(params,
+      Acceptor acceptor = factory.createAcceptor(null,
+                                                 params,
                                                  handler,
                                                  null,
                                                  listener,

Modified: branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/unit/core/remoting/impl/netty/NettyAcceptorTest.java
===================================================================
--- branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/unit/core/remoting/impl/netty/NettyAcceptorTest.java	2011-09-19 23:47:54 UTC (rev 11370)
+++ branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/unit/core/remoting/impl/netty/NettyAcceptorTest.java	2011-09-20 00:50:32 UTC (rev 11371)
@@ -28,6 +28,7 @@
 import org.hornetq.core.remoting.impl.netty.NettyAcceptor;
 import org.hornetq.core.remoting.impl.netty.TransportConstants;
 import org.hornetq.spi.core.protocol.ProtocolType;
+import org.hornetq.spi.core.remoting.Acceptor;
 import org.hornetq.spi.core.remoting.BufferHandler;
 import org.hornetq.spi.core.remoting.Connection;
 import org.hornetq.spi.core.remoting.ConnectionLifeCycleListener;
@@ -80,7 +81,7 @@
          {
          }
 
-         public void connectionCreated(final Connection connection, final ProtocolType protocol)
+         public void connectionCreated(final Acceptor acceptor, final Connection connection, final ProtocolType protocol)
          {
          }
          

Modified: branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/unit/core/remoting/impl/netty/NettyConnectionTest.java
===================================================================
--- branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/unit/core/remoting/impl/netty/NettyConnectionTest.java	2011-09-19 23:47:54 UTC (rev 11370)
+++ branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/unit/core/remoting/impl/netty/NettyConnectionTest.java	2011-09-20 00:50:32 UTC (rev 11371)
@@ -24,6 +24,7 @@
 import org.hornetq.api.core.HornetQException;
 import org.hornetq.core.remoting.impl.netty.NettyConnection;
 import org.hornetq.spi.core.protocol.ProtocolType;
+import org.hornetq.spi.core.remoting.Acceptor;
 import org.hornetq.spi.core.remoting.Connection;
 import org.hornetq.spi.core.remoting.ConnectionLifeCycleListener;
 import org.hornetq.tests.util.RandomUtil;
@@ -220,7 +221,7 @@
    class MyListener implements ConnectionLifeCycleListener
    {
 
-      public void connectionCreated(final Connection connection, final ProtocolType protocol)
+      public void connectionCreated(final Acceptor acceptor, final Connection connection, final ProtocolType protocol)
       {
 
       }

Modified: branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/unit/core/remoting/impl/netty/NettyConnectorTest.java
===================================================================
--- branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/unit/core/remoting/impl/netty/NettyConnectorTest.java	2011-09-19 23:47:54 UTC (rev 11370)
+++ branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/unit/core/remoting/impl/netty/NettyConnectorTest.java	2011-09-20 00:50:32 UTC (rev 11371)
@@ -23,6 +23,7 @@
 import org.hornetq.api.core.HornetQException;
 import org.hornetq.core.remoting.impl.netty.NettyConnector;
 import org.hornetq.spi.core.protocol.ProtocolType;
+import org.hornetq.spi.core.remoting.Acceptor;
 import org.hornetq.spi.core.remoting.BufferHandler;
 import org.hornetq.spi.core.remoting.Connection;
 import org.hornetq.spi.core.remoting.ConnectionLifeCycleListener;
@@ -66,7 +67,7 @@
          {
          }
 
-         public void connectionCreated(final Connection connection, final ProtocolType protocol)
+         public void connectionCreated(final Acceptor acceptor, final Connection connection, final ProtocolType protocol)
          {
          }
          public void connectionReadyForWrites(Object connectionID, boolean ready)
@@ -106,7 +107,7 @@
          {
          }
 
-         public void connectionCreated(final Connection connection, final ProtocolType protocol)
+         public void connectionCreated(final Acceptor acceptor, final Connection connection, final ProtocolType protocol)
          {
          }
          

Modified: branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/util/ServiceTestBase.java
===================================================================
--- branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/util/ServiceTestBase.java	2011-09-19 23:47:54 UTC (rev 11370)
+++ branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/util/ServiceTestBase.java	2011-09-20 00:50:32 UTC (rev 11371)
@@ -19,6 +19,7 @@
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
+import java.util.Set;
 
 import javax.management.MBeanServer;
 
@@ -37,11 +38,13 @@
 import org.hornetq.core.remoting.impl.invm.InVMAcceptorFactory;
 import org.hornetq.core.remoting.impl.invm.InVMConnectorFactory;
 import org.hornetq.core.remoting.impl.invm.InVMRegistry;
+import org.hornetq.core.remoting.impl.invm.TransportConstants;
 import org.hornetq.core.remoting.impl.netty.NettyAcceptorFactory;
 import org.hornetq.core.remoting.impl.netty.NettyConnectorFactory;
 import org.hornetq.core.server.HornetQServer;
 import org.hornetq.core.server.HornetQServers;
 import org.hornetq.core.server.NodeManager;
+import org.hornetq.core.server.cluster.ClusterConnection;
 import org.hornetq.core.server.impl.HornetQServerImpl;
 import org.hornetq.core.settings.impl.AddressFullMessagePolicy;
 import org.hornetq.core.settings.impl.AddressSettings;
@@ -112,8 +115,15 @@
       log.debug("waiting for " + nodes + " on the topology for server = " + server);
 
       long start = System.currentTimeMillis();
+      
+      Set<ClusterConnection> ccs = server.getClusterManager().getClusterConnections();
+      
+      if (ccs.size() != 1)
+      {
+         throw new IllegalStateException("You need a single cluster connection on this version of waitForTopology on ServiceTestBase");
+      }
 
-      Topology topology = server.getClusterManager().getTopology();
+      Topology topology = ccs.iterator().next().getTopology();
 
       do
       {
@@ -521,7 +531,19 @@
       locators.add(locatorWithoutHA);
       return locatorWithoutHA;
    }
+   
+   protected ServerLocator createInVMLocator(final int serverID)
+   {
+      Map<String, Object> server1Params = new HashMap<String, Object>();
 
+      if (serverID != 0)
+      {
+         server1Params.put(TransportConstants.SERVER_ID_PROP_NAME, serverID);
+      }
+
+      return HornetQClient.createServerLocatorWithHA(new TransportConfiguration(INVM_CONNECTOR_FACTORY, server1Params));
+   }
+ 
    protected ClientSessionFactoryImpl createFactory(final String connectorClass) throws Exception
    {
       ServerLocator locator = HornetQClient.createServerLocatorWithoutHA(new TransportConfiguration(connectorClass));



More information about the hornetq-commits mailing list