[hornetq-commits] JBoss hornetq SVN: r11234 - in branches/Branch_2_2_EAP_cluster_clean3: src/main/org/hornetq/api/core/client and 12 other directories.

do-not-reply at jboss.org do-not-reply at jboss.org
Mon Aug 29 12:20:37 EDT 2011


Author: clebert.suconic at jboss.com
Date: 2011-08-29 12:20:36 -0400 (Mon, 29 Aug 2011)
New Revision: 11234

Added:
   branches/Branch_2_2_EAP_cluster_clean3/src/main/org/hornetq/core/protocol/core/impl/wireformat/ClusterTopologyChangeMessage_V2.java
   branches/Branch_2_2_EAP_cluster_clean3/src/main/org/hornetq/core/protocol/core/impl/wireformat/SubscribeClusterTopologyUpdatesMessageV2.java
Modified:
   branches/Branch_2_2_EAP_cluster_clean3/src/config/common/hornetq-version.properties
   branches/Branch_2_2_EAP_cluster_clean3/src/main/org/hornetq/api/core/client/ClusterTopologyListener.java
   branches/Branch_2_2_EAP_cluster_clean3/src/main/org/hornetq/core/client/impl/ClientSessionFactoryImpl.java
   branches/Branch_2_2_EAP_cluster_clean3/src/main/org/hornetq/core/client/impl/ClientSessionFactoryInternal.java
   branches/Branch_2_2_EAP_cluster_clean3/src/main/org/hornetq/core/client/impl/ServerLocatorImpl.java
   branches/Branch_2_2_EAP_cluster_clean3/src/main/org/hornetq/core/client/impl/ServerLocatorInternal.java
   branches/Branch_2_2_EAP_cluster_clean3/src/main/org/hornetq/core/client/impl/Topology.java
   branches/Branch_2_2_EAP_cluster_clean3/src/main/org/hornetq/core/client/impl/TopologyMember.java
   branches/Branch_2_2_EAP_cluster_clean3/src/main/org/hornetq/core/protocol/core/Channel.java
   branches/Branch_2_2_EAP_cluster_clean3/src/main/org/hornetq/core/protocol/core/CoreRemotingConnection.java
   branches/Branch_2_2_EAP_cluster_clean3/src/main/org/hornetq/core/protocol/core/impl/ChannelImpl.java
   branches/Branch_2_2_EAP_cluster_clean3/src/main/org/hornetq/core/protocol/core/impl/CoreProtocolManager.java
   branches/Branch_2_2_EAP_cluster_clean3/src/main/org/hornetq/core/protocol/core/impl/HornetQPacketHandler.java
   branches/Branch_2_2_EAP_cluster_clean3/src/main/org/hornetq/core/protocol/core/impl/PacketDecoder.java
   branches/Branch_2_2_EAP_cluster_clean3/src/main/org/hornetq/core/protocol/core/impl/PacketImpl.java
   branches/Branch_2_2_EAP_cluster_clean3/src/main/org/hornetq/core/protocol/core/impl/RemotingConnectionImpl.java
   branches/Branch_2_2_EAP_cluster_clean3/src/main/org/hornetq/core/protocol/core/impl/wireformat/NodeAnnounceMessage.java
   branches/Branch_2_2_EAP_cluster_clean3/src/main/org/hornetq/core/protocol/core/impl/wireformat/SubscribeClusterTopologyUpdatesMessage.java
   branches/Branch_2_2_EAP_cluster_clean3/src/main/org/hornetq/core/server/cluster/ClusterManager.java
   branches/Branch_2_2_EAP_cluster_clean3/src/main/org/hornetq/core/server/cluster/impl/BridgeImpl.java
   branches/Branch_2_2_EAP_cluster_clean3/src/main/org/hornetq/core/server/cluster/impl/ClusterConnectionBridge.java
   branches/Branch_2_2_EAP_cluster_clean3/src/main/org/hornetq/core/server/cluster/impl/ClusterConnectionImpl.java
   branches/Branch_2_2_EAP_cluster_clean3/src/main/org/hornetq/core/server/cluster/impl/ClusterManagerImpl.java
   branches/Branch_2_2_EAP_cluster_clean3/src/main/org/hornetq/core/server/cluster/impl/ClusterManagerInternal.java
   branches/Branch_2_2_EAP_cluster_clean3/src/main/org/hornetq/core/server/impl/HornetQServerImpl.java
   branches/Branch_2_2_EAP_cluster_clean3/src/main/org/hornetq/core/server/impl/ServerSessionImpl.java
   branches/Branch_2_2_EAP_cluster_clean3/tests/src/org/hornetq/tests/integration/cluster/bridge/BridgeReconnectTest.java
   branches/Branch_2_2_EAP_cluster_clean3/tests/src/org/hornetq/tests/integration/cluster/distribution/ClusterTestBase.java
   branches/Branch_2_2_EAP_cluster_clean3/tests/src/org/hornetq/tests/integration/cluster/distribution/ClusteredRequestResponseTest.java
   branches/Branch_2_2_EAP_cluster_clean3/tests/src/org/hornetq/tests/integration/cluster/distribution/SimpleSymmetricClusterTest.java
   branches/Branch_2_2_EAP_cluster_clean3/tests/src/org/hornetq/tests/integration/cluster/distribution/TwoWayTwoNodeClusterTest.java
   branches/Branch_2_2_EAP_cluster_clean3/tests/src/org/hornetq/tests/integration/cluster/failover/FailoverTestBase.java
   branches/Branch_2_2_EAP_cluster_clean3/tests/src/org/hornetq/tests/integration/cluster/failover/MultipleBackupsFailoverTestBase.java
   branches/Branch_2_2_EAP_cluster_clean3/tests/src/org/hornetq/tests/integration/cluster/failover/MultipleLivesMultipleBackupsFailoverTest.java
   branches/Branch_2_2_EAP_cluster_clean3/tests/src/org/hornetq/tests/integration/cluster/topology/TopologyClusterTestBase.java
   branches/Branch_2_2_EAP_cluster_clean3/tests/src/org/hornetq/tests/integration/jms/server/management/JMSUtil.java
Log:
Changes I'm planning

Modified: branches/Branch_2_2_EAP_cluster_clean3/src/config/common/hornetq-version.properties
===================================================================
--- branches/Branch_2_2_EAP_cluster_clean3/src/config/common/hornetq-version.properties	2011-08-29 16:16:58 UTC (rev 11233)
+++ branches/Branch_2_2_EAP_cluster_clean3/src/config/common/hornetq-version.properties	2011-08-29 16:20:36 UTC (rev 11234)
@@ -2,8 +2,8 @@
 hornetq.version.majorVersion=2
 hornetq.version.minorVersion=2
 hornetq.version.microVersion=8
-hornetq.version.incrementingVersion=121
+hornetq.version.incrementingVersion=122
 hornetq.version.versionSuffix=CR1
 hornetq.version.versionTag=CR1
 hornetq.netty.version=@NETTY.VERSION@
-hornetq.version.compatibleVersionList=121
+hornetq.version.compatibleVersionList=121,122

Modified: branches/Branch_2_2_EAP_cluster_clean3/src/main/org/hornetq/api/core/client/ClusterTopologyListener.java
===================================================================
--- branches/Branch_2_2_EAP_cluster_clean3/src/main/org/hornetq/api/core/client/ClusterTopologyListener.java	2011-08-29 16:16:58 UTC (rev 11233)
+++ branches/Branch_2_2_EAP_cluster_clean3/src/main/org/hornetq/api/core/client/ClusterTopologyListener.java	2011-08-29 16:20:36 UTC (rev 11234)
@@ -26,7 +26,7 @@
  */
 public interface ClusterTopologyListener
 {
-   void nodeUP(String nodeID, Pair<TransportConfiguration, TransportConfiguration> connectorPair, boolean last);
+   void nodeUP(long eventUID, String nodeID, Pair<TransportConfiguration, TransportConfiguration> connectorPair, boolean last);
    
-   void nodeDown(String nodeID);
+   void nodeDown(long eventUID, String nodeID);
 }

Modified: branches/Branch_2_2_EAP_cluster_clean3/src/main/org/hornetq/core/client/impl/ClientSessionFactoryImpl.java
===================================================================
--- branches/Branch_2_2_EAP_cluster_clean3/src/main/org/hornetq/core/client/impl/ClientSessionFactoryImpl.java	2011-08-29 16:16:58 UTC (rev 11233)
+++ branches/Branch_2_2_EAP_cluster_clean3/src/main/org/hornetq/core/client/impl/ClientSessionFactoryImpl.java	2011-08-29 16:20:36 UTC (rev 11234)
@@ -43,12 +43,13 @@
 import org.hornetq.core.protocol.core.impl.PacketImpl;
 import org.hornetq.core.protocol.core.impl.RemotingConnectionImpl;
 import org.hornetq.core.protocol.core.impl.wireformat.ClusterTopologyChangeMessage;
+import org.hornetq.core.protocol.core.impl.wireformat.ClusterTopologyChangeMessage_V2;
 import org.hornetq.core.protocol.core.impl.wireformat.CreateSessionMessage;
 import org.hornetq.core.protocol.core.impl.wireformat.CreateSessionResponseMessage;
 import org.hornetq.core.protocol.core.impl.wireformat.DisconnectMessage;
 import org.hornetq.core.protocol.core.impl.wireformat.NodeAnnounceMessage;
 import org.hornetq.core.protocol.core.impl.wireformat.Ping;
-import org.hornetq.core.protocol.core.impl.wireformat.SubscribeClusterTopologyUpdatesMessage;
+import org.hornetq.core.protocol.core.impl.wireformat.SubscribeClusterTopologyUpdatesMessageV2;
 import org.hornetq.core.remoting.FailureListener;
 import org.hornetq.core.version.Version;
 import org.hornetq.spi.core.protocol.ProtocolType;
@@ -1291,9 +1292,13 @@
                ClientSessionFactoryImpl.log.trace(this + "::Subscribing Topology");
             }
 
-            channel0.send(new SubscribeClusterTopologyUpdatesMessage(serverLocator.isClusterConnection()));
+            channel0.send(new SubscribeClusterTopologyUpdatesMessageV2(serverLocator.isClusterConnection(), VersionLoader.getVersion().getIncrementingVersion()));
+            
+            
             if (serverLocator.isClusterConnection())
             {
+               
+               new Exception("Announcing node::").printStackTrace();
                TransportConfiguration config = serverLocator.getClusterTransportConfiguration();
                if (ClientSessionFactoryImpl.isDebug)
                {
@@ -1301,7 +1306,8 @@
                                                      ", isBackup=" +
                                                      serverLocator.isBackup());
                }
-               channel0.send(new NodeAnnounceMessage(serverLocator.getNodeID(), serverLocator.isBackup(), config));
+               sendNodeAnnounce(System.currentTimeMillis(), serverLocator.getNodeID(), serverLocator.isBackup(), config, null);
+               //channel0.send(new NodeAnnounceMessage(serverLocator.getNodeID(), serverLocator.isBackup(), config));
             }
          }
       }
@@ -1314,6 +1320,23 @@
       return connection;
    }
 
+   /**
+    * Sends a NodeAnnounceMessage for the given node through channel 0 of the current connection.
+    */
+   public void sendNodeAnnounce(final long currentEventID, String nodeID, boolean isBackup, TransportConfiguration config, TransportConfiguration backupConfig)
+   {
+      Channel channel0 = connection.getChannel(0, -1);
+      if (ClientSessionFactoryImpl.isDebug)
+      {
+         ClientSessionFactoryImpl.log.debug("Announcing node " + serverLocator.getNodeID() +
+                                            ", isBackup=" + isBackup);
+      }
+      ClientSessionFactoryImpl.log.info("YYY Announcing node " + serverLocator.getNodeID() + ", config=" + config +
+                                        ", backup=" + backupConfig +
+                                         ", isBackup=" + isBackup);
+      channel0.send(new NodeAnnounceMessage(currentEventID, nodeID, isBackup, config, backupConfig));
+   }
+
    @Override
    public void finalize() throws Throwable
    {
@@ -1439,7 +1462,7 @@
 
             if (nodeID != null)
             {
-               serverLocator.notifyNodeDown(msg.getNodeID().toString());
+               serverLocator.notifyNodeDown(System.currentTimeMillis(), msg.getNodeID().toString());
             }
 
             closeExecutor.execute(new Runnable()
@@ -1456,6 +1479,8 @@
          }
          else if (type == PacketImpl.CLUSTER_TOPOLOGY)
          {
+            log.warn("Server is sending packets from an older version. " +
+                     "You must update all the servers to the same version on a cluster!");
             ClusterTopologyChangeMessage topMessage = (ClusterTopologyChangeMessage)packet;
 
             if (topMessage.isExit())
@@ -1464,7 +1489,7 @@
                {
                   ClientSessionFactoryImpl.log.debug("Notifying " + topMessage.getNodeID() + " going down");
                }
-               serverLocator.notifyNodeDown(topMessage.getNodeID());
+               serverLocator.notifyNodeDown(System.currentTimeMillis(), topMessage.getNodeID());
             }
             else
             {
@@ -1478,9 +1503,36 @@
                                                      " csf created at\nserverLocator=" +
                                                      serverLocator, e);
                }
-               serverLocator.notifyNodeUp(topMessage.getNodeID(), topMessage.getPair(), topMessage.isLast());
+               serverLocator.notifyNodeUp(System.currentTimeMillis(), topMessage.getNodeID(), topMessage.getPair(), topMessage.isLast());
             }
          }
+         else if (type == PacketImpl.CLUSTER_TOPOLOGY_V2)
+         {
+            ClusterTopologyChangeMessage_V2 topMessage = (ClusterTopologyChangeMessage_V2)packet;
+
+            if (topMessage.isExit())
+            {
+               if (ClientSessionFactoryImpl.isDebug)
+               {
+                  ClientSessionFactoryImpl.log.debug("Notifying " + topMessage.getNodeID() + " going down");
+               }
+               serverLocator.notifyNodeDown(topMessage.getUniqueEventID(), topMessage.getNodeID());
+            }
+            else
+            {
+               if (ClientSessionFactoryImpl.isDebug)
+               {
+                  ClientSessionFactoryImpl.log.debug("Node " + topMessage.getNodeID() +
+                                                     " going up, connector = " +
+                                                     topMessage.getPair() +
+                                                     ", isLast=" +
+                                                     topMessage.isLast() +
+                                                     " csf created at\nserverLocator=" +
+                                                     serverLocator, e);
+               }
+               serverLocator.notifyNodeUp(topMessage.getUniqueEventID(), topMessage.getNodeID(), topMessage.getPair(), topMessage.isLast());
+            }
+         }
       }
    }
 

Modified: branches/Branch_2_2_EAP_cluster_clean3/src/main/org/hornetq/core/client/impl/ClientSessionFactoryInternal.java
===================================================================
--- branches/Branch_2_2_EAP_cluster_clean3/src/main/org/hornetq/core/client/impl/ClientSessionFactoryInternal.java	2011-08-29 16:16:58 UTC (rev 11233)
+++ branches/Branch_2_2_EAP_cluster_clean3/src/main/org/hornetq/core/client/impl/ClientSessionFactoryInternal.java	2011-08-29 16:20:36 UTC (rev 11234)
@@ -42,6 +42,8 @@
    void removeSession(final ClientSessionInternal session, boolean failingOver);
 
    void connect(int reconnectAttempts, boolean failoverOnInitialConnection) throws HornetQException;
+   
+   void sendNodeAnnounce(final long currentEventID, String nodeID, boolean isBackup, TransportConfiguration config, TransportConfiguration backupConfig);
 
    TransportConfiguration getConnectorConfiguration();
 

Modified: branches/Branch_2_2_EAP_cluster_clean3/src/main/org/hornetq/core/client/impl/ServerLocatorImpl.java
===================================================================
--- branches/Branch_2_2_EAP_cluster_clean3/src/main/org/hornetq/core/client/impl/ServerLocatorImpl.java	2011-08-29 16:16:58 UTC (rev 11233)
+++ branches/Branch_2_2_EAP_cluster_clean3/src/main/org/hornetq/core/client/impl/ServerLocatorImpl.java	2011-08-29 16:20:36 UTC (rev 11234)
@@ -483,7 +483,9 @@
     * @param discoveryAddress
     * @param discoveryPort
     */
-   public ServerLocatorImpl(final Topology topology, final boolean useHA, final DiscoveryGroupConfiguration groupConfiguration)
+   public ServerLocatorImpl(final Topology topology,
+                            final boolean useHA,
+                            final DiscoveryGroupConfiguration groupConfiguration)
    {
       this(topology, useHA, groupConfiguration, null);
 
@@ -526,25 +528,12 @@
       initialise();
 
       this.startExecutor = executor;
-
-      executor.execute(new Runnable()
-      {
-         public void run()
-         {
-            try
-            {
-               connect();
-            }
-            catch (Exception e)
-            {
-               if (!closing)
-               {
-                  log.warn("did not connect the cluster connection to other nodes", e);
-               }
-            }
-         }
-      });
    }
+   
+   public Executor getExecutor()
+   {
+      return startExecutor;
+   }
 
    /* (non-Javadoc)
     * @see org.hornetq.api.core.client.ServerLocator#disableFinalizeCheck()
@@ -554,7 +543,7 @@
       finalizeCheck = false;
    }
 
-   public ClientSessionFactory connect() throws Exception
+   public ClientSessionFactoryInternal connect() throws Exception
    {
       ClientSessionFactoryInternal sf;
       // static list of initial connectors
@@ -1244,7 +1233,10 @@
       closed = true;
    }
 
-   public void notifyNodeDown(final String nodeID)
+   /** This is directly called when the connection to the node is gone,
+    *  or when the node sends a disconnection.
+    *  Look for callers of this method! */
+   public void notifyNodeDown(final long eventTime, final String nodeID)
    {
 
       if (topology == null)
@@ -1258,27 +1250,31 @@
          log.debug("nodeDown " + this + " nodeID=" + nodeID + " as being down", new Exception("trace"));
       }
 
-      topology.removeMember(nodeID);
-
-      if (!topology.isEmpty())
+      if (topology.removeMember(eventTime, nodeID))
       {
-         updateArraysAndPairs();
-
-         if (topology.nodes() == 1 && topology.getMember(this.nodeID) != null)
+         if (topology.isEmpty())
          {
+            // Resetting the topology to its original condition as it was brand new
+            topologyArray = null;
+
             receivedTopology = false;
          }
-      }
-      else
-      {
-         topologyArray = null;
+         else
+         {
+            updateArraysAndPairs();
 
-         receivedTopology = false;
+            if (topology.nodes() == 1 && topology.getMember(this.nodeID) != null)
+            {
+               // Resetting the topology to its original condition as it was brand new
+               receivedTopology = false;
+            }
+         }
       }
 
    }
 
-   public void notifyNodeUp(final String nodeID,
+   public void notifyNodeUp(long uniqueEventID,
+                            final String nodeID,
                             final Pair<TransportConfiguration, TransportConfiguration> connectorPair,
                             final boolean last)
    {
@@ -1293,33 +1289,33 @@
          log.debug("NodeUp " + this + "::nodeID=" + nodeID + ", connectorPair=" + connectorPair, new Exception("trace"));
       }
 
-      topology.addMember(nodeID, new TopologyMember(connectorPair), last);
+      TopologyMember member = new TopologyMember(connectorPair.a, connectorPair.b);
 
-      TopologyMember actMember = topology.getMember(nodeID);
-
-      if (actMember != null && actMember.getConnector().a != null && actMember.getConnector().b != null)
+      if (topology.updateMember(uniqueEventID, nodeID, member))
       {
-         for (ClientSessionFactory factory : factories)
+
+         TopologyMember actMember = topology.getMember(nodeID);
+
+         if (actMember != null && actMember.getConnector().a != null && actMember.getConnector().b != null)
          {
-            ((ClientSessionFactoryInternal)factory).setBackupConnector(actMember.getConnector().a,
-                                                                       actMember.getConnector().b);
+            for (ClientSessionFactory factory : factories)
+            {
+               ((ClientSessionFactoryInternal)factory).setBackupConnector(actMember.getConnector().a,
+                                                                          actMember.getConnector().b);
+            }
          }
-      }
 
-      if (connectorPair.a != null)
-      {
          updateArraysAndPairs();
       }
 
-      synchronized (this)
+      if (last)
       {
-         if (last)
+         synchronized (this)
          {
             receivedTopology = true;
+            // Notify if waiting on getting topology
+            notifyAll();
          }
-
-         // Notify if waiting on getting topology
-         notifyAll();
       }
    }
 
@@ -1569,9 +1565,9 @@
                                                                                 threadPool,
                                                                                 scheduledThreadPool,
                                                                                 interceptors);
-            
+
             factory.disableFinalizeCheck();
-            
+
             connectors.add(new Connector(initialConnector, factory));
          }
       }

Modified: branches/Branch_2_2_EAP_cluster_clean3/src/main/org/hornetq/core/client/impl/ServerLocatorInternal.java
===================================================================
--- branches/Branch_2_2_EAP_cluster_clean3/src/main/org/hornetq/core/client/impl/ServerLocatorInternal.java	2011-08-29 16:16:58 UTC (rev 11233)
+++ branches/Branch_2_2_EAP_cluster_clean3/src/main/org/hornetq/core/client/impl/ServerLocatorInternal.java	2011-08-29 16:20:36 UTC (rev 11234)
@@ -31,6 +31,8 @@
 {
    void start(Executor executor) throws Exception;
    
+   Executor getExecutor();
+   
    void factoryClosed(final ClientSessionFactory factory);
    
    /** Used to better identify Cluster Connection Locators on logs while debugging logs */
@@ -42,11 +44,16 @@
    
    void cleanup();
 
-   ClientSessionFactory connect() throws  Exception;
+   ClientSessionFactoryInternal connect() throws  Exception;
 
-   void notifyNodeUp(String nodeID, Pair<TransportConfiguration, TransportConfiguration> connectorPair, boolean last);
+   void notifyNodeUp(long uniqueEventID, String nodeID, Pair<TransportConfiguration, TransportConfiguration> connectorPair, boolean last);
 
-   void notifyNodeDown(String nodeID);
+   /**
+    * 
+    * @param uniqueEventID 0 means get the previous ID +1
+    * @param nodeID
+    */
+   void notifyNodeDown(long uniqueEventID, String nodeID);
 
    void setClusterConnection(boolean clusterConnection);
 

Modified: branches/Branch_2_2_EAP_cluster_clean3/src/main/org/hornetq/core/client/impl/Topology.java
===================================================================
--- branches/Branch_2_2_EAP_cluster_clean3/src/main/org/hornetq/core/client/impl/Topology.java	2011-08-29 16:16:58 UTC (rev 11233)
+++ branches/Branch_2_2_EAP_cluster_clean3/src/main/org/hornetq/core/client/impl/Topology.java	2011-08-29 16:20:36 UTC (rev 11234)
@@ -64,6 +64,8 @@
     * values are a pair of live/backup transport configurations
     */
    private final Map<String, TopologyMember> mapTopology = new ConcurrentHashMap<String, TopologyMember>();
+   
+   private final Map<String, Long> mapDelete = new ConcurrentHashMap<String, Long>();
 
    public Topology(final Object owner)
    {
@@ -104,22 +106,83 @@
       }
    }
 
-   public boolean addMember(final String nodeId, final TopologyMember memberInput, final boolean last)
+   /** This is called by the server when the node is activated from backup state. It will always succeed */
+   public void updateAsLive(final String nodeId, final TopologyMember memberInput)
    {
       synchronized (this)
       {
+         if (log.isDebugEnabled())
+         {
+            log.info(this + "::Live node " + nodeId + "=" + memberInput);
+         }
+         memberInput.setUniqueEventID(System.currentTimeMillis());
+         mapTopology.remove(nodeId);
+         mapTopology.put(nodeId, memberInput);
+         sendMemberUp(memberInput.getUniqueEventID(), nodeId, memberInput);
+      }
+   }
+
+   /** Updates the backup connector of an existing member: the current live connector is kept and only the backup connector is replaced */
+   public TopologyMember updateBackup(final String nodeId, final TopologyMember memberInput)
+   {
+      if (log.isTraceEnabled())
+      {
+         log.trace(this + "::updateBackup::" + nodeId + ", memberInput=" + memberInput);
+      }
+
+      synchronized (this)
+      {
+         // TODO treat versioning here. it should remove any previous version
+         // However, if the previous version has a higher time (say if the node time where the system died), we should
+         // use that number ++
+
+         TopologyMember currentMember = getMember(nodeId);
+         if (currentMember == null)
+         {
+            log.warn("There's no live to be updated on backup update", new Exception("trace"));
+         }
+
+         TopologyMember newMember = new TopologyMember(currentMember.getConnector().a, memberInput.getConnector().b);
+         newMember.setUniqueEventID(System.currentTimeMillis());
+         mapTopology.remove(nodeId);
+         mapTopology.put(nodeId, newMember);
+         sendMemberUp(newMember.getUniqueEventID(), nodeId, newMember);
+
+         return newMember;
+      }
+   }
+
+   /**
+    * 
+    * @param uniqueEventID a unique identifier for when the change was made.
+    *           We will use current time millis for starts, and a ++ of that number for shutdown.
+    * @param nodeId
+    * @param memberInput
+    * @return
+    */
+   public boolean updateMember(final long uniqueEventID, final String nodeId, final TopologyMember memberInput)
+   {
+      Long deleteTme = mapDelete.get(nodeId);
+      if (deleteTme != null && uniqueEventID < deleteTme)
+      {
+         return false;
+      }
+      
+      if (log.isTraceEnabled())
+      {
+      //   log.trace(this + "::UpdateMember::" + uniqueEventID + ", nodeID=" + nodeId + ", memberInput=" + memberInput);
+      }
+
+      synchronized (this)
+      {
          TopologyMember currentMember = mapTopology.get(nodeId);
 
          if (currentMember == null)
          {
-            if (!testBackof(nodeId))
-            {
-               return false;
-            }
-
             if (Topology.log.isDebugEnabled())
             {
-               Topology.log.debug(this + "::NewMemeberAdd " + this +
+               Topology.log.debug(this + "::NewMemeberAdd " +
+                                  this +
                                   " MEMBER WAS NULL, Add member nodeId=" +
                                   nodeId +
                                   " member = " +
@@ -127,70 +190,42 @@
                                   " size = " +
                                   mapTopology.size(), new Exception("trace"));
             }
+            memberInput.setUniqueEventID(uniqueEventID);
             mapTopology.put(nodeId, memberInput);
-            sendMemberUp(nodeId, memberInput);
+            sendMemberUp(uniqueEventID, nodeId, memberInput);
             return true;
          }
          else
          {
-            if (log.isTraceEnabled())
+            if (uniqueEventID > currentMember.getUniqueEventID())
             {
-               log.trace(this + ":: validating update for currentMember=" + currentMember + " of memberInput=" + memberInput);
-            }
+               log.info(this + "::updated currentMember=nodeID=" + nodeId  +
+                         currentMember +
+                         " of memberInput=" +
+                         memberInput, new Exception ("trace"));
 
-            boolean replaced = false;
-            TopologyMember memberToSend = currentMember;
+               TopologyMember newMember = new TopologyMember(memberInput.getConnector().a, memberInput.getConnector().b);
+               newMember.setUniqueEventID(uniqueEventID);
+               mapTopology.remove(nodeId);
+               mapTopology.put(nodeId, newMember);
+               sendMemberUp(uniqueEventID, nodeId, newMember);
 
-            if (hasChanged("a", memberToSend.getConnector().a, memberInput.getConnector().a))
-            {
-               if (!replaced && !testBackof(nodeId))
-               {
-                  return false;
-               }
-               memberToSend = new TopologyMember(memberInput.getConnector().a, memberToSend.getConnector().b);
-               replaced = true;
+               return true;
             }
-
-            if (hasChanged("b", memberToSend.getConnector().b, memberInput.getConnector().b))
+            else
             {
-               if (!replaced && !testBackof(nodeId))
-               {
-                  return false;
-               }
-               memberToSend = new TopologyMember(memberToSend.getConnector().a, memberInput.getConnector().b);
-               replaced = true;
+               return false;
             }
-
-            if (replaced)
-            {
-               mapTopology.remove(nodeId);
-               mapTopology.put(nodeId, memberToSend);
-
-               sendMemberUp(nodeId, memberToSend);
-               return true;
-            }
-
          }
 
       }
-
-      if (Topology.log.isDebugEnabled())
-      {
-         Topology.log.debug(Topology.this + " Add member nodeId=" +
-                            nodeId +
-                            " member = " +
-                            memberInput +
-                            " has been ignored since there was no change", new Exception("trace"));
-      }
-
-      return false;
    }
 
    /**
     * @param nodeId
     * @param memberToSend
     */
-   private void sendMemberUp(final String nodeId, final TopologyMember memberToSend)
+   private void sendMemberUp(final long uniqueEventID, final String nodeId, final TopologyMember memberToSend)
    {
       final ArrayList<ClusterTopologyListener> copy = copyListeners();
 
@@ -207,12 +242,17 @@
             {
                if (Topology.log.isTraceEnabled())
                {
-                  Topology.log.trace(Topology.this + " informing " + listener + " about node up = " + nodeId);
+                  Topology.log.trace(Topology.this + " informing " +
+                                     listener +
+                                     " about node up = " +
+                                     nodeId +
+                                     " connector = " +
+                                     memberToSend.getConnector());
                }
 
                try
                {
-                  listener.nodeUP(nodeId, memberToSend.getConnector(), false);
+                  listener.nodeUP(uniqueEventID, nodeId, memberToSend.getConnector(), false);
                }
                catch (Throwable e)
                {
@@ -276,28 +316,26 @@
       return listenersCopy;
    }
 
-   public boolean removeMember(final String nodeId)
+   public boolean removeMember(final long uniqueEventID, final String nodeId)
    {
       TopologyMember member;
 
       synchronized (this)
       {
-         Pair<Long, Integer> value = mapBackof.get(nodeId);
-
-         if (value == null)
+         member = mapTopology.get(nodeId);
+         if (member != null)
          {
-            value = new Pair<Long, Integer>(0l, 0);
-            mapBackof.put(nodeId, value);
+            if (member.getUniqueEventID() > uniqueEventID)
+            {
+               log.info("The removeMember was issued before the node " + nodeId + " was started, ignoring call");
+               member = null;
+            }
+            else
+            {
+               mapDelete.put(nodeId, uniqueEventID);
+               member = mapTopology.remove(nodeId);
+            }
          }
-
-         value.a = System.currentTimeMillis();
-
-         if (System.currentTimeMillis() - value.a > BACKOF_TIMEOUT)
-         {
-            value.b = 0;
-         }
-
-         member = mapTopology.remove(nodeId);
       }
 
       if (Topology.log.isDebugEnabled())
@@ -327,7 +365,7 @@
                   }
                   try
                   {
-                     listener.nodeDown(nodeId);
+                     listener.nodeDown(uniqueEventID, nodeId);
                   }
                   catch (Exception e)
                   {
@@ -354,14 +392,13 @@
    }
 
    /**
-    * it will send all the member updates to listeners, independently of being changed or not
+    * it will send the member to its listeners
     * @param nodeID
     * @param member
     */
-   public void sendMemberToListeners(final String nodeID, final TopologyMember member)
+   public void sendMember(final String nodeID)
    {
-      // To make sure it was updated
-      addMember(nodeID, member, false);
+      final TopologyMember member = getMember(nodeID);
 
       final ArrayList<ClusterTopologyListener> copy = copyListeners();
 
@@ -380,7 +417,7 @@
                             " with connector=" +
                             member.getConnector());
                }
-               listener.nodeUP(nodeID, member.getConnector(), false);
+               listener.nodeUP(member.getUniqueEventID(), nodeID, member.getConnector(), false);
             }
          }
       });
@@ -417,18 +454,21 @@
                             " to " +
                             listener);
                }
-               listener.nodeUP(entry.getKey(), entry.getValue().getConnector(), ++count == copy.size());
+               listener.nodeUP(entry.getValue().getUniqueEventID(),
+                               entry.getKey(),
+                               entry.getValue().getConnector(),
+                               ++count == copy.size());
             }
          }
       });
    }
 
-   public TopologyMember getMember(final String nodeID)
+   public synchronized TopologyMember getMember(final String nodeID)
    {
       return mapTopology.get(nodeID);
    }
 
-   public boolean isEmpty()
+   public synchronized boolean isEmpty()
    {
       return mapTopology.isEmpty();
    }
@@ -506,8 +546,10 @@
       if (log.isTraceEnabled())
       {
 
-         log.trace(this + "::Validating current=" + a 
-                   + " != input=" + b +
+         log.trace(this + "::Validating current=" +
+                   a +
+                   " != input=" +
+                   b +
                    (changed ? " and it has changed" : " and it didn't change") +
                    ", for validation of " +
                    debugInfo);

Modified: branches/Branch_2_2_EAP_cluster_clean3/src/main/org/hornetq/core/client/impl/TopologyMember.java
===================================================================
--- branches/Branch_2_2_EAP_cluster_clean3/src/main/org/hornetq/core/client/impl/TopologyMember.java	2011-08-29 16:16:58 UTC (rev 11233)
+++ branches/Branch_2_2_EAP_cluster_clean3/src/main/org/hornetq/core/client/impl/TopologyMember.java	2011-08-29 16:20:36 UTC (rev 11234)
@@ -27,9 +27,13 @@
 
    private final Pair<TransportConfiguration, TransportConfiguration> connector;
 
+   /** transient to avoid serialization changes */
+   private transient long uniqueEventID = System.currentTimeMillis();
+
    public TopologyMember(Pair<TransportConfiguration, TransportConfiguration> connector)
    {
       this.connector = connector;
+      this.uniqueEventID = System.currentTimeMillis();
    }
 
    public TopologyMember(TransportConfiguration a, TransportConfiguration b)
@@ -37,11 +41,28 @@
       this(new Pair<TransportConfiguration, TransportConfiguration>(a, b));
    }
 
+
+   /**
+    * @return the uniqueEventID
+    */
+   public long getUniqueEventID()
+   {
+      return uniqueEventID;
+   }
+   
+   /**
+    * @param uniqueEventID the uniqueEventID to set
+    */
+   public void setUniqueEventID(long uniqueEventID)
+   {
+      this.uniqueEventID = uniqueEventID;
+   }
+
    public Pair<TransportConfiguration, TransportConfiguration> getConnector()
    {
       return connector;
    }
-   
+
    @Override
    public String toString()
    {

Modified: branches/Branch_2_2_EAP_cluster_clean3/src/main/org/hornetq/core/protocol/core/Channel.java
===================================================================
--- branches/Branch_2_2_EAP_cluster_clean3/src/main/org/hornetq/core/protocol/core/Channel.java	2011-08-29 16:16:58 UTC (rev 11233)
+++ branches/Branch_2_2_EAP_cluster_clean3/src/main/org/hornetq/core/protocol/core/Channel.java	2011-08-29 16:20:36 UTC (rev 11234)
@@ -36,6 +36,9 @@
     * @return the id
     */
    long getID();
+   
+   /** Protocol check: whether the client version known for this channel's connection supports the given packet type */
+   boolean supports(byte packetID);
 
    /**
     * sends a packet on this channel.

Modified: branches/Branch_2_2_EAP_cluster_clean3/src/main/org/hornetq/core/protocol/core/CoreRemotingConnection.java
===================================================================
--- branches/Branch_2_2_EAP_cluster_clean3/src/main/org/hornetq/core/protocol/core/CoreRemotingConnection.java	2011-08-29 16:16:58 UTC (rev 11233)
+++ branches/Branch_2_2_EAP_cluster_clean3/src/main/org/hornetq/core/protocol/core/CoreRemotingConnection.java	2011-08-29 16:20:36 UTC (rev 11234)
@@ -25,6 +25,15 @@
  */
 public interface CoreRemotingConnection extends RemotingConnection
 {  
+
+   /** The client protocol version used on the communication.
+    *  This will determine if the client has support for certain packet types */
+   int getClientVersion();
+   
+   /** Sets the client protocol version used on the communication.
+    *  This will determine if the client has support for certain packet types */
+   void setClientVersion(int clientVersion);
+   
    /**
     * return the channel with the channel id specified.
     * <p/>

Modified: branches/Branch_2_2_EAP_cluster_clean3/src/main/org/hornetq/core/protocol/core/impl/ChannelImpl.java
===================================================================
--- branches/Branch_2_2_EAP_cluster_clean3/src/main/org/hornetq/core/protocol/core/impl/ChannelImpl.java	2011-08-29 16:16:58 UTC (rev 11233)
+++ branches/Branch_2_2_EAP_cluster_clean3/src/main/org/hornetq/core/protocol/core/impl/ChannelImpl.java	2011-08-29 16:20:36 UTC (rev 11234)
@@ -94,6 +94,19 @@
          resendCache = null;
       }
    }
+   
+   public boolean supports(final byte packetType)
+   {
+      int version = connection.getClientVersion();
+      
+      switch (packetType)
+      {
+         case PacketImpl.CLUSTER_TOPOLOGY_V2:
+            return version >= 122;
+         default:
+            return true;
+      }
+   }
 
    public long getID()
    {

Modified: branches/Branch_2_2_EAP_cluster_clean3/src/main/org/hornetq/core/protocol/core/impl/CoreProtocolManager.java
===================================================================
--- branches/Branch_2_2_EAP_cluster_clean3/src/main/org/hornetq/core/protocol/core/impl/CoreProtocolManager.java	2011-08-29 16:16:58 UTC (rev 11233)
+++ branches/Branch_2_2_EAP_cluster_clean3/src/main/org/hornetq/core/protocol/core/impl/CoreProtocolManager.java	2011-08-29 16:20:36 UTC (rev 11234)
@@ -32,9 +32,11 @@
 import org.hornetq.core.protocol.core.Packet;
 import org.hornetq.core.protocol.core.ServerSessionPacketHandler;
 import org.hornetq.core.protocol.core.impl.wireformat.ClusterTopologyChangeMessage;
+import org.hornetq.core.protocol.core.impl.wireformat.ClusterTopologyChangeMessage_V2;
 import org.hornetq.core.protocol.core.impl.wireformat.NodeAnnounceMessage;
 import org.hornetq.core.protocol.core.impl.wireformat.Ping;
 import org.hornetq.core.protocol.core.impl.wireformat.SubscribeClusterTopologyUpdatesMessage;
+import org.hornetq.core.protocol.core.impl.wireformat.SubscribeClusterTopologyUpdatesMessageV2;
 import org.hornetq.core.remoting.CloseListener;
 import org.hornetq.core.server.HornetQServer;
 import org.hornetq.spi.core.protocol.ConnectionEntry;
@@ -112,13 +114,19 @@
                // Just send a ping back
                channel0.send(packet);
             }
-            else if (packet.getType() == PacketImpl.SUBSCRIBE_TOPOLOGY)
+            else if (packet.getType() == PacketImpl.SUBSCRIBE_TOPOLOGY || packet.getType() == PacketImpl.SUBSCRIBE_TOPOLOGY_V2)
             {
                SubscribeClusterTopologyUpdatesMessage msg = (SubscribeClusterTopologyUpdatesMessage)packet;
                
+               if (packet.getType() == PacketImpl.SUBSCRIBE_TOPOLOGY_V2)
+               {
+                  channel0.getConnection().setClientVersion(((SubscribeClusterTopologyUpdatesMessageV2)msg).getClientVersion());
+               }
+               
                final ClusterTopologyListener listener = new ClusterTopologyListener()
                {
-                  public void nodeUP(final String nodeID,
+                  public void nodeUP(final long uniqueEventID,
+                                     final String nodeID,
                                      final Pair<TransportConfiguration, TransportConfiguration> connectorPair,
                                      final boolean last)
                   {
@@ -129,12 +137,19 @@
                      {
                         public void run()
                         {
-                           channel0.send(new ClusterTopologyChangeMessage(nodeID, connectorPair, last));
+                           if (channel0.supports(PacketImpl.CLUSTER_TOPOLOGY_V2))
+                           {
+                              channel0.send(new ClusterTopologyChangeMessage_V2(uniqueEventID, nodeID, connectorPair, last));
+                           }
+                           else
+                           {
+                              channel0.send(new ClusterTopologyChangeMessage(nodeID, connectorPair, last));
+                           }
                         }
                      });
                    }
 
-                  public void nodeDown(final String nodeID)
+                  public void nodeDown(final long uniqueEventID, final String nodeID)
                   {
                      // Using an executor as most of the notifications on the Topology
                      // may come from a channel itself
@@ -143,7 +158,14 @@
                      {
                         public void run()
                         {
-                           channel0.send(new ClusterTopologyChangeMessage(nodeID));
+                           if (channel0.supports(PacketImpl.CLUSTER_TOPOLOGY_V2))
+                           {
+                              channel0.send(new ClusterTopologyChangeMessage_V2(uniqueEventID, nodeID));
+                           }
+                           else
+                           {
+                              channel0.send(new ClusterTopologyChangeMessage(nodeID));
+                           }
                         }
                      });
                   }
@@ -177,13 +199,14 @@
                }
                else
                {
-                  pair = new Pair<TransportConfiguration, TransportConfiguration>(msg.getConnector(), null);
+                  pair = new Pair<TransportConfiguration, TransportConfiguration>(msg.getConnector(), msg.getBackupConnector());
                }
                if (isTrace)
                {
                   log.trace("Server " + server + " receiving nodeUp from NodeID=" + msg.getNodeID() + ", pair=" + pair);
                }
-               server.getClusterManager().notifyNodeUp(msg.getNodeID(), pair, false, true);
+               log.info(server + "::YYY Topology Server " + server + " receiving nodeUp from NodeID=" + msg.getNodeID() + ", pair=" + pair);
+               server.getClusterManager().nodeAnnounced(msg.getCurrentEventID(), msg.getNodeID(), pair, msg.isBackup());
             }
          }
       });

Modified: branches/Branch_2_2_EAP_cluster_clean3/src/main/org/hornetq/core/protocol/core/impl/HornetQPacketHandler.java
===================================================================
--- branches/Branch_2_2_EAP_cluster_clean3/src/main/org/hornetq/core/protocol/core/impl/HornetQPacketHandler.java	2011-08-29 16:16:58 UTC (rev 11233)
+++ branches/Branch_2_2_EAP_cluster_clean3/src/main/org/hornetq/core/protocol/core/impl/HornetQPacketHandler.java	2011-08-29 16:20:36 UTC (rev 11234)
@@ -163,6 +163,17 @@
                                        "Server will not accept create session requests");
          }
 
+         if (connection.getClientVersion() == 0)
+         {
+            connection.setClientVersion(request.getVersion());
+         }
+         else if (connection.getClientVersion() != request.getVersion())
+         {
+            log.warn("Client is not being consistent on the request versioning. " +
+            		   "It just sent a version id=" + request.getVersion() + 
+            		   " while it informed " + connection.getClientVersion() + " previously");
+         }
+         
          Channel channel = connection.getChannel(request.getSessionChannelID(), request.getWindowSize());
 
          ServerSession session = server.createSession(request.getName(),

Modified: branches/Branch_2_2_EAP_cluster_clean3/src/main/org/hornetq/core/protocol/core/impl/PacketDecoder.java
===================================================================
--- branches/Branch_2_2_EAP_cluster_clean3/src/main/org/hornetq/core/protocol/core/impl/PacketDecoder.java	2011-08-29 16:16:58 UTC (rev 11233)
+++ branches/Branch_2_2_EAP_cluster_clean3/src/main/org/hornetq/core/protocol/core/impl/PacketDecoder.java	2011-08-29 16:20:36 UTC (rev 11234)
@@ -14,6 +14,7 @@
 package org.hornetq.core.protocol.core.impl;
 
 import static org.hornetq.core.protocol.core.impl.PacketImpl.CLUSTER_TOPOLOGY;
+import static org.hornetq.core.protocol.core.impl.PacketImpl.CLUSTER_TOPOLOGY_V2;
 import static org.hornetq.core.protocol.core.impl.PacketImpl.CREATESESSION;
 import static org.hornetq.core.protocol.core.impl.PacketImpl.CREATESESSION_RESP;
 import static org.hornetq.core.protocol.core.impl.PacketImpl.CREATE_QUEUE;
@@ -83,11 +84,13 @@
 import static org.hornetq.core.protocol.core.impl.PacketImpl.SESS_XA_START;
 import static org.hornetq.core.protocol.core.impl.PacketImpl.SESS_XA_SUSPEND;
 import static org.hornetq.core.protocol.core.impl.PacketImpl.SUBSCRIBE_TOPOLOGY;
+import static org.hornetq.core.protocol.core.impl.PacketImpl.SUBSCRIBE_TOPOLOGY_V2;
 
 import org.hornetq.api.core.HornetQBuffer;
 import org.hornetq.core.logging.Logger;
 import org.hornetq.core.protocol.core.Packet;
 import org.hornetq.core.protocol.core.impl.wireformat.ClusterTopologyChangeMessage;
+import org.hornetq.core.protocol.core.impl.wireformat.ClusterTopologyChangeMessage_V2;
 import org.hornetq.core.protocol.core.impl.wireformat.CreateQueueMessage;
 import org.hornetq.core.protocol.core.impl.wireformat.CreateReplicationSessionMessage;
 import org.hornetq.core.protocol.core.impl.wireformat.CreateSessionMessage;
@@ -152,6 +155,7 @@
 import org.hornetq.core.protocol.core.impl.wireformat.SessionXASetTimeoutResponseMessage;
 import org.hornetq.core.protocol.core.impl.wireformat.SessionXAStartMessage;
 import org.hornetq.core.protocol.core.impl.wireformat.SubscribeClusterTopologyUpdatesMessage;
+import org.hornetq.core.protocol.core.impl.wireformat.SubscribeClusterTopologyUpdatesMessageV2;
 
 /**
  * A PacketDecoder
@@ -504,6 +508,11 @@
             packet = new ClusterTopologyChangeMessage();
             break;
          }
+         case CLUSTER_TOPOLOGY_V2:
+         {
+            packet = new ClusterTopologyChangeMessage_V2();
+            break;
+         }
          case NODE_ANNOUNCE:
          {
             packet = new NodeAnnounceMessage();
@@ -514,6 +523,11 @@
             packet = new SubscribeClusterTopologyUpdatesMessage();
             break;
          }
+         case SUBSCRIBE_TOPOLOGY_V2:
+         {
+            packet = new SubscribeClusterTopologyUpdatesMessageV2();
+            break;
+         }
          case SESS_ADD_METADATA:
          {
             packet = new SessionAddMetaDataMessage();

Modified: branches/Branch_2_2_EAP_cluster_clean3/src/main/org/hornetq/core/protocol/core/impl/PacketImpl.java
===================================================================
--- branches/Branch_2_2_EAP_cluster_clean3/src/main/org/hornetq/core/protocol/core/impl/PacketImpl.java	2011-08-29 16:16:58 UTC (rev 11233)
+++ branches/Branch_2_2_EAP_cluster_clean3/src/main/org/hornetq/core/protocol/core/impl/PacketImpl.java	2011-08-29 16:20:36 UTC (rev 11234)
@@ -193,7 +193,13 @@
    public static final byte NODE_ANNOUNCE = 111;
 
    public static final byte SUBSCRIBE_TOPOLOGY = 112;
+   
+   // For newer versions
 
+   public static final byte SUBSCRIBE_TOPOLOGY_V2 = 113;
+
+   public static final byte CLUSTER_TOPOLOGY_V2 = 114;
+
    // Static --------------------------------------------------------
 
    public PacketImpl(final byte type)

Modified: branches/Branch_2_2_EAP_cluster_clean3/src/main/org/hornetq/core/protocol/core/impl/RemotingConnectionImpl.java
===================================================================
--- branches/Branch_2_2_EAP_cluster_clean3/src/main/org/hornetq/core/protocol/core/impl/RemotingConnectionImpl.java	2011-08-29 16:16:58 UTC (rev 11233)
+++ branches/Branch_2_2_EAP_cluster_clean3/src/main/org/hornetq/core/protocol/core/impl/RemotingConnectionImpl.java	2011-08-29 16:20:36 UTC (rev 11234)
@@ -72,6 +72,8 @@
    private volatile boolean destroyed;
 
    private final boolean client;
+   
+   private int clientVersion;
 
    // Channels 0-9 are reserved for the system
    // 0 is for pinging
@@ -183,7 +185,23 @@
 
       failureListeners.addAll(listeners);
    }
+   
+   /**
+    * @return the clientVersion
+    */
+   public int getClientVersion()
+   {
+      return clientVersion;
+   }
 
+   /**
+    * @param clientVersion the clientVersion to set
+    */
+   public void setClientVersion(int clientVersion)
+   {
+      this.clientVersion = clientVersion;
+   }
+
    public Object getID()
    {
       return transportConnection.getID();

Added: branches/Branch_2_2_EAP_cluster_clean3/src/main/org/hornetq/core/protocol/core/impl/wireformat/ClusterTopologyChangeMessage_V2.java
===================================================================
--- branches/Branch_2_2_EAP_cluster_clean3/src/main/org/hornetq/core/protocol/core/impl/wireformat/ClusterTopologyChangeMessage_V2.java	                        (rev 0)
+++ branches/Branch_2_2_EAP_cluster_clean3/src/main/org/hornetq/core/protocol/core/impl/wireformat/ClusterTopologyChangeMessage_V2.java	2011-08-29 16:20:36 UTC (rev 11234)
@@ -0,0 +1,182 @@
+/*
+ * Copyright 2009 Red Hat, Inc.
+ * Red Hat licenses this file to you under the Apache License, version
+ * 2.0 (the "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+ * implied.  See the License for the specific language governing
+ * permissions and limitations under the License.
+ */
+
+package org.hornetq.core.protocol.core.impl.wireformat;
+
+import org.hornetq.api.core.HornetQBuffer;
+import org.hornetq.api.core.Pair;
+import org.hornetq.api.core.TransportConfiguration;
+import org.hornetq.core.protocol.core.impl.PacketImpl;
+
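+/**
+ * Cluster topology change packet (CLUSTER_TOPOLOGY_V2) that also carries the uniqueEventID of the event.
+ * @author Clebert Suconic
+ */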
+public class ClusterTopologyChangeMessage_V2 extends PacketImpl
+{
+   // Constants -----------------------------------------------------
+
+   // Attributes ----------------------------------------------------
+
+   private boolean exit;
+   
+   private String nodeID;
+   
+   private Pair<TransportConfiguration, TransportConfiguration> pair;
+   
+   private long uniqueEventID;
+
+   private boolean last;
+
+   // Static --------------------------------------------------------
+
+   // Constructors --------------------------------------------------
+
+   public ClusterTopologyChangeMessage_V2(final long uniqueEventID, final String nodeID, final Pair<TransportConfiguration, TransportConfiguration> pair, final boolean last)
+   {
+      super(PacketImpl.CLUSTER_TOPOLOGY_V2);
+
+      this.nodeID = nodeID;
+      
+      this.pair = pair;
+      
+      this.last = last;
+      
+      this.exit = false;
+      
+      this.uniqueEventID = uniqueEventID;
+   }
+   
+   public ClusterTopologyChangeMessage_V2(final long uniqueEventID, final String nodeID)
+   {
+      super(PacketImpl.CLUSTER_TOPOLOGY_V2);
+      
+      this.exit = true;
+      
+      this.nodeID = nodeID;
+      
+      this.uniqueEventID = uniqueEventID;
+   }
+
+   public ClusterTopologyChangeMessage_V2()
+   {
+      super(PacketImpl.CLUSTER_TOPOLOGY_V2);
+   }
+
+   // Public --------------------------------------------------------
+
+   public String getNodeID()
+   {
+      return nodeID;
+   }
+
+   public Pair<TransportConfiguration, TransportConfiguration> getPair()
+   {
+      return pair;
+   }
+   
+   public boolean isLast()
+   {
+      return last;
+   }
+   
+   /**
+    * @return the uniqueEventID
+    */
+   public long getUniqueEventID()
+   {
+      return uniqueEventID;
+   }
+ 
+   public boolean isExit()
+   {
+      return exit;
+   }
+   
+   
+
+   @Override
+   public void encodeRest(final HornetQBuffer buffer)
+   {
+      buffer.writeBoolean(exit);
+      buffer.writeString(nodeID);
+      buffer.writeLong(uniqueEventID);
+      if (!exit)
+      {
+         if (pair.a != null)
+         {
+            buffer.writeBoolean(true);
+            pair.a.encode(buffer);
+         }
+         else
+         {
+            buffer.writeBoolean(false);
+         }
+         if (pair.b != null)
+         {
+            buffer.writeBoolean(true);
+            pair.b.encode(buffer);
+         }
+         else
+         {
+            buffer.writeBoolean(false);
+         }
+         buffer.writeBoolean(last);
+      }
+   }
+
+   @Override
+   public void decodeRest(final HornetQBuffer buffer)
+   {
+      exit = buffer.readBoolean();
+      nodeID = buffer.readString();
+      uniqueEventID = buffer.readLong();
+      if (!exit)
+      {
+         boolean hasLive = buffer.readBoolean();
+         TransportConfiguration a;
+         if(hasLive)
+         {
+            a = new TransportConfiguration();
+            a.decode(buffer);
+         }
+         else
+         {
+            a = null;
+         }
+         boolean hasBackup = buffer.readBoolean();
+         TransportConfiguration b;
+         if (hasBackup)
+         {
+            b = new TransportConfiguration();
+            b.decode(buffer);
+         }
+         else
+         {
+            b = null;
+         }
+         pair = new Pair<TransportConfiguration, TransportConfiguration>(a, b);
+         last = buffer.readBoolean();
+      }
+   }
+
+
+   // Package protected ---------------------------------------------
+
+   // Protected -----------------------------------------------------
+
+   // Private -------------------------------------------------------
+
+   // Inner classes -------------------------------------------------
+
+}

Modified: branches/Branch_2_2_EAP_cluster_clean3/src/main/org/hornetq/core/protocol/core/impl/wireformat/NodeAnnounceMessage.java
===================================================================
--- branches/Branch_2_2_EAP_cluster_clean3/src/main/org/hornetq/core/protocol/core/impl/wireformat/NodeAnnounceMessage.java	2011-08-29 16:16:58 UTC (rev 11233)
+++ branches/Branch_2_2_EAP_cluster_clean3/src/main/org/hornetq/core/protocol/core/impl/wireformat/NodeAnnounceMessage.java	2011-08-29 16:20:36 UTC (rev 11234)
@@ -34,21 +34,29 @@
    
    private boolean backup;
    
+   private long currentEventID;
+   
    private TransportConfiguration connector;
 
+   private TransportConfiguration backupConnector;
+
    // Static --------------------------------------------------------
 
    // Constructors --------------------------------------------------
 
-   public NodeAnnounceMessage(final String nodeID, final boolean backup, final TransportConfiguration tc)
+   public NodeAnnounceMessage(final long currentEventID, final String nodeID, final boolean backup, final TransportConfiguration tc, final TransportConfiguration backupConnector)
    {
       super(PacketImpl.NODE_ANNOUNCE);
 
+      this.currentEventID = currentEventID;
+      
       this.nodeID = nodeID;
       
       this.backup = backup;
       
       this.connector = tc;
+      
+      this.backupConnector = backupConnector;
    }
    
    public NodeAnnounceMessage()
@@ -74,13 +82,43 @@
       return connector;
    }
    
+   public TransportConfiguration getBackupConnector()
+   {
+      return backupConnector;
+   }
 
+   /**
+    * @return the currentEventID
+    */
+   public long getCurrentEventID()
+   {
+      return currentEventID;
+   }
+
    @Override
    public void encodeRest(final HornetQBuffer buffer)
    {
       buffer.writeString(nodeID);
       buffer.writeBoolean(backup);
-      connector.encode(buffer);
+      buffer.writeLong(currentEventID);
+      if (connector != null)
+      {
+         buffer.writeBoolean(true);
+         connector.encode(buffer);
+      }
+      else
+      {
+         buffer.writeBoolean(false);
+      }
+      if (backupConnector != null)
+      {
+         buffer.writeBoolean(true);
+         backupConnector.encode(buffer);
+      }
+      else
+      {
+         buffer.writeBoolean(false);
+      }
    }
 
    @Override
@@ -88,8 +126,17 @@
    {
       this.nodeID = buffer.readString();
       this.backup = buffer.readBoolean();
-      connector = new TransportConfiguration();
-      connector.decode(buffer);
+      this.currentEventID = buffer.readLong();
+      if (buffer.readBoolean())
+      {
+         connector = new TransportConfiguration();
+         connector.decode(buffer);
+      }
+      if (buffer.readBoolean())
+      {
+         backupConnector = new TransportConfiguration();
+         backupConnector.decode(buffer);
+      }
    }
 
    /* (non-Javadoc)

Modified: branches/Branch_2_2_EAP_cluster_clean3/src/main/org/hornetq/core/protocol/core/impl/wireformat/SubscribeClusterTopologyUpdatesMessage.java
===================================================================
--- branches/Branch_2_2_EAP_cluster_clean3/src/main/org/hornetq/core/protocol/core/impl/wireformat/SubscribeClusterTopologyUpdatesMessage.java	2011-08-29 16:16:58 UTC (rev 11233)
+++ branches/Branch_2_2_EAP_cluster_clean3/src/main/org/hornetq/core/protocol/core/impl/wireformat/SubscribeClusterTopologyUpdatesMessage.java	2011-08-29 16:20:36 UTC (rev 11234)
@@ -42,11 +42,23 @@
       this.clusterConnection = clusterConnection;
    }
 
+   protected SubscribeClusterTopologyUpdatesMessage(byte packetType, final boolean clusterConnection)
+   {
+      super(packetType);
+
+      this.clusterConnection = clusterConnection;
+   }
+
    public SubscribeClusterTopologyUpdatesMessage()
    {
       super(PacketImpl.SUBSCRIBE_TOPOLOGY);
    }
 
+   protected SubscribeClusterTopologyUpdatesMessage(byte packetType)
+   {
+      super(packetType);
+   }
+
    // Public --------------------------------------------------------
 
    public boolean isClusterConnection()

Added: branches/Branch_2_2_EAP_cluster_clean3/src/main/org/hornetq/core/protocol/core/impl/wireformat/SubscribeClusterTopologyUpdatesMessageV2.java
===================================================================
--- branches/Branch_2_2_EAP_cluster_clean3/src/main/org/hornetq/core/protocol/core/impl/wireformat/SubscribeClusterTopologyUpdatesMessageV2.java	                        (rev 0)
+++ branches/Branch_2_2_EAP_cluster_clean3/src/main/org/hornetq/core/protocol/core/impl/wireformat/SubscribeClusterTopologyUpdatesMessageV2.java	2011-08-29 16:20:36 UTC (rev 11234)
@@ -0,0 +1,80 @@
+/*
+ * Copyright 2009 Red Hat, Inc.
+ * Red Hat licenses this file to you under the Apache License, version
+ * 2.0 (the "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+ * implied.  See the License for the specific language governing
+ * permissions and limitations under the License.
+ */
+
+package org.hornetq.core.protocol.core.impl.wireformat;
+
+import org.hornetq.api.core.HornetQBuffer;
+import org.hornetq.core.protocol.core.impl.PacketImpl;
+
+/**
+ * Topology subscription packet (SUBSCRIBE_TOPOLOGY_V2) that also carries the sender's clientVersion.
+ * @author <a href="mailto:tim.fox at jboss.com">Tim Fox</a>
+ */
+public class SubscribeClusterTopologyUpdatesMessageV2 extends SubscribeClusterTopologyUpdatesMessage
+{
+   // Constants -----------------------------------------------------
+
+   // Attributes ----------------------------------------------------
+   
+   private int clientVersion;
+   
+   // Static --------------------------------------------------------
+
+   // Constructors --------------------------------------------------
+
+   public SubscribeClusterTopologyUpdatesMessageV2(final boolean clusterConnection, int clientVersion)
+   {
+      super(PacketImpl.SUBSCRIBE_TOPOLOGY_V2, clusterConnection);
+
+      this.clientVersion = clientVersion;
+   }
+
+   public SubscribeClusterTopologyUpdatesMessageV2()
+   {
+      super(PacketImpl.SUBSCRIBE_TOPOLOGY_V2);
+   }
+
+   // Public --------------------------------------------------------
+
+   
+   
+   @Override
+   public void encodeRest(final HornetQBuffer buffer)
+   {
+      super.encodeRest(buffer);
+      buffer.writeInt(clientVersion);
+   }
+
+   /**
+    * @return the clientVersion
+    */
+   public int getClientVersion()
+   {
+      return clientVersion;
+   }
+
+   @Override
+   public void decodeRest(final HornetQBuffer buffer)
+   {
+      super.decodeRest(buffer);
+      clientVersion = buffer.readInt();
+   }
+
+   // Package protected ---------------------------------------------
+
+   // Protected -----------------------------------------------------
+
+   // Private -------------------------------------------------------
+
+   // Inner classes -------------------------------------------------
+}

Modified: branches/Branch_2_2_EAP_cluster_clean3/src/main/org/hornetq/core/server/cluster/ClusterManager.java
===================================================================
--- branches/Branch_2_2_EAP_cluster_clean3/src/main/org/hornetq/core/server/cluster/ClusterManager.java	2011-08-29 16:16:58 UTC (rev 11233)
+++ branches/Branch_2_2_EAP_cluster_clean3/src/main/org/hornetq/core/server/cluster/ClusterManager.java	2011-08-29 16:20:36 UTC (rev 11234)
@@ -48,10 +48,8 @@
    
    void activate();
 
-   void notifyNodeDown(String nodeID);
+   void nodeAnnounced(long eventUID, String nodeID, Pair<TransportConfiguration, TransportConfiguration> connectorPair, boolean backup);
 
-   void notifyNodeUp(String nodeID, Pair<TransportConfiguration, TransportConfiguration> connectorPair, boolean backup, boolean nodeAnnounce);
-
    Topology getTopology();
    
    void flushExecutor();

Modified: branches/Branch_2_2_EAP_cluster_clean3/src/main/org/hornetq/core/server/cluster/impl/BridgeImpl.java
===================================================================
--- branches/Branch_2_2_EAP_cluster_clean3/src/main/org/hornetq/core/server/cluster/impl/BridgeImpl.java	2011-08-29 16:16:58 UTC (rev 11233)
+++ branches/Branch_2_2_EAP_cluster_clean3/src/main/org/hornetq/core/server/cluster/impl/BridgeImpl.java	2011-08-29 16:20:36 UTC (rev 11234)
@@ -101,7 +101,7 @@
 
    private final Transformer transformer;
 
-   private volatile ClientSessionFactory csf;
+   private volatile ClientSessionFactoryInternal csf;
 
    protected volatile ClientSessionInternal session;
 
@@ -200,6 +200,7 @@
    {
       this.notificationService = notificationService;
    }
+   
    public synchronized void start() throws Exception
    {
       if (started)
@@ -647,6 +648,15 @@
       }
    }
 
+   /**
+    * @return the session factory currently held by this bridge (the csf field)
+    */
+   protected ClientSessionFactoryInternal getCurrentFactory()
+   {
+      return csf;
+   }
+
+
    /* Hook for creating session factory */
    protected ClientSessionFactoryInternal createSessionFactory() throws Exception
    {

Modified: branches/Branch_2_2_EAP_cluster_clean3/src/main/org/hornetq/core/server/cluster/impl/ClusterConnectionBridge.java
===================================================================
--- branches/Branch_2_2_EAP_cluster_clean3/src/main/org/hornetq/core/server/cluster/impl/ClusterConnectionBridge.java	2011-08-29 16:16:58 UTC (rev 11233)
+++ branches/Branch_2_2_EAP_cluster_clean3/src/main/org/hornetq/core/server/cluster/impl/ClusterConnectionBridge.java	2011-08-29 16:20:36 UTC (rev 11234)
@@ -29,6 +29,7 @@
 import org.hornetq.api.core.management.ResourceNames;
 import org.hornetq.core.client.impl.ClientSessionFactoryInternal;
 import org.hornetq.core.client.impl.ServerLocatorInternal;
+import org.hornetq.core.client.impl.TopologyMember;
 import org.hornetq.core.logging.Logger;
 import org.hornetq.core.message.impl.MessageImpl;
 import org.hornetq.core.persistence.StorageManager;
@@ -71,6 +72,8 @@
    private final SimpleString idsHeaderName;
 
    private final String targetNodeID;
+   
+   private final long targetNodeEventUID;
 
    private final TransportConfiguration connector;
 
@@ -85,6 +88,7 @@
                                   final double retryMultiplier,
                                   final long maxRetryInterval,
                                   final UUID nodeUUID,
+                                  final long targetNodeEventUID,
                                   final String targetNodeID,
                                   final SimpleString name,
                                   final Queue queue,
@@ -122,6 +126,7 @@
             activated,
             storageManager);
 
+
       this.discoveryLocator = discoveryLocator;
 
       idsHeaderName = MessageImpl.HDR_ROUTE_TO_IDS.concat(name);
@@ -130,6 +135,7 @@
 
       this.clusterManager = clusterManager;
 
+      this.targetNodeEventUID = targetNodeEventUID;
       this.targetNodeID = targetNodeID;
       this.managementAddress = managementAddress;
       this.managementNotificationAddress = managementNotificationAddress;
@@ -149,6 +155,17 @@
    protected ClientSessionFactoryInternal createSessionFactory() throws Exception
    {
       ClientSessionFactoryInternal factory = super.createSessionFactory();
+      
+      try
+      {
+         TopologyMember member = clusterManager.getLocalMember();
+         factory.sendNodeAnnounce(member.getUniqueEventID(), clusterManager.getNodeId(), false, member.getConnector().a, member.getConnector().b);
+      }
+      catch (Exception e)
+      {
+         e.printStackTrace();
+      }
+
       factory.getConnection().addFailureListener(new FailureListener()
       {
 
@@ -161,6 +178,7 @@
             }
          }
       });
+
       return factory;
    }
 
@@ -294,6 +312,7 @@
    protected void afterConnect() throws Exception
    {
       super.afterConnect();
+      ClientSessionFactoryInternal csf = getCurrentFactory();
       setupNotificationConsumer();
    }
 
@@ -319,7 +338,7 @@
       if (permanently)
       {
          log.debug("cluster node for bridge " + this.getName() + " is permanently down");
-         discoveryLocator.notifyNodeDown(targetNodeID);
+         discoveryLocator.notifyNodeDown(targetNodeEventUID+1, targetNodeID);
       }
 
    }

Modified: branches/Branch_2_2_EAP_cluster_clean3/src/main/org/hornetq/core/server/cluster/impl/ClusterConnectionImpl.java
===================================================================
--- branches/Branch_2_2_EAP_cluster_clean3/src/main/org/hornetq/core/server/cluster/impl/ClusterConnectionImpl.java	2011-08-29 16:16:58 UTC (rev 11233)
+++ branches/Branch_2_2_EAP_cluster_clean3/src/main/org/hornetq/core/server/cluster/impl/ClusterConnectionImpl.java	2011-08-29 16:20:36 UTC (rev 11234)
@@ -36,6 +36,7 @@
 import org.hornetq.api.core.client.ClusterTopologyListener;
 import org.hornetq.api.core.management.ManagementHelper;
 import org.hornetq.api.core.management.NotificationType;
+import org.hornetq.core.client.impl.ClientSessionFactoryInternal;
 import org.hornetq.core.client.impl.ServerLocatorImpl;
 import org.hornetq.core.client.impl.ServerLocatorInternal;
 import org.hornetq.core.client.impl.Topology;
@@ -470,6 +471,14 @@
          {
             log.debug("DuplicateDetection is disabled, sending clustered messages blocked");
          }
+         
+         final TopologyMember currentMember = clusterManagerTopology.getMember(nodeUUID.toString());
+         
+         if (currentMember == null)
+         {
+            // sanity check only
+            throw new IllegalStateException ("InternalError! The ClusterConnection doesn't know about its own node = " + this);
+         }
 
          serverLocator.setNodeID(nodeUUID.toString());
          serverLocator.setIdentity("(main-ClusterConnection::" + server.toString() + ")");
@@ -494,6 +503,26 @@
          serverLocator.addClusterTopologyListener(this);
 
          serverLocator.start(server.getExecutorFactory().getExecutor());
+         
+         serverLocator.getExecutor().execute(new Runnable(){
+            public void run()
+            {
+               try
+               {
+                  ClientSessionFactoryInternal csf = serverLocator.connect();
+                  
+                  log.info(this + "::YYY " + nodeUUID.toString() + " Cluster connection " + ClusterConnectionImpl.this + 
+                           " connected, sending announce node, connector=" + 
+                           manager.getLocalMember().getConnector().a + "/" + manager.getLocalMember().getConnector().b);
+                  
+                  csf.sendNodeAnnounce(currentMember.getUniqueEventID(), nodeUUID.toString(), false, manager.getLocalMember().getConnector().a, manager.getLocalMember().getConnector().b);
+               }
+               catch (Exception e)
+               {
+                  log.warn("Error on connectin Cluster connection to other nodes", e);
+               }
+            }
+         });
       }
 
       if (managementService != null)
@@ -515,7 +544,7 @@
 
    // ClusterTopologyListener implementation ------------------------------------------------------------------
 
-   public void nodeDown(final String nodeID)
+   public void nodeDown(final long eventUID, final String nodeID)
    {
       if (log.isDebugEnabled())
       {
@@ -544,12 +573,11 @@
          {
             log.error("Failed to close flow record", e);
          }
-
-         server.getClusterManager().notifyNodeDown(nodeID);
       }
    }
 
-   public void nodeUP(final String nodeID,
+   public void nodeUP(final long eventUID, 
+                      final String nodeID,
                       final Pair<TransportConfiguration, TransportConfiguration> connectorPair,
                       final boolean last)
    {
@@ -562,19 +590,13 @@
 
       if (nodeID.equals(nodeUUID.toString()))
       {
-         if (connectorPair.b != null)
-         {
         	if (log.isTraceEnabled())
         	{
         	   log.trace(this + "::informing about backup to itself, nodeUUID=" + nodeUUID + ", connectorPair=" + connectorPair + " this = " + this);
         	}
-            server.getClusterManager().notifyNodeUp(nodeID, connectorPair, last, false);
-         }
          return;
       }
 
-      // we propagate the node notifications to all cluster topology listeners
-      server.getClusterManager().notifyNodeUp(nodeID, connectorPair, last, false);
 
       // if the node is more than 1 hop away, we do not create a bridge for direct cluster connection
       if (allowDirectConnectionsOnly && !allowableConnections.contains(connectorPair.a))
@@ -615,6 +637,7 @@
                {
                   log.debug(this + "::Creating record for nodeID=" + nodeID + ", connectorPair=" + connectorPair);
                }
+               log.info(this + "::Creating record for nodeID=" + nodeID + ", connectorPair=" + connectorPair);
 
                // New node - create a new flow record
 
@@ -635,7 +658,7 @@
                   queue = server.createQueue(queueName, queueName, null, true, false);
                }
 
-               createNewRecord(nodeID, connectorPair.a, queueName, queue, true);
+               createNewRecord(eventUID, nodeID, connectorPair.a, queueName, queue, true);
             }
             else
             {
@@ -656,7 +679,8 @@
       }
    }
 
-   private void createNewRecord(final String targetNodeID,
+   private void createNewRecord(final long eventUID,
+                                final String targetNodeID,
                                 final TransportConfiguration connector,
                                 final SimpleString queueName,
                                 final Queue queue,
@@ -690,7 +714,7 @@
 
       targetLocator.disableFinalizeCheck();
       
-      MessageFlowRecordImpl record = new MessageFlowRecordImpl(targetLocator, targetNodeID, connector, queueName, queue);
+      MessageFlowRecordImpl record = new MessageFlowRecordImpl(targetLocator, eventUID, targetNodeID, connector, queueName, queue);
 
       ClusterConnectionBridge bridge = new ClusterConnectionBridge(this,
                                                                    manager,
@@ -701,6 +725,7 @@
                                                                    retryIntervalMultiplier,
                                                                    maxRetryInterval,
                                                                    nodeUUID,
+                                                                   record.getEventUID(),
                                                                    record.getTargetNodeID(),
                                                                    record.getQueueName(),
                                                                    record.getQueue(),
@@ -742,6 +767,8 @@
    {
       private BridgeImpl bridge;
 
+      private final long eventUID;
+      
       private final String targetNodeID;
 
       private final TransportConfiguration connector;
@@ -761,6 +788,7 @@
       private volatile boolean firstReset = false;
 
       public MessageFlowRecordImpl(final ServerLocatorInternal targetLocator,
+                                   final long eventUID,
                                    final String targetNodeID,
                                    final TransportConfiguration connector,
                                    final SimpleString queueName,
@@ -771,6 +799,7 @@
          this.targetNodeID = targetNodeID;
          this.connector = connector;
          this.queueName = queueName;
+         this.eventUID = eventUID;
       }
 
       /* (non-Javadoc)
@@ -802,6 +831,14 @@
       {
          return address.toString();
       }
+      
+      /**
+       * @return the event UID that was passed to this record's constructor
+       */
+      public long getEventUID()
+      {
+         return eventUID;
+      }
 
       /**
        * @return the nodeID
@@ -1034,6 +1071,7 @@
 
       private synchronized void doBindingAdded(final ClientMessage message) throws Exception
       {
+         log.info(ClusterConnectionImpl.this + " Adding binding " + message);
          if (log.isTraceEnabled())
          {
             log.trace(ClusterConnectionImpl.this + " Adding binding " + message);
@@ -1091,7 +1129,7 @@
             // hops is too high
             // or there are multiple cluster connections for the same address
 
-            ClusterConnectionImpl.log.warn("Remote queue binding " + clusterName +
+            ClusterConnectionImpl.log.warn(this + "::Remote queue binding " + clusterName +
                                            " has already been bound in the post office. Most likely cause for this is you have a loop " +
                                            "in your cluster due to cluster max-hops being too large or you have multiple cluster connections to the same nodes using overlapping addresses");
 

Modified: branches/Branch_2_2_EAP_cluster_clean3/src/main/org/hornetq/core/server/cluster/impl/ClusterManagerImpl.java
===================================================================
--- branches/Branch_2_2_EAP_cluster_clean3/src/main/org/hornetq/core/server/cluster/impl/ClusterManagerImpl.java	2011-08-29 16:16:58 UTC (rev 11233)
+++ branches/Branch_2_2_EAP_cluster_clean3/src/main/org/hornetq/core/server/cluster/impl/ClusterManagerImpl.java	2011-08-29 16:20:36 UTC (rev 11234)
@@ -100,6 +100,8 @@
    private final Map<String, ClusterConnection> clusterConnections = new HashMap<String, ClusterConnection>();
 
    private final Topology topology = new Topology(this);
+   
+   private TopologyMember localMember;
 
    private volatile ServerLocatorInternal backupServerLocator;
 
@@ -168,6 +170,16 @@
    {
       return "ClusterManagerImpl[server=" + server + "]@" + System.identityHashCode(this);
    }
+   
+   public TopologyMember getLocalMember()
+   {
+      return localMember;
+   }
+   
+   public String getNodeId()
+   {
+      return nodeUUID.toString();
+   }
 
    public synchronized void start() throws Exception
    {
@@ -183,11 +195,41 @@
             deployBroadcastGroup(config);
          }
 
+         String connectorName = null;
+
          for (ClusterConnectionConfiguration config : configuration.getClusterConfigurations())
          {
-            deployClusterConnection(config);
+            if (connectorName == null)
+            {
+               connectorName = config.getConnectorName();
+            }
+            else if (!connectorName.equals(config.getConnectorName()))
+            {
+               throw new IllegalStateException("Using multiple connector names on cluster connections it's "
+                                               + "not supported at this time");
+            }
+
          }
 
+         if (connectorName != null)
+         {
+            TransportConfiguration nodeConnector = configuration.getConnectorConfigurations().get(connectorName);
+            if (nodeConnector == null)
+            {
+               log.warn("No connecor with name '" + connectorName +
+                        "'. The cluster connection will not be deployed.");
+               return;
+            }
+   
+            // Now announce presence
+            announceNode(nodeConnector);
+   
+            for (ClusterConnectionConfiguration config : configuration.getClusterConfigurations())
+            {
+               deployClusterConnection(config);
+            }
+         }
+
       }
 
       for (BridgeConfiguration config : configuration.getBridgeConfigurations())
@@ -195,13 +237,6 @@
          deployBridge(config);
       }
 
-      // Now announce presence
-
-      if (clusterConnections.size() > 0)
-      {
-         announceNode();
-      }
-
       started = true;
    }
 
@@ -264,81 +299,26 @@
       clusterConnections.clear();
    }
 
-   public void notifyNodeDown(String nodeID)
+   public void nodeAnnounced(final long uniqueEventID,
+                             final String nodeID,
+                             final Pair<TransportConfiguration, TransportConfiguration> connectorPair,
+                             final boolean backup)
    {
-      if (nodeID.equals(nodeUUID.toString()))
-      {
-         return;
-      }
-
-      log.debug(this + "::removing nodeID=" + nodeID, new Exception("trace"));
-
-      topology.removeMember(nodeID);
-
-   }
-
-   public void notifyNodeUp(final String nodeID,
-                            final Pair<TransportConfiguration, TransportConfiguration> connectorPair,
-                            final boolean last,
-                            final boolean nodeAnnounce)
-   {
       if (log.isDebugEnabled())
       {
-         log.debug(this + "::NodeUp " + nodeID + connectorPair + ", nodeAnnounce=" + nodeAnnounce);
+         log.debug(this + "::NodeAnnounced, backup=" + backup + nodeID + connectorPair);
       }
 
-      TopologyMember member = new TopologyMember(connectorPair);
-      boolean updated = topology.addMember(nodeID, member, last);
-
-      if (!updated)
+      TopologyMember newMember = new TopologyMember(connectorPair.a, connectorPair.b);
+      newMember.setUniqueEventID(uniqueEventID);
+      if (backup)
       {
-         if (log.isDebugEnabled())
-         {
-            log.debug(this + " ignored notifyNodeUp on nodeID=" +
-                      nodeID +
-                      " pair=" +
-                      connectorPair +
-                      " as the topology already knew about it");
-         }
-         return;
+         topology.updateBackup(nodeID, new TopologyMember(connectorPair.a, connectorPair.b));
       }
-
-      if (log.isDebugEnabled())
+      else
       {
-         log.debug(this + " received notifyNodeUp nodeID=" +
-                   nodeID +
-                   " connectorPair=" +
-                   connectorPair +
-                   ", nodeAnnounce=" +
-                   nodeAnnounce +
-                   ", last=" +
-                   last);
+         topology.updateMember(uniqueEventID, nodeID, newMember);
       }
-
-      // if this is a node being announced we are hearing it direct from the nodes CM so need to inform our cluster
-      // connections.
-      if (nodeAnnounce)
-      {
-         if (log.isDebugEnabled())
-         {
-            log.debug("Informing " + nodeID + " to " + clusterConnections.toString());
-         }
-         for (ClusterConnection clusterConnection : clusterConnections.values())
-         {
-            if (log.isTraceEnabled())
-            {
-               log.trace(this + " information clusterConnection=" +
-                         clusterConnection +
-                         " nodeID=" +
-                         nodeID +
-                         " connectorPair=" +
-                         connectorPair +
-                         " last=" +
-                         last);
-            }
-            clusterConnection.nodeUP(nodeID, connectorPair, last);
-         }
-      }
    }
 
    public void flushExecutor()
@@ -347,7 +327,8 @@
       executor.execute(future);
       if (!future.await(10000))
       {
-         server.threadDump("Couldn't flush ClusterManager executor (" + this + ") in 10 seconds, verify your thread pool size");
+         server.threadDump("Couldn't flush ClusterManager executor (" + this +
+                           ") in 10 seconds, verify your thread pool size");
       }
    }
 
@@ -405,9 +386,8 @@
 
          TopologyMember member = topology.getMember(nodeID);
          // swap backup as live and send it to everybody
-         member = new TopologyMember(new Pair<TransportConfiguration, TransportConfiguration>(member.getConnector().b,
-                                                                                              null));
-         topology.addMember(nodeID, member, false);
+         member = new TopologyMember(member.getConnector().b, null);
+         topology.updateAsLive(nodeID, member);
 
          if (backupServerLocator != null)
          {
@@ -460,7 +440,7 @@
             }
          }
 
-         topology.sendMemberToListeners(nodeID, member);
+         topology.sendMember(nodeID);
       }
    }
 
@@ -496,43 +476,21 @@
       this.clusterLocators.remove(serverLocator);
    }
 
-   private synchronized void announceNode()
+   private synchronized void announceNode(final TransportConfiguration nodeConnector)
    {
-      // TODO does this really work with more than one cluster connection? I think not
-
-      // Just take the first one for now
-      ClusterConnection cc = clusterConnections.values().iterator().next();
-
       String nodeID = server.getNodeID().toString();
-
-      TopologyMember member = topology.getMember(nodeID);
-
-      if (member == null)
+      
+      if (backup)
       {
-         if (backup)
-         {
-            member = new TopologyMember(new Pair<TransportConfiguration, TransportConfiguration>(null,
-                                                                                                 cc.getConnector()));
-         }
-         else
-         {
-            member = new TopologyMember(new Pair<TransportConfiguration, TransportConfiguration>(cc.getConnector(),
-                                                                                                 null));
-         }
-
-         topology.addMember(nodeID, member, false);
+         localMember = new TopologyMember(null, nodeConnector);
       }
       else
       {
-         if (backup)
-         {
-            // pair.b = cc.getConnector();
-         }
-         else
-         {
-            // pair.a = cc.getConnector();
-         }
+         localMember = new TopologyMember(nodeConnector, null);
       }
+
+      System.out.println("Adding local member " + localMember);
+      topology.updateAsLive(nodeID, localMember);
    }
 
    private synchronized void deployBroadcastGroup(final BroadcastGroupConfiguration config) throws Exception
@@ -955,9 +913,14 @@
                ClientSessionFactory backupSessionFactory = backupServerLocator.connect();
                if (backupSessionFactory != null)
                {
+                  System.out.println("announcing " + System.currentTimeMillis());
                   backupSessionFactory.getConnection()
                                       .getChannel(0, -1)
-                                      .send(new NodeAnnounceMessage(nodeUUID.toString(), true, connector));
+                                      .send(new NodeAnnounceMessage(System.currentTimeMillis(),
+                                                                    nodeUUID.toString(),
+                                                                    true,
+                                                                    connector,
+                                                                    null));
                   log.info("backup announced");
                }
             }

Modified: branches/Branch_2_2_EAP_cluster_clean3/src/main/org/hornetq/core/server/cluster/impl/ClusterManagerInternal.java
===================================================================
--- branches/Branch_2_2_EAP_cluster_clean3/src/main/org/hornetq/core/server/cluster/impl/ClusterManagerInternal.java	2011-08-29 16:16:58 UTC (rev 11233)
+++ branches/Branch_2_2_EAP_cluster_clean3/src/main/org/hornetq/core/server/cluster/impl/ClusterManagerInternal.java	2011-08-29 16:20:36 UTC (rev 11234)
@@ -14,6 +14,7 @@
 package org.hornetq.core.server.cluster.impl;
 
 import org.hornetq.core.client.impl.ServerLocatorInternal;
+import org.hornetq.core.client.impl.TopologyMember;
 import org.hornetq.core.server.cluster.ClusterManager;
 
 /**
@@ -28,5 +29,9 @@
    void addClusterLocator(ServerLocatorInternal locator);
    
    void removeClusterLocator(ServerLocatorInternal locator);
+   
+   TopologyMember getLocalMember();
+   
+   String getNodeId();
 
 }

Modified: branches/Branch_2_2_EAP_cluster_clean3/src/main/org/hornetq/core/server/impl/HornetQServerImpl.java
===================================================================
--- branches/Branch_2_2_EAP_cluster_clean3/src/main/org/hornetq/core/server/impl/HornetQServerImpl.java	2011-08-29 16:16:58 UTC (rev 11233)
+++ branches/Branch_2_2_EAP_cluster_clean3/src/main/org/hornetq/core/server/impl/HornetQServerImpl.java	2011-08-29 16:20:36 UTC (rev 11234)
@@ -354,11 +354,10 @@
 
             if(nodeManager.isBackupLive())
             {
-               Thread.sleep(configuration.getFailbackDelay());
                //looks like we've failed over at some point need to inform that we are the backup so when the current live
                // goes down they failover to us
                clusterManager.announceBackup();
-               Thread.sleep(configuration.getFailbackDelay());
+               //Thread.sleep(configuration.getFailbackDelay());
             }
 
             nodeManager.startLiveNode();
@@ -2004,7 +2003,7 @@
    
    public String toString()
    {
-      return "HornetQServerImpl::" + (identity == null ? "" : (identity + ", ")) + (nodeManager != null ? ("serverUUID=" + nodeManager.getUUID()) : "");
+      return "HornetQServerImpl::" + (identity == null ? "" : (identity + ", "))/* + (nodeManager != null ? ("serverUUID=" + nodeManager.getUUID()) : "")*/;
    }
 
    // Inner classes

Modified: branches/Branch_2_2_EAP_cluster_clean3/src/main/org/hornetq/core/server/impl/ServerSessionImpl.java
===================================================================
--- branches/Branch_2_2_EAP_cluster_clean3/src/main/org/hornetq/core/server/impl/ServerSessionImpl.java	2011-08-29 16:16:58 UTC (rev 11233)
+++ branches/Branch_2_2_EAP_cluster_clean3/src/main/org/hornetq/core/server/impl/ServerSessionImpl.java	2011-08-29 16:20:36 UTC (rev 11234)
@@ -83,12 +83,12 @@
  * @author <a href="mailto:jmesnil at redhat.com">Jeff Mesnil</a>
  * @author <a href="mailto:andy.taylor at jboss.org>Andy Taylor</a>
  */
-public class ServerSessionImpl implements ServerSession , FailureListener
+public class ServerSessionImpl implements ServerSession, FailureListener
 {
    // Constants -----------------------------------------------------------------------------
 
    private static final Logger log = Logger.getLogger(ServerSessionImpl.class);
-   
+
    private static final boolean isTrace = log.isTraceEnabled();
 
    // Static -------------------------------------------------------------------------------
@@ -147,14 +147,14 @@
    private volatile SimpleString defaultAddress;
 
    private volatile int timeoutSeconds;
-   
+
    private Map<String, String> metaData;
-   
+
    private OperationContext sessionContext;
 
    // Session's usage should be by definition single threaded, hence it's not needed to use a concurrentHashMap here
-   private Map<SimpleString, Pair<UUID, AtomicLong>> targetAddressInfos = new HashMap<SimpleString,  Pair<UUID, AtomicLong>>();
-   
+   private Map<SimpleString, Pair<UUID, AtomicLong>> targetAddressInfos = new HashMap<SimpleString, Pair<UUID, AtomicLong>>();
+
    private long creationTime = System.currentTimeMillis();
 
    // Constructors ---------------------------------------------------------------------------------
@@ -244,7 +244,6 @@
       this.sessionContext = sessionContext;
    }
 
-
    public String getUsername()
    {
       return username;
@@ -269,8 +268,9 @@
    {
       return remotingConnection.getID();
    }
-   
-   public Set<ServerConsumer> getServerConsumers() {
+
+   public Set<ServerConsumer> getServerConsumers()
+   {
       Set<ServerConsumer> consumersClone = new HashSet<ServerConsumer>(consumers.values());
       return Collections.unmodifiableSet(consumersClone);
    }
@@ -316,7 +316,7 @@
       }
 
       remotingConnection.removeFailureListener(this);
-      
+
       callback.closed();
    }
 
@@ -394,6 +394,8 @@
       }
 
       Queue queue = server.createQueue(address, name, filterString, durable, temporary);
+      
+      System.out.println("Created queue " + queue + " / address=" + address + " on " + server);
 
       if (temporary)
       {
@@ -402,7 +404,7 @@
          // dies. It does not mean it will get deleted automatically when the
          // session is closed.
          // It is up to the user to delete the queue when finished with it
-        
+
          TempQueueCleanerUpper cleaner = new TempQueueCleanerUpper(postOffice, name, queue);
 
          remotingConnection.addCloseListener(cleaner);
@@ -411,8 +413,7 @@
          tempQueueCleannerUppers.put(name, cleaner);
       }
    }
-   
-   
+
    /**
     * For test cases only
     * @return
@@ -427,7 +428,7 @@
       private final PostOffice postOffice;
 
       private final SimpleString bindingName;
-      
+
       private final Queue queue;
 
       TempQueueCleanerUpper(final PostOffice postOffice, final SimpleString bindingName, final Queue queue)
@@ -435,7 +436,7 @@
          this.postOffice = postOffice;
 
          this.bindingName = bindingName;
-         
+
          this.queue = queue;
       }
 
@@ -443,15 +444,15 @@
       {
          try
          {
-        	if (log.isDebugEnabled())
-        	{
-        	   log.debug("deleting temporary queue " + bindingName);
-        	}
+            if (log.isDebugEnabled())
+            {
+               log.debug("deleting temporary queue " + bindingName);
+            }
             if (postOffice.getBinding(bindingName) != null)
             {
                postOffice.removeBinding(bindingName);
             }
-            
+
             queue.deleteAllReferences();
          }
          catch (Exception e)
@@ -469,7 +470,7 @@
       {
          run();
       }
-      
+
       public String toString()
       {
          return "Temporary Cleaner for queue " + bindingName;
@@ -489,11 +490,11 @@
       server.destroyQueue(name, this);
 
       TempQueueCleanerUpper cleaner = this.tempQueueCleannerUppers.remove(name);
-      
+
       if (cleaner != null)
       {
          remotingConnection.removeCloseListener(cleaner);
-         
+
          remotingConnection.removeFailureListener(cleaner);
       }
    }
@@ -576,8 +577,8 @@
    public void acknowledge(final long consumerID, final long messageID) throws Exception
    {
       ServerConsumer consumer = consumers.get(consumerID);
-      
-       consumer.acknowledge(autoCommitAcks, tx, messageID);
+
+      consumer.acknowledge(autoCommitAcks, tx, messageID);
    }
 
    public void individualAcknowledge(final long consumerID, final long messageID) throws Exception
@@ -935,7 +936,7 @@
                throw new HornetQXAException(XAException.XAER_PROTO,
                                             "Cannot prepare transaction, it is suspended " + xid);
             }
-            else if(theTx.getState() == Transaction.State.PREPARED)
+            else if (theTx.getState() == Transaction.State.PREPARED)
             {
                log.info("ignoring prepare on xid as already called :" + xid);
             }
@@ -966,7 +967,7 @@
    public void xaSetTimeout(final int timeout)
    {
       timeoutSeconds = timeout;
-      if(tx != null)
+      if (tx != null)
       {
          tx.setTimeout(timeout);
       }
@@ -981,18 +982,18 @@
    {
       setStarted(false);
    }
-   
+
    public void waitContextCompletion()
    {
       OperationContext formerCtx = storageManager.getContext();
-      
+
       try
       {
          try
          {
             if (!storageManager.waitOnOperations(10000))
             {
-               log.warn("Couldn't finish context execution in 10 seconds", new Exception ("warning"));
+               log.warn("Couldn't finish context execution in 10 seconds", new Exception("warning"));
             }
          }
          catch (Exception e)
@@ -1009,7 +1010,7 @@
    public void close(final boolean failed)
    {
       OperationContext formerCtx = storageManager.getContext();
-      
+
       try
       {
          storageManager.setContext(sessionContext);
@@ -1019,7 +1020,7 @@
             public void onError(int errorCode, String errorMessage)
             {
             }
-   
+
             public void done()
             {
                try
@@ -1071,9 +1072,9 @@
    {
       // need to create the LargeMessage before continue
       long id = storageManager.generateUniqueID();
-      
+
       LargeServerMessage largeMsg = storageManager.createLargeMessage(id, message);
- 
+
       if (currentLargeMessage != null)
       {
          ServerSessionImpl.log.warn("Replacing incomplete LargeMessage with ID=" + currentLargeMessage.getMessageID());
@@ -1085,7 +1086,7 @@
    public void send(final ServerMessage message, final boolean direct) throws Exception
    {
       long id = storageManager.generateUniqueID();
-      
+
       SimpleString address = message.getAddress();
 
       message.setMessageID(id);
@@ -1111,7 +1112,6 @@
          log.trace("send(message=" + message + ", direct=" + direct + ") being called");
       }
 
-
       if (message.getAddress().equals(managementAddress))
       {
          // It's a management message
@@ -1129,7 +1129,10 @@
       }
    }
 
-   public void sendContinuations(final int packetSize, final long messageBodySize, final byte[] body, final boolean continues) throws Exception
+   public void sendContinuations(final int packetSize,
+                                 final long messageBodySize,
+                                 final byte[] body,
+                                 final boolean continues) throws Exception
    {
       if (currentLargeMessage == null)
       {
@@ -1144,7 +1147,7 @@
       if (!continues)
       {
          currentLargeMessage.releaseResources();
-         
+
          if (messageBodySize >= 0)
          {
             currentLargeMessage.putLongProperty(Message.HDR_LARGE_BODY_SIZE, messageBodySize);
@@ -1178,7 +1181,6 @@
          consumer.setTransferring(transferring);
       }
    }
-   
 
    public void addMetaData(String key, String data)
    {
@@ -1198,7 +1200,7 @@
       }
       return data;
    }
-   
+
    public String[] getTargetAddresses()
    {
       Map<SimpleString, Pair<UUID, AtomicLong>> copy = cloneTargetAddresses();
@@ -1238,7 +1240,7 @@
    public void describeProducersInfo(JSONArray array) throws Exception
    {
       Map<SimpleString, Pair<UUID, AtomicLong>> targetCopy = cloneTargetAddresses();
-      
+
       for (Map.Entry<SimpleString, Pair<UUID, AtomicLong>> entry : targetCopy.entrySet())
       {
          JSONObject producerInfo = new JSONObject();
@@ -1251,7 +1253,6 @@
       }
    }
 
-
    // FailureListener implementation
    // --------------------------------------------------------------------
 
@@ -1271,7 +1272,6 @@
       }
    }
 
-
    // Public
    // ----------------------------------------------------------------------------
 
@@ -1337,7 +1337,7 @@
 
          toCancel.addAll(consumer.cancelRefs(false, lastMessageAsDelived, theTx));
       }
-      
+
       for (MessageReference ref : toCancel)
       {
          ref.getQueue().cancel(theTx, ref);
@@ -1379,12 +1379,12 @@
       }
 
       postOffice.route(msg, routingContext, direct);
-      
+
       Pair<UUID, AtomicLong> value = targetAddressInfos.get(msg.getAddress());
-      
+
       if (value == null)
       {
-         targetAddressInfos.put(msg.getAddress(), new Pair<UUID,AtomicLong>(msg.getUserID(), new AtomicLong(1)));
+         targetAddressInfos.put(msg.getAddress(), new Pair<UUID, AtomicLong>(msg.getUserID(), new AtomicLong(1)));
       }
       else
       {

Modified: branches/Branch_2_2_EAP_cluster_clean3/tests/src/org/hornetq/tests/integration/cluster/bridge/BridgeReconnectTest.java
===================================================================
--- branches/Branch_2_2_EAP_cluster_clean3/tests/src/org/hornetq/tests/integration/cluster/bridge/BridgeReconnectTest.java	2011-08-29 16:16:58 UTC (rev 11233)
+++ branches/Branch_2_2_EAP_cluster_clean3/tests/src/org/hornetq/tests/integration/cluster/bridge/BridgeReconnectTest.java	2011-08-29 16:20:36 UTC (rev 11234)
@@ -444,6 +444,7 @@
 
          // Now we will simulate a failure of the bridge connection between server0 and server1
          Bridge bridge = server0.getClusterManager().getBridges().get(bridgeName);
+         assertNotNull(bridge);
          RemotingConnection forwardingConnection = getForwardingConnection(bridge);
          InVMConnector.failOnCreateConnection = true;
          InVMConnector.numberOfFailures = reconnectAttempts - 1;

Modified: branches/Branch_2_2_EAP_cluster_clean3/tests/src/org/hornetq/tests/integration/cluster/distribution/ClusterTestBase.java
===================================================================
--- branches/Branch_2_2_EAP_cluster_clean3/tests/src/org/hornetq/tests/integration/cluster/distribution/ClusterTestBase.java	2011-08-29 16:16:58 UTC (rev 11233)
+++ branches/Branch_2_2_EAP_cluster_clean3/tests/src/org/hornetq/tests/integration/cluster/distribution/ClusterTestBase.java	2011-08-29 16:20:36 UTC (rev 11234)
@@ -88,7 +88,7 @@
 
    private static final long WAIT_TIMEOUT = 10000;
    
-   private static final long TIMEOUT_START_SERVER = 500;
+   private static final long TIMEOUT_START_SERVER = 10;
 
    @Override
    protected void setUp() throws Exception
@@ -385,8 +385,8 @@
          {
             if (hornetQServer != null)
             {
-               out.println(clusterDescription(hornetQServer));
-               out.println(debugBindings(hornetQServer, hornetQServer.getConfiguration()
+               System.out.println(clusterDescription(hornetQServer));
+               System.out.println(debugBindings(hornetQServer, hornetQServer.getConfiguration()
                                                                      .getManagementNotificationAddress()
                                                                      .toString()));
             }
@@ -2023,16 +2023,20 @@
       for (int node : nodes)
       {
          log.info("#test start node " + node);
-//         if (System.currentTimeMillis() - timeStarts[node] < TIMEOUT_START_SERVER)
-//         {
-//            Thread.sleep(TIMEOUT_START_SERVER);
-//         }
-         Thread.sleep(TIMEOUT_START_SERVER);
+         if (System.currentTimeMillis() - timeStarts[node] < TIMEOUT_START_SERVER)
+         {
+            Thread.sleep(TIMEOUT_START_SERVER);
+         }
          timeStarts[node] = System.currentTimeMillis();
          
          servers[node].setIdentity("server " + node);
          ClusterTestBase.log.info("starting server " + servers[node]);
          servers[node].start();
+         
+//         for (int i = 0 ; i <= node; i++)
+//         {
+//            System.out.println(servers[node].getClusterManager().getTopology().describe());
+//         }
 
          ClusterTestBase.log.info("started server " + servers[node]);
 
@@ -2068,13 +2072,13 @@
          {
             try
             {
-//               if (System.currentTimeMillis() - timeStarts[node] < TIMEOUT_START_SERVER)
-//               {
-//                  // We can't stop and start a node too fast (faster than what the Topology could realize about this
-//                  Thread.sleep(TIMEOUT_START_SERVER);
-//               }
+               if (System.currentTimeMillis() - timeStarts[node] < TIMEOUT_START_SERVER)
+               {
+                  // We can't stop and start a node too fast (faster than what the Topology could realize about this
+                 Thread.sleep(TIMEOUT_START_SERVER);
+               }
                
-               Thread.sleep(TIMEOUT_START_SERVER);
+               //Thread.sleep(TIMEOUT_START_SERVER);
 
                timeStarts[node] = System.currentTimeMillis();
                

Modified: branches/Branch_2_2_EAP_cluster_clean3/tests/src/org/hornetq/tests/integration/cluster/distribution/ClusteredRequestResponseTest.java
===================================================================
--- branches/Branch_2_2_EAP_cluster_clean3/tests/src/org/hornetq/tests/integration/cluster/distribution/ClusteredRequestResponseTest.java	2011-08-29 16:16:58 UTC (rev 11233)
+++ branches/Branch_2_2_EAP_cluster_clean3/tests/src/org/hornetq/tests/integration/cluster/distribution/ClusteredRequestResponseTest.java	2011-08-29 16:20:36 UTC (rev 11234)
@@ -97,6 +97,8 @@
       setupCluster();
 
       startServers(0, 1, 2, 3, 4);
+      
+      Thread.sleep(1000);
 
       setupSessionFactory(0, isNetty());
       setupSessionFactory(1, isNetty());

Modified: branches/Branch_2_2_EAP_cluster_clean3/tests/src/org/hornetq/tests/integration/cluster/distribution/SimpleSymmetricClusterTest.java
===================================================================
--- branches/Branch_2_2_EAP_cluster_clean3/tests/src/org/hornetq/tests/integration/cluster/distribution/SimpleSymmetricClusterTest.java	2011-08-29 16:16:58 UTC (rev 11233)
+++ branches/Branch_2_2_EAP_cluster_clean3/tests/src/org/hornetq/tests/integration/cluster/distribution/SimpleSymmetricClusterTest.java	2011-08-29 16:20:36 UTC (rev 11234)
@@ -148,28 +148,33 @@
       setupClusterConnection("cluster2", "queues", false, 1, isNetty(), 2, 0, 1);
 
       startServers(0, 1, 2);
+      
+      waitForTopology(servers[0], 3);
+      waitForTopology(servers[1], 3);
+      waitForTopology(servers[2], 3);
+      
+      for (int i = 0 ; i < 3; i++)
+      {
+         System.out.println("top[" + i + "]=" + servers[i].getClusterManager().getTopology().describe());
+      }
 
-      for (int i = 0; i < 10; i++)
-         log.info("****************************");
       for (int i = 0; i <= 2; i++)
       {
          log.info("*************************************\n " + servers[i] +
                   " topology:\n" +
                   servers[i].getClusterManager().getTopology().describe());
       }
-      for (int i = 0; i < 10; i++)
-         log.info("****************************");
       setupSessionFactory(0, isNetty());
       setupSessionFactory(1, isNetty());
       setupSessionFactory(2, isNetty());
 
-      createQueue(0, "queues.testaddress", "queue0", null, false);
-      createQueue(1, "queues.testaddress", "queue0", null, false);
-      createQueue(2, "queues.testaddress", "queue0", null, false);
+      createQueue(0, "queues.testaddress", "manequeue0", null, false);
+      createQueue(1, "queues.testaddress", "manequeue0", null, false);
+      createQueue(2, "queues.testaddress", "manequeue0", null, false);
 
-      addConsumer(0, 0, "queue0", null);
-      addConsumer(1, 1, "queue0", null);
-      addConsumer(2, 2, "queue0", null);
+      addConsumer(0, 0, "manequeue0", null);
+      addConsumer(1, 1, "manequeue0", null);
+      addConsumer(2, 2, "manequeue0", null);
 
       waitForBindings(0, "queues.testaddress", 1, 1, true);
       waitForBindings(1, "queues.testaddress", 1, 1, true);
@@ -180,19 +185,52 @@
       waitForBindings(2, "queues.testaddress", 2, 2, false);
 
    }
+   
+   public void testSimple_TwoNodes() throws Exception
+   {
+      setupServer(0, false, isNetty());
+      setupServer(1, false, isNetty());
+
+      setupClusterConnection("cluster0", "queues", false, 1, isNetty(), 0, 1);
+      setupClusterConnection("cluster1", "queues", false, 1, isNetty(), 1, 0);
+
+      startServers(0, 1);
+
+      for (int i = 0; i <= 1; i++)
+      {
+         log.info("*************************************\n " + servers[i] +
+                  " topology:\n" +
+                  servers[i].getClusterManager().getTopology().describe());
+      }
+      setupSessionFactory(0, isNetty());
+      setupSessionFactory(1, isNetty());
+
+      createQueue(0, "queues.testaddress", "manequeue0", null, false);
+      createQueue(1, "queues.testaddress", "manequeue0", null, false);
+
+      addConsumer(0, 0, "manequeue0", null);
+      addConsumer(1, 1, "manequeue0", null);
+
+      waitForBindings(0, "queues.testaddress", 1, 1, true);
+      waitForBindings(1, "queues.testaddress", 1, 1, true);
+
+      waitForBindings(0, "queues.testaddress", 1, 1, false);
+      waitForBindings(1, "queues.testaddress", 1, 1, false);
+      
+      closeAllConsumers();
+
+   }
+
    static int loopNumber;
    public void _testLoop() throws Throwable
    {
-      for (int i = 0 ; i < 1000; i++)
+      for (int i = 0 ; i < 10; i++)
       {
          loopNumber = i;
          log.info("#test " + i);
-         testSimple2();
-         if (i + 1  < 1000)
-         {
-            tearDown();
-            setUp();
-         }
+         testSimple();
+         tearDown();
+         setUp();
       }
    }
 

Modified: branches/Branch_2_2_EAP_cluster_clean3/tests/src/org/hornetq/tests/integration/cluster/distribution/TwoWayTwoNodeClusterTest.java
===================================================================
--- branches/Branch_2_2_EAP_cluster_clean3/tests/src/org/hornetq/tests/integration/cluster/distribution/TwoWayTwoNodeClusterTest.java	2011-08-29 16:16:58 UTC (rev 11233)
+++ branches/Branch_2_2_EAP_cluster_clean3/tests/src/org/hornetq/tests/integration/cluster/distribution/TwoWayTwoNodeClusterTest.java	2011-08-29 16:20:36 UTC (rev 11234)
@@ -13,6 +13,8 @@
 
 package org.hornetq.tests.integration.cluster.distribution;
 
+import java.util.Map;
+
 import org.hornetq.core.logging.Logger;
 
 /**
@@ -136,14 +138,20 @@
          Thread.currentThread().setName("ThreadOnTestRestartTest");
          startServers(0, 1);
          waitForTopology(servers[0], 2);
+         
+         System.out.println(servers[0].getClusterManager().getTopology().describe());
+         System.out.println(servers[1].getClusterManager().getTopology().describe());
          waitForTopology(servers[1], 2);
 
-         for (int i = 0; i < 5; i++)
+         
+         for (int i = 0; i < 10; i++)
          {
+            Thread.sleep(10);
             log.info("Sleep #test " + i);
             log.info("#stop #test #" + i);
-            Thread.sleep(500);
             stopServers(1);
+            
+            System.out.println(servers[0].getClusterManager().getTopology().describe());
             waitForTopology(servers[0], 1, 2000);
             log.info("#start #test #" + i);
             startServers(1);

Modified: branches/Branch_2_2_EAP_cluster_clean3/tests/src/org/hornetq/tests/integration/cluster/failover/FailoverTestBase.java
===================================================================
--- branches/Branch_2_2_EAP_cluster_clean3/tests/src/org/hornetq/tests/integration/cluster/failover/FailoverTestBase.java	2011-08-29 16:16:58 UTC (rev 11233)
+++ branches/Branch_2_2_EAP_cluster_clean3/tests/src/org/hornetq/tests/integration/cluster/failover/FailoverTestBase.java	2011-08-29 16:20:36 UTC (rev 11234)
@@ -433,8 +433,9 @@
          this.latch = latch;
       }
 
-      public void nodeUP(String nodeID, Pair<TransportConfiguration, TransportConfiguration> connectorPair, boolean last)
+      public void nodeUP(final long uniqueEventID, String nodeID, Pair<TransportConfiguration, TransportConfiguration> connectorPair, boolean last)
       {
+         System.out.println("Received " + connectorPair + " uniqueEvent=" + uniqueEventID);
          if (connectorPair.a != null && !liveNode.contains(connectorPair.a.getName()))
          {
             liveNode.add(connectorPair.a.getName());
@@ -447,7 +448,7 @@
          }
       }
 
-      public void nodeDown(String nodeID)
+      public void nodeDown(final long uniqueEventID, String nodeID)
       {
          //To change body of implemented methods use File | Settings | File Templates.
       }

Modified: branches/Branch_2_2_EAP_cluster_clean3/tests/src/org/hornetq/tests/integration/cluster/failover/MultipleBackupsFailoverTestBase.java
===================================================================
--- branches/Branch_2_2_EAP_cluster_clean3/tests/src/org/hornetq/tests/integration/cluster/failover/MultipleBackupsFailoverTestBase.java	2011-08-29 16:16:58 UTC (rev 11233)
+++ branches/Branch_2_2_EAP_cluster_clean3/tests/src/org/hornetq/tests/integration/cluster/failover/MultipleBackupsFailoverTestBase.java	2011-08-29 16:20:36 UTC (rev 11234)
@@ -219,7 +219,7 @@
          this.latch = latch;
       }
 
-      public void nodeUP(String nodeID, Pair<TransportConfiguration, TransportConfiguration> connectorPair, boolean last)
+      public void nodeUP(final long uniqueEventID, String nodeID, Pair<TransportConfiguration, TransportConfiguration> connectorPair, boolean last)
       {
          if (connectorPair.a != null && !liveNode.contains(connectorPair.a.getName()))
          {
@@ -233,7 +233,7 @@
          }
       }
 
-      public void nodeDown(String nodeID)
+      public void nodeDown(final long uniqueEventID, String nodeID)
       {
       }
    }

Modified: branches/Branch_2_2_EAP_cluster_clean3/tests/src/org/hornetq/tests/integration/cluster/failover/MultipleLivesMultipleBackupsFailoverTest.java
===================================================================
--- branches/Branch_2_2_EAP_cluster_clean3/tests/src/org/hornetq/tests/integration/cluster/failover/MultipleLivesMultipleBackupsFailoverTest.java	2011-08-29 16:16:58 UTC (rev 11233)
+++ branches/Branch_2_2_EAP_cluster_clean3/tests/src/org/hornetq/tests/integration/cluster/failover/MultipleLivesMultipleBackupsFailoverTest.java	2011-08-29 16:20:36 UTC (rev 11234)
@@ -25,7 +25,6 @@
 import org.hornetq.core.client.impl.ServerLocatorInternal;
 import org.hornetq.core.config.ClusterConnectionConfiguration;
 import org.hornetq.core.config.Configuration;
-import org.hornetq.core.logging.Logger;
 import org.hornetq.core.server.NodeManager;
 import org.hornetq.core.server.impl.InVMNodeManager;
 import org.hornetq.tests.integration.cluster.util.SameProcessHornetQServer;
@@ -91,6 +90,7 @@
       ClientSession session = sendAndConsume(sf, true);
 
       System.out.println(((ServerLocatorInternal)locator).getTopology().describe());
+      Thread.sleep(500);
       servers.get(0).crash(session);
 
       int liveAfter0 = waitForNewLive(10000, true, servers, 1, 2);

Modified: branches/Branch_2_2_EAP_cluster_clean3/tests/src/org/hornetq/tests/integration/cluster/topology/TopologyClusterTestBase.java
===================================================================
--- branches/Branch_2_2_EAP_cluster_clean3/tests/src/org/hornetq/tests/integration/cluster/topology/TopologyClusterTestBase.java	2011-08-29 16:16:58 UTC (rev 11233)
+++ branches/Branch_2_2_EAP_cluster_clean3/tests/src/org/hornetq/tests/integration/cluster/topology/TopologyClusterTestBase.java	2011-08-29 16:20:36 UTC (rev 11234)
@@ -204,7 +204,8 @@
 
       locator.addClusterTopologyListener(new ClusterTopologyListener()
       {
-         public void nodeUP(String nodeID,
+         public void nodeUP(final long uniqueEventID, 
+                            String nodeID,
                             Pair<TransportConfiguration, TransportConfiguration> connectorPair,
                             boolean last)
          {
@@ -222,7 +223,7 @@
             }
          }
 
-         public void nodeDown(String nodeID)
+         public void nodeDown(final long uniqueEventID, String nodeID)
          {
             if (nodes.contains(nodeID))
             {
@@ -278,7 +279,8 @@
 
       locator.addClusterTopologyListener(new ClusterTopologyListener()
       {
-         public void nodeUP(String nodeID,
+         public void nodeUP(final long uniqueEventID, 
+                            String nodeID,
                             Pair<TransportConfiguration, TransportConfiguration> connectorPair,
                             boolean last)
          {
@@ -289,7 +291,7 @@
             }
          }
 
-         public void nodeDown(String nodeID)
+         public void nodeDown(final long uniqueEventID, String nodeID)
          {
             if (nodes.contains(nodeID))
             {
@@ -350,7 +352,7 @@
 
       locator.addClusterTopologyListener(new ClusterTopologyListener()
       {
-         public void nodeUP(String nodeID,
+         public void nodeUP(final long uniqueEventID, String nodeID,
                             Pair<TransportConfiguration, TransportConfiguration> connectorPair,
                             boolean last)
          {
@@ -361,7 +363,7 @@
             }
          }
 
-         public void nodeDown(String nodeID)
+         public void nodeDown(final long uniqueEventID, String nodeID)
          {
             if (nodes.contains(nodeID))
             {
@@ -432,7 +434,7 @@
 
       locator.addClusterTopologyListener(new ClusterTopologyListener()
       {
-         public void nodeUP(String nodeID, 
+         public void nodeUP(final long uniqueEventID, String nodeID, 
                             Pair<TransportConfiguration, TransportConfiguration> connectorPair,
                             boolean last)
          {
@@ -443,7 +445,7 @@
             }
          }
 
-         public void nodeDown(String nodeID)
+         public void nodeDown(final long uniqueEventID, String nodeID)
          {
             if (nodes.contains(nodeID))
             {

Modified: branches/Branch_2_2_EAP_cluster_clean3/tests/src/org/hornetq/tests/integration/jms/server/management/JMSUtil.java
===================================================================
--- branches/Branch_2_2_EAP_cluster_clean3/tests/src/org/hornetq/tests/integration/jms/server/management/JMSUtil.java	2011-08-29 16:16:58 UTC (rev 11233)
+++ branches/Branch_2_2_EAP_cluster_clean3/tests/src/org/hornetq/tests/integration/jms/server/management/JMSUtil.java	2011-08-29 16:20:36 UTC (rev 11234)
@@ -296,7 +296,7 @@
          this.latch = latch;
       }
 
-      public void nodeUP(String nodeID,
+      public void nodeUP(final long uniqueEventID, String nodeID,
                          Pair<TransportConfiguration, TransportConfiguration> connectorPair,
                          boolean last)
       {
@@ -312,7 +312,7 @@
          }
       }
 
-      public void nodeDown(String nodeID)
+      public void nodeDown(final long uniqueEventID, String nodeID)
       {
          // To change body of implemented methods use File | Settings | File Templates.
       }



More information about the hornetq-commits mailing list