[hornetq-commits] JBoss hornetq SVN: r9565 - in branches/2_2_0_HA_Improvements: src/main/org/hornetq/core/cluster/impl and 5 other directories.

do-not-reply at jboss.org do-not-reply at jboss.org
Thu Aug 19 07:42:36 EDT 2010


Author: jmesnil
Date: 2010-08-19 07:42:35 -0400 (Thu, 19 Aug 2010)
New Revision: 9565

Added:
   branches/2_2_0_HA_Improvements/src/main/org/hornetq/core/client/impl/Topology.java
   branches/2_2_0_HA_Improvements/src/main/org/hornetq/core/client/impl/TopologyMember.java
   branches/2_2_0_HA_Improvements/src/main/org/hornetq/core/protocol/core/impl/wireformat/NodeAnnounceMessage.java
Removed:
   branches/2_2_0_HA_Improvements/src/main/org/hornetq/core/server/cluster/impl/Topology.java
   branches/2_2_0_HA_Improvements/src/main/org/hornetq/core/server/cluster/impl/TopologyMember.java
Modified:
   branches/2_2_0_HA_Improvements/src/main/org/hornetq/core/client/impl/ClientSessionFactoryImpl.java
   branches/2_2_0_HA_Improvements/src/main/org/hornetq/core/client/impl/ServerLocatorImpl.java
   branches/2_2_0_HA_Improvements/src/main/org/hornetq/core/client/impl/ServerLocatorInternal.java
   branches/2_2_0_HA_Improvements/src/main/org/hornetq/core/cluster/impl/DiscoveryGroupImpl.java
   branches/2_2_0_HA_Improvements/src/main/org/hornetq/core/protocol/core/impl/CoreProtocolManager.java
   branches/2_2_0_HA_Improvements/src/main/org/hornetq/core/protocol/core/impl/PacketDecoder.java
   branches/2_2_0_HA_Improvements/src/main/org/hornetq/core/protocol/core/impl/PacketImpl.java
   branches/2_2_0_HA_Improvements/src/main/org/hornetq/core/server/cluster/ClusterConnection.java
   branches/2_2_0_HA_Improvements/src/main/org/hornetq/core/server/cluster/ClusterManager.java
   branches/2_2_0_HA_Improvements/src/main/org/hornetq/core/server/cluster/impl/ClusterConnectionBridge.java
   branches/2_2_0_HA_Improvements/src/main/org/hornetq/core/server/cluster/impl/ClusterConnectionImpl.java
   branches/2_2_0_HA_Improvements/src/main/org/hornetq/core/server/cluster/impl/ClusterManagerImpl.java
   branches/2_2_0_HA_Improvements/tests/src/org/hornetq/tests/integration/cluster/distribution/ClusterTestBase.java
   branches/2_2_0_HA_Improvements/tests/src/org/hornetq/tests/integration/cluster/distribution/SymmetricClusterTest.java
   branches/2_2_0_HA_Improvements/tests/src/org/hornetq/tests/integration/cluster/distribution/SymmetricClusterWithDiscoveryTest.java
   branches/2_2_0_HA_Improvements/tests/src/org/hornetq/tests/integration/cluster/distribution/TwoWayTwoNodeClusterTest.java
Log:
HA refactoring

* improvements for discovery, cluster formation using node notifications, etc.

Modified: branches/2_2_0_HA_Improvements/src/main/org/hornetq/core/client/impl/ClientSessionFactoryImpl.java
===================================================================
--- branches/2_2_0_HA_Improvements/src/main/org/hornetq/core/client/impl/ClientSessionFactoryImpl.java	2010-08-18 19:29:30 UTC (rev 9564)
+++ branches/2_2_0_HA_Improvements/src/main/org/hornetq/core/client/impl/ClientSessionFactoryImpl.java	2010-08-19 11:42:35 UTC (rev 9565)
@@ -43,6 +43,7 @@
 import org.hornetq.core.protocol.core.impl.wireformat.ClusterTopologyChangeMessage;
 import org.hornetq.core.protocol.core.impl.wireformat.CreateSessionMessage;
 import org.hornetq.core.protocol.core.impl.wireformat.CreateSessionResponseMessage;
+import org.hornetq.core.protocol.core.impl.wireformat.NodeAnnounceMessage;
 import org.hornetq.core.protocol.core.impl.wireformat.Ping;
 import org.hornetq.core.protocol.core.impl.wireformat.SubscribeClusterTopologyUpdatesMessage;
 import org.hornetq.core.remoting.FailureListener;
@@ -1072,6 +1073,11 @@
          if (serverLocator.isHA())
          {
             channel0.send(new SubscribeClusterTopologyUpdatesMessage(serverLocator.isClusterConnection()));
+            if (serverLocator.isClusterConnection())
+            {
+               TransportConfiguration config = serverLocator.getClusterTransportConfiguration();
+               channel0.send(new NodeAnnounceMessage(serverLocator.getNodeID(), serverLocator.isBackup(), config));
+            }
          }
       }
 

Modified: branches/2_2_0_HA_Improvements/src/main/org/hornetq/core/client/impl/ServerLocatorImpl.java
===================================================================
--- branches/2_2_0_HA_Improvements/src/main/org/hornetq/core/client/impl/ServerLocatorImpl.java	2010-08-18 19:29:30 UTC (rev 9564)
+++ branches/2_2_0_HA_Improvements/src/main/org/hornetq/core/client/impl/ServerLocatorImpl.java	2010-08-19 11:42:35 UTC (rev 9565)
@@ -18,7 +18,6 @@
 import java.net.InetAddress;
 import java.security.AccessController;
 import java.security.PrivilegedAction;
-import java.util.Arrays;
 import java.util.HashMap;
 import java.util.HashSet;
 import java.util.List;
@@ -44,9 +43,6 @@
 import org.hornetq.core.cluster.DiscoveryListener;
 import org.hornetq.core.cluster.impl.DiscoveryGroupImpl;
 import org.hornetq.core.logging.Logger;
-import org.hornetq.core.server.cluster.impl.Topology;
-import org.hornetq.core.server.cluster.impl.TopologyMember;
-import org.hornetq.utils.ExecutorFactory;
 import org.hornetq.utils.HornetQThreadFactory;
 import org.hornetq.utils.UUIDGenerator;
 
@@ -166,6 +162,10 @@
    private String groupID;
 
    private String nodeID;
+   
+   private TransportConfiguration clusterTransportConfiguration;
+   
+   private boolean backup;
 
    private static synchronized ExecutorService getGlobalThreadPool()
    {
@@ -504,7 +504,7 @@
             interceptors);
 
       factories.add(factory);
-
+      
       return factory;
    }
 
@@ -1008,6 +1008,11 @@
    {
       this.nodeID = nodeID;
    }
+   
+   public String getNodeID()
+   {
+      return nodeID;
+   }
 
    public void setClusterConnection(boolean clusterConnection)
    {
@@ -1018,7 +1023,27 @@
    {
       return clusterConnection;
    }
+   
+   public TransportConfiguration getClusterTransportConfiguration()
+   {
+      return clusterTransportConfiguration;
+   }
+   
+   public void setClusterTransportConfiguration(TransportConfiguration tc)
+   {
+      this.clusterTransportConfiguration = tc;
+   }
 
+   public boolean isBackup()
+   {
+      return backup;
+   }
+   
+   public void setBackup(boolean backup)
+   {
+      this.backup = backup;
+   }
+
    @Override
    protected void finalize() throws Throwable
    {
@@ -1091,31 +1116,43 @@
       closed = true;
    }
 
-   public synchronized void notifyNodeDown(final String nodeID)
+   public void notifyNodeDown(final String nodeID)
    {
-      if (!ha)
+      boolean removed = false;
+      synchronized (this)
       {
-         return;
-      }
+         if (!ha)
+         {
+            return;
+         }
 
-      topology.removeMember(nodeID);
+         removed = topology.removeMember(nodeID);
+         
+         if (!topology.isEmpty())
+         {
+            updateArraysAndPairs();
+            
+            if (topology.size() == 1 && topology.getMember(nodeID) != null)
+            {
+               receivedTopology = false;
+            }
+         }
+         else
+         {
+            pairs.clear();
 
-      if (!topology.isEmpty())
-      {
-         updateArraysAndPairs();
-      }
-      else
-      {
-         pairs.clear();
+            topologyArray = null;
 
-         topologyArray = null;
-
-         receivedTopology = false;
+            receivedTopology = false;
+         }
       }
 
-      for (ClusterTopologyListener listener : topologyListeners)
+      if (removed)
       {
-         listener.nodeDown(nodeID);
+         for (ClusterTopologyListener listener : topologyListeners)
+         {
+            listener.nodeDown(nodeID);
+         }
       }
    }
 
@@ -1144,7 +1181,6 @@
       }
 
       // Notify if waiting on getting topology
-
       notify();
    }
 
@@ -1175,11 +1211,14 @@
       for (DiscoveryEntry entry : newConnectors)
       {
          this.initialConnectors[count++] = entry.getConnector();
-
-         notifyNodeUp(entry.getNodeID(), new Pair<TransportConfiguration, TransportConfiguration>(entry.getConnector(), null), true, 1);
       }
-
-      System.out.println(">>>>>>>> Discovered initial connectors= " + Arrays.asList(initialConnectors));
+      
+      if (clusterConnection && !receivedTopology)
+      {
+         // FIXME the node is alone in the cluster. We create a connection to the new node
+         // to trigger the node notification to form the cluster.
+         connect();
+      }
    }
 
    public synchronized void factoryClosed(final ClientSessionFactory factory)

Modified: branches/2_2_0_HA_Improvements/src/main/org/hornetq/core/client/impl/ServerLocatorInternal.java
===================================================================
--- branches/2_2_0_HA_Improvements/src/main/org/hornetq/core/client/impl/ServerLocatorInternal.java	2010-08-18 19:29:30 UTC (rev 9564)
+++ branches/2_2_0_HA_Improvements/src/main/org/hornetq/core/client/impl/ServerLocatorInternal.java	2010-08-19 11:42:35 UTC (rev 9565)
@@ -35,7 +35,9 @@
    TransportConfiguration getBackup( TransportConfiguration live);
 
    void setNodeID(String nodeID);
-   
+
+   String getNodeID();
+
    void connect();
    
    void addClusterTopologyListener(ClusterTopologyListener listener);
@@ -49,4 +51,13 @@
    void setClusterConnection(boolean clusterConnection);
 
    boolean isClusterConnection();
+
+   TransportConfiguration getClusterTransportConfiguration();
+
+   void setClusterTransportConfiguration(TransportConfiguration tc);
+
+   boolean isBackup();
+   
+   void setBackup(boolean backup);
+
 }

Copied: branches/2_2_0_HA_Improvements/src/main/org/hornetq/core/client/impl/Topology.java (from rev 9550, branches/2_2_0_HA_Improvements/src/main/org/hornetq/core/server/cluster/impl/Topology.java)
===================================================================
--- branches/2_2_0_HA_Improvements/src/main/org/hornetq/core/client/impl/Topology.java	                        (rev 0)
+++ branches/2_2_0_HA_Improvements/src/main/org/hornetq/core/client/impl/Topology.java	2010-08-19 11:42:35 UTC (rev 9565)
@@ -0,0 +1,93 @@
+/*
+ * Copyright 2009 Red Hat, Inc.
+ *  Red Hat licenses this file to you under the Apache License, version
+ *  2.0 (the "License"); you may not use this file except in compliance
+ *  with the License.  You may obtain a copy of the License at
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+ *  implied.  See the License for the specific language governing
+ *  permissions and limitations under the License.
+ */
+package org.hornetq.core.client.impl;
+
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Map.Entry;
+
+import org.hornetq.api.core.client.ClusterTopologyListener;
+
+/**
+ * Registry of the cluster nodes known to its owner (a server locator or the
+ * cluster manager), keyed by node ID.
+ * <p>
+ * Every method synchronizes on this instance so that the backing
+ * {@link HashMap} is never read while another thread is modifying it.
+ *
+ * @author <a href="mailto:andy.taylor at jboss.org">Andy Taylor</a>
+ *         Created Aug 16, 2010
+ */
+public class Topology
+{
+   /*
+    * topology describes the other cluster nodes that this server knows about:
+    *
+    * keys are node IDs
+    * values are a pair of live/backup transport configurations
+    */
+   private final Map<String, TopologyMember> topology = new HashMap<String, TopologyMember>();
+
+   /**
+    * Registers (or overwrites) the member stored for a node.
+    *
+    * @param nodeId the ID of the node
+    * @param member the member to store for that node
+    * @return {@code true} if an entry for {@code nodeId} was already present and has been replaced
+    */
+   public synchronized boolean addMember(String nodeId, TopologyMember member)
+   {
+      boolean replaced = topology.containsKey(nodeId);
+      topology.put(nodeId, member);
+      return replaced;
+   }
+
+   /**
+    * Removes the member stored for a node.
+    *
+    * @param nodeId the ID of the node
+    * @return {@code true} if a non-null member was stored for {@code nodeId} and has been removed
+    */
+   public synchronized boolean removeMember(String nodeId)
+   {
+      TopologyMember member = topology.remove(nodeId);
+      return (member != null);
+   }
+
+   /**
+    * Replays the current topology to a listener by calling
+    * {@code nodeUP} once per known member. The {@code last} argument is
+    * {@code true} only for the final member of the iteration.
+    * <p>
+    * The listener is invoked while this instance's lock is held.
+    *
+    * @param listener the listener to notify
+    */
+   public synchronized void fireListeners(ClusterTopologyListener listener)
+   {
+      int count = 0;
+      for (Map.Entry<String, TopologyMember> entry : topology.entrySet())
+      {
+         listener.nodeUP(entry.getKey(), entry.getValue().getConnector(), ++count == topology.size(), entry.getValue().getDistance());
+      }
+   }
+
+   /**
+    * @param nodeID the ID of the node
+    * @return the member stored for {@code nodeID}, or {@code null} if there is none
+    */
+   public synchronized TopologyMember getMember(String nodeID)
+   {
+      return topology.get(nodeID);
+   }
+
+   /**
+    * @return {@code true} if no member is registered
+    */
+   public synchronized boolean isEmpty()
+   {
+      return topology.isEmpty();
+   }
+
+   /**
+    * Returns the registered members.
+    * <p>
+    * The returned collection is a live view of the internal map, not a copy:
+    * callers that iterate over it must synchronize on this {@code Topology}
+    * for the duration of the iteration.
+    *
+    * @return a view of the registered members
+    */
+   public synchronized Collection<TopologyMember> getMembers()
+   {
+      return topology.values();
+   }
+
+   /**
+    * @return the number of registered members
+    */
+   public synchronized int size()
+   {
+      return topology.size();
+   }
+
+   /**
+    * Builds a human-readable dump of the topology, one line per member in
+    * the form {@code "\t<nodeID> => <member>\n"}.
+    *
+    * @return the description, or an empty string if there is no member
+    */
+   public synchronized String describe()
+   {
+      // the lock is held for the whole iteration, so no defensive copy of the map is needed
+      StringBuilder desc = new StringBuilder();
+      for (Entry<String, TopologyMember> entry : topology.entrySet())
+      {
+         desc.append("\t").append(entry.getKey()).append(" => ").append(entry.getValue()).append("\n");
+      }
+      return desc.toString();
+   }
+
+   /**
+    * Removes every registered member.
+    */
+   public synchronized void clear()
+   {
+      topology.clear();
+   }
+}

Copied: branches/2_2_0_HA_Improvements/src/main/org/hornetq/core/client/impl/TopologyMember.java (from rev 9550, branches/2_2_0_HA_Improvements/src/main/org/hornetq/core/server/cluster/impl/TopologyMember.java)
===================================================================
--- branches/2_2_0_HA_Improvements/src/main/org/hornetq/core/client/impl/TopologyMember.java	                        (rev 0)
+++ branches/2_2_0_HA_Improvements/src/main/org/hornetq/core/client/impl/TopologyMember.java	2010-08-19 11:42:35 UTC (rev 9565)
@@ -0,0 +1,49 @@
+/*
+ * Copyright 2009 Red Hat, Inc.
+ *  Red Hat licenses this file to you under the Apache License, version
+ *  2.0 (the "License"); you may not use this file except in compliance
+ *  with the License.  You may obtain a copy of the License at
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+ *  implied.  See the License for the specific language governing
+ *  permissions and limitations under the License.
+ */
+package org.hornetq.core.client.impl;
+
+import org.hornetq.api.core.Pair;
+import org.hornetq.api.core.TransportConfiguration;
+
+/**
+ * Immutable entry of a {@link Topology}: the pair of live/backup transport
+ * configurations of a node, together with the distance value that is passed
+ * along in topology notifications.
+ *
+ * @author <a href="mailto:andy.taylor at jboss.org">Andy Taylor</a>
+ *         Created Aug 16, 2010
+ */
+public class TopologyMember
+{
+   private final int distance;
+
+   private final Pair<TransportConfiguration, TransportConfiguration> connector;
+
+   /**
+    * @param connector the live/backup transport configuration pair of the node
+    * @param distance the distance associated with the node
+    */
+   public TopologyMember(Pair<TransportConfiguration, TransportConfiguration> connector, int distance)
+   {
+      this.distance = distance;
+      this.connector = connector;
+   }
+
+   /**
+    * @return the distance given at construction time
+    */
+   public int getDistance()
+   {
+      return this.distance;
+   }
+
+   /**
+    * @return the transport configuration pair given at construction time
+    */
+   public Pair<TransportConfiguration, TransportConfiguration> getConnector()
+   {
+      return this.connector;
+   }
+
+   @Override
+   public String toString()
+   {
+      StringBuilder text = new StringBuilder("TopologyMember[distance=");
+      text.append(distance);
+      text.append(", connector=").append(connector);
+      text.append("]");
+      return text.toString();
+   }
+}

Modified: branches/2_2_0_HA_Improvements/src/main/org/hornetq/core/cluster/impl/DiscoveryGroupImpl.java
===================================================================
--- branches/2_2_0_HA_Improvements/src/main/org/hornetq/core/cluster/impl/DiscoveryGroupImpl.java	2010-08-18 19:29:30 UTC (rev 9564)
+++ branches/2_2_0_HA_Improvements/src/main/org/hornetq/core/cluster/impl/DiscoveryGroupImpl.java	2010-08-19 11:42:35 UTC (rev 9565)
@@ -413,7 +413,6 @@
 
          if (entry.getValue().getLastUpdate() + timeout <= now)
          {
-            System.out.println("remove " + entry);
             iter.remove();
 
             changed = true;

Modified: branches/2_2_0_HA_Improvements/src/main/org/hornetq/core/protocol/core/impl/CoreProtocolManager.java
===================================================================
--- branches/2_2_0_HA_Improvements/src/main/org/hornetq/core/protocol/core/impl/CoreProtocolManager.java	2010-08-18 19:29:30 UTC (rev 9564)
+++ branches/2_2_0_HA_Improvements/src/main/org/hornetq/core/protocol/core/impl/CoreProtocolManager.java	2010-08-19 11:42:35 UTC (rev 9565)
@@ -30,6 +30,7 @@
 import org.hornetq.core.protocol.core.Packet;
 import org.hornetq.core.protocol.core.ServerSessionPacketHandler;
 import org.hornetq.core.protocol.core.impl.wireformat.ClusterTopologyChangeMessage;
+import org.hornetq.core.protocol.core.impl.wireformat.NodeAnnounceMessage;
 import org.hornetq.core.protocol.core.impl.wireformat.Ping;
 import org.hornetq.core.protocol.core.impl.wireformat.SubscribeClusterTopologyUpdatesMessage;
 import org.hornetq.core.remoting.CloseListener;
@@ -118,11 +119,6 @@
                   {
                      channel0.send(new ClusterTopologyChangeMessage(nodeID));
                   }
-                  
-                  public String toString()
-                  {
-                     return "ClusterTopologyListener[address=" + connection.getRemoteAddress() + "]";
-                  };
                };
                
                final boolean isCC = msg.isClusterConnection();
@@ -137,6 +133,21 @@
                   }
                });
             }
+            else if (packet.getType() == PacketImpl.NODE_ANNOUNCE)
+            {
+               NodeAnnounceMessage msg = (NodeAnnounceMessage)packet;
+
+               Pair<TransportConfiguration, TransportConfiguration> pair;
+               if (msg.isBackup())
+               {
+                  pair = new Pair<TransportConfiguration, TransportConfiguration>(null, msg.getConnector());
+               }
+               else
+               {
+                  pair = new Pair<TransportConfiguration, TransportConfiguration>(msg.getConnector(), null);
+               }
+               server.getClusterManager().notifyNodeUp(msg.getNodeID(), pair, false, 1);
+            }
          }
       });
       

Modified: branches/2_2_0_HA_Improvements/src/main/org/hornetq/core/protocol/core/impl/PacketDecoder.java
===================================================================
--- branches/2_2_0_HA_Improvements/src/main/org/hornetq/core/protocol/core/impl/PacketDecoder.java	2010-08-18 19:29:30 UTC (rev 9564)
+++ branches/2_2_0_HA_Improvements/src/main/org/hornetq/core/protocol/core/impl/PacketDecoder.java	2010-08-19 11:42:35 UTC (rev 9565)
@@ -21,6 +21,7 @@
 import static org.hornetq.core.protocol.core.impl.PacketImpl.DELETE_QUEUE;
 import static org.hornetq.core.protocol.core.impl.PacketImpl.DISCONNECT;
 import static org.hornetq.core.protocol.core.impl.PacketImpl.EXCEPTION;
+import static org.hornetq.core.protocol.core.impl.PacketImpl.NODE_ANNOUNCE;
 import static org.hornetq.core.protocol.core.impl.PacketImpl.NULL_RESPONSE;
 import static org.hornetq.core.protocol.core.impl.PacketImpl.PACKETS_CONFIRMED;
 import static org.hornetq.core.protocol.core.impl.PacketImpl.PING;
@@ -90,6 +91,7 @@
 import org.hornetq.core.protocol.core.impl.wireformat.CreateSessionMessage;
 import org.hornetq.core.protocol.core.impl.wireformat.CreateSessionResponseMessage;
 import org.hornetq.core.protocol.core.impl.wireformat.HornetQExceptionMessage;
+import org.hornetq.core.protocol.core.impl.wireformat.NodeAnnounceMessage;
 import org.hornetq.core.protocol.core.impl.wireformat.NullResponseMessage;
 import org.hornetq.core.protocol.core.impl.wireformat.PacketsConfirmedMessage;
 import org.hornetq.core.protocol.core.impl.wireformat.Ping;
@@ -495,6 +497,11 @@
             packet = new ClusterTopologyChangeMessage();
             break;
          }
+         case NODE_ANNOUNCE:
+         {
+            packet = new NodeAnnounceMessage();
+            break;
+         }
          case SUBSCRIBE_TOPOLOGY:
          {
             packet = new SubscribeClusterTopologyUpdatesMessage();

Modified: branches/2_2_0_HA_Improvements/src/main/org/hornetq/core/protocol/core/impl/PacketImpl.java
===================================================================
--- branches/2_2_0_HA_Improvements/src/main/org/hornetq/core/protocol/core/impl/PacketImpl.java	2010-08-18 19:29:30 UTC (rev 9564)
+++ branches/2_2_0_HA_Improvements/src/main/org/hornetq/core/protocol/core/impl/PacketImpl.java	2010-08-19 11:42:35 UTC (rev 9565)
@@ -186,6 +186,8 @@
    
    public static final byte CLUSTER_TOPOLOGY = 110;
 
+   public static final byte NODE_ANNOUNCE = 111;
+
    public static final byte SUBSCRIBE_TOPOLOGY = 112;
 
    // Static --------------------------------------------------------

Added: branches/2_2_0_HA_Improvements/src/main/org/hornetq/core/protocol/core/impl/wireformat/NodeAnnounceMessage.java
===================================================================
--- branches/2_2_0_HA_Improvements/src/main/org/hornetq/core/protocol/core/impl/wireformat/NodeAnnounceMessage.java	                        (rev 0)
+++ branches/2_2_0_HA_Improvements/src/main/org/hornetq/core/protocol/core/impl/wireformat/NodeAnnounceMessage.java	2010-08-19 11:42:35 UTC (rev 9565)
@@ -0,0 +1,103 @@
+/*
+ * Copyright 2009 Red Hat, Inc.
+ * Red Hat licenses this file to you under the Apache License, version
+ * 2.0 (the "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+ * implied.  See the License for the specific language governing
+ * permissions and limitations under the License.
+ */
+
+package org.hornetq.core.protocol.core.impl.wireformat;
+
+import org.hornetq.api.core.HornetQBuffer;
+import org.hornetq.api.core.TransportConfiguration;
+import org.hornetq.core.logging.Logger;
+import org.hornetq.core.protocol.core.impl.PacketImpl;
+
+/**
+ * Packet of type {@link PacketImpl#NODE_ANNOUNCE} with which a node announces
+ * itself over a connection.
+ * <p>
+ * It carries the node ID, a flag telling whether the node is a backup, and a
+ * transport configuration for the node. The three fields are written to and
+ * read from the buffer in exactly that order.
+ *
+ * @author <a href="mailto:tim.fox at jboss.com">Tim Fox</a>
+ *
+ */
+public class NodeAnnounceMessage extends PacketImpl
+{
+   // Constants -----------------------------------------------------
+
+   private static final Logger log = Logger.getLogger(NodeAnnounceMessage.class);
+
+   // Attributes ----------------------------------------------------
+
+   private String nodeID;
+
+   private boolean backup;
+
+   private TransportConfiguration connector;
+
+   // Constructors --------------------------------------------------
+
+   /**
+    * Creates an empty packet, to be populated by {@link #decodeRest(HornetQBuffer)}.
+    */
+   public NodeAnnounceMessage()
+   {
+      super(PacketImpl.NODE_ANNOUNCE);
+   }
+
+   /**
+    * Creates a packet ready to be sent.
+    *
+    * @param nodeID the ID of the announced node
+    * @param backup whether the announced node is a backup
+    * @param tc the transport configuration of the announced node
+    */
+   public NodeAnnounceMessage(final String nodeID, final boolean backup, final TransportConfiguration tc)
+   {
+      // the no-arg constructor sets the packet type
+      this();
+
+      this.nodeID = nodeID;
+      this.backup = backup;
+      this.connector = tc;
+   }
+
+   // Public --------------------------------------------------------
+
+   /**
+    * @return the ID of the announced node
+    */
+   public String getNodeID()
+   {
+      return this.nodeID;
+   }
+
+   /**
+    * @return {@code true} if the announced node is a backup
+    */
+   public boolean isBackup()
+   {
+      return this.backup;
+   }
+
+   /**
+    * @return the transport configuration of the announced node
+    */
+   public TransportConfiguration getConnector()
+   {
+      return this.connector;
+   }
+
+   /**
+    * Writes the node ID, the backup flag and the connector, in that order.
+    */
+   @Override
+   public void encodeRest(final HornetQBuffer buffer)
+   {
+      buffer.writeString(this.nodeID);
+      buffer.writeBoolean(this.backup);
+      this.connector.encode(buffer);
+   }
+
+   /**
+    * Reads the node ID, the backup flag and the connector, in that order.
+    */
+   @Override
+   public void decodeRest(final HornetQBuffer buffer)
+   {
+      this.nodeID = buffer.readString();
+      this.backup = buffer.readBoolean();
+      // the field is assigned before decoding, as a fresh configuration that decodes itself in place
+      this.connector = new TransportConfiguration();
+      this.connector.decode(buffer);
+   }
+}

Modified: branches/2_2_0_HA_Improvements/src/main/org/hornetq/core/server/cluster/ClusterConnection.java
===================================================================
--- branches/2_2_0_HA_Improvements/src/main/org/hornetq/core/server/cluster/ClusterConnection.java	2010-08-18 19:29:30 UTC (rev 9564)
+++ branches/2_2_0_HA_Improvements/src/main/org/hornetq/core/server/cluster/ClusterConnection.java	2010-08-19 11:42:35 UTC (rev 9565)
@@ -15,7 +15,6 @@
 
 import java.util.Map;
 
-import org.hornetq.api.core.Pair;
 import org.hornetq.api.core.SimpleString;
 import org.hornetq.api.core.TransportConfiguration;
 import org.hornetq.api.core.client.ClusterTopologyListener;
@@ -51,12 +50,8 @@
 
    void activate();
    
-   Pair<TransportConfiguration, TransportConfiguration>[] getTopology();
-   
    TransportConfiguration getConnector();
 
    // for debug
    String description();
-
-   void announce(String nodeID, Pair<TransportConfiguration, TransportConfiguration> pair, boolean b);
 }

Modified: branches/2_2_0_HA_Improvements/src/main/org/hornetq/core/server/cluster/ClusterManager.java
===================================================================
--- branches/2_2_0_HA_Improvements/src/main/org/hornetq/core/server/cluster/ClusterManager.java	2010-08-18 19:29:30 UTC (rev 9564)
+++ branches/2_2_0_HA_Improvements/src/main/org/hornetq/core/server/cluster/ClusterManager.java	2010-08-19 11:42:35 UTC (rev 9565)
@@ -20,6 +20,7 @@
 import org.hornetq.api.core.SimpleString;
 import org.hornetq.api.core.TransportConfiguration;
 import org.hornetq.api.core.client.ClusterTopologyListener;
+import org.hornetq.core.client.impl.Topology;
 import org.hornetq.core.server.HornetQComponent;
 
 /**
@@ -46,7 +47,9 @@
    
    void activate();
 
-   void notifyClientsNodeDown(String nodeID);
+   void notifyNodeDown(String nodeID);
 
-   void notifyClientsNodeUp(String nodeID, Pair<TransportConfiguration, TransportConfiguration> connectorPair, boolean b, int distance);
+   void notifyNodeUp(String nodeID, Pair<TransportConfiguration, TransportConfiguration> connectorPair, boolean backup, int distance);
+
+   Topology getTopology();
 }

Modified: branches/2_2_0_HA_Improvements/src/main/org/hornetq/core/server/cluster/impl/ClusterConnectionBridge.java
===================================================================
--- branches/2_2_0_HA_Improvements/src/main/org/hornetq/core/server/cluster/impl/ClusterConnectionBridge.java	2010-08-18 19:29:30 UTC (rev 9564)
+++ branches/2_2_0_HA_Improvements/src/main/org/hornetq/core/server/cluster/impl/ClusterConnectionBridge.java	2010-08-19 11:42:35 UTC (rev 9565)
@@ -25,7 +25,6 @@
 import org.hornetq.api.core.client.ClientMessage;
 import org.hornetq.api.core.client.ClientProducer;
 import org.hornetq.api.core.client.ClientSessionFactory;
-import org.hornetq.api.core.client.ServerLocator;
 import org.hornetq.api.core.management.ManagementHelper;
 import org.hornetq.api.core.management.NotificationType;
 import org.hornetq.api.core.management.ResourceNames;
@@ -238,8 +237,7 @@
       }
       catch (Exception e)
       {
-         // TODO Auto-generated catch block
-         e.printStackTrace();
+    	  log.warn("Unable to clean up the session after a connection failure", e);
       }
       serverLocator.notifyNodeDown(targetNodeID);
       super.connectionFailed(me);

Modified: branches/2_2_0_HA_Improvements/src/main/org/hornetq/core/server/cluster/impl/ClusterConnectionImpl.java
===================================================================
--- branches/2_2_0_HA_Improvements/src/main/org/hornetq/core/server/cluster/impl/ClusterConnectionImpl.java	2010-08-18 19:29:30 UTC (rev 9564)
+++ branches/2_2_0_HA_Improvements/src/main/org/hornetq/core/server/cluster/impl/ClusterConnectionImpl.java	2010-08-19 11:42:35 UTC (rev 9565)
@@ -93,12 +93,12 @@
 
    private final String clusterPassword;
 
-   private Pair<TransportConfiguration, TransportConfiguration>[] topology;
-
    private final ServerLocatorInternal serverLocator;
    
    private final TransportConfiguration connector;
 
+   private final boolean allowsDirectConnectionsOnly;
+   
    public ClusterConnectionImpl(final ServerLocatorInternal serverLocator,
                                 final TransportConfiguration connector,
                                 final SimpleString name,
@@ -131,6 +131,15 @@
       if (this.serverLocator != null)
       {
          this.serverLocator.setClusterConnection(true);
+         this.serverLocator.setClusterTransportConfiguration(connector);
+         this.serverLocator.setBackup(server.getConfiguration().isBackup());
+         
+         // a cluster connection will connect to other nodes only if they are directly connected
+         // through a static list of connectors 
+         allowsDirectConnectionsOnly = (serverLocator.getStaticTransportConfigurations() != null);
+      } else
+      {
+         allowsDirectConnectionsOnly = false;
       }
 
       this.connector = connector;
@@ -245,11 +254,6 @@
       started = false;
    }
 
-   public Pair<TransportConfiguration, TransportConfiguration>[] getTopology()
-   {
-      return topology;
-   }
-
    public boolean isStarted()
    {
       return started;
@@ -302,8 +306,6 @@
          return;
       }
       
-      server.getClusterManager().notifyClientsNodeDown(nodeID);
-      
       //Remove the flow record for that node
       
       MessageFlowRecord record = records.remove(nodeID);
@@ -313,13 +315,15 @@
          try
          {
             record.reset();
-            record.close();
+            //record.close();
          }
          catch (Exception e)
          {
             log.error("Failed to close flow record", e);
          }
       }
+      
+      server.getClusterManager().notifyNodeDown(nodeID);
    }
 
    public synchronized void nodeUP(final String nodeID,
@@ -327,14 +331,28 @@
                                    final boolean last,
                                    final int distance)
    {
-      //we only create a bridge it it isnt ourselves and the node is 1hop away
-      if (nodeID.equals(nodeUUID.toString()) || distance > 1)
+      // discard notifications about ourselves
+      if (nodeID.equals(nodeUUID.toString()))
       {
          return;
       }
-      
-      server.getClusterManager().notifyClientsNodeUp(nodeID, connectorPair, false, distance);
-      
+
+      // we propagate the node notifications to all cluster topology listeners
+      server.getClusterManager().notifyNodeUp(nodeID, connectorPair, last, distance);
+
+      // if the node is more than 1 hop away, we do not create a bridge for direct cluster connection
+      if (allowsDirectConnectionsOnly && distance > 1)
+      {
+         return;
+      }
+
+      // FIXME required to prevent cluster connections w/o discovery group
+      // and with empty static connectors from creating bridges... ugly!
+      if (serverLocator == null)
+      {
+         return;
+      }
+
       try
       {
          MessageFlowRecord record = records.get(nodeID);
@@ -377,21 +395,6 @@
       }
    }
    
-   public void announce(String nodeID, Pair<TransportConfiguration, TransportConfiguration> pair, boolean b)
-   {
-      TransportConfiguration connector = (backup) ? pair.b : pair.a;
-      if (serverLocator!= null && serverLocator.getStaticTransportConfigurations() != null)
-      {
-         for (TransportConfiguration staticConnector : serverLocator.getStaticTransportConfigurations())
-         {
-            if (connector.equals(staticConnector))
-            {
-               nodeUP(nodeID, pair, false, 0);
-            }
-         }
-      }
-   }
-
    private void createNewRecord(final String nodeID,
                                 final TransportConfiguration connector,
                                 final SimpleString queueName,

Modified: branches/2_2_0_HA_Improvements/src/main/org/hornetq/core/server/cluster/impl/ClusterManagerImpl.java
===================================================================
--- branches/2_2_0_HA_Improvements/src/main/org/hornetq/core/server/cluster/impl/ClusterManagerImpl.java	2010-08-18 19:29:30 UTC (rev 9564)
+++ branches/2_2_0_HA_Improvements/src/main/org/hornetq/core/server/cluster/impl/ClusterManagerImpl.java	2010-08-19 11:42:35 UTC (rev 9565)
@@ -25,12 +25,16 @@
 import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.ScheduledFuture;
 
+import apple.awt.CList;
+
 import org.hornetq.api.core.Pair;
 import org.hornetq.api.core.SimpleString;
 import org.hornetq.api.core.TransportConfiguration;
 import org.hornetq.api.core.client.ClusterTopologyListener;
 import org.hornetq.api.core.client.HornetQClient;
 import org.hornetq.core.client.impl.ServerLocatorInternal;
+import org.hornetq.core.client.impl.Topology;
+import org.hornetq.core.client.impl.TopologyMember;
 import org.hornetq.core.config.BridgeConfiguration;
 import org.hornetq.core.config.BroadcastGroupConfiguration;
 import org.hornetq.core.config.ClusterConnectionConfiguration;
@@ -194,7 +198,10 @@
             managementService.unregisterCluster(clusterConnection.getName().toString());
          }
          
+         clusterConnectionListeners.clear();
+         clientListeners.clear();
          clusterConnections.clear();
+         topology.clear();
 
       }
 
@@ -209,27 +216,56 @@
       started = false;
    }
 
-   public void notifyClientsNodeDown(String nodeID)
+   public void notifyNodeDown(String nodeID)
    {
-      topology.removeMember(nodeID);
+      if (nodeID.equals(nodeUUID.toString()))
+      {
+         return;
+      }
 
-      for (ClusterTopologyListener listener : clientListeners)
+      boolean removed = topology.removeMember(nodeID);
+      
+      if (removed)
       {
-         listener.nodeDown(nodeID);
+
+         for (ClusterTopologyListener listener : clientListeners)
+         {
+            listener.nodeDown(nodeID);
+         }
+
+         for (ClusterTopologyListener listener : clusterConnectionListeners)
+         {
+            listener.nodeDown(nodeID);
+         }
       }
    }
 
-   public void notifyClientsNodeUp(String nodeID,
+   public void notifyNodeUp(String nodeID,
                                    Pair<TransportConfiguration, TransportConfiguration> connectorPair,
                                    boolean last,
                                    int distance)
    {
-      topology.addMember(nodeID, new TopologyMember(connectorPair, distance));
+      if (nodeID.equals(nodeUUID.toString()))
+      {
+         return;
+      }
+      
+      boolean updated = topology.addMember(nodeID, new TopologyMember(connectorPair, distance));
 
+      if (distance >= topology.size() || updated)
+      {
+         return;
+      }
+      
       for (ClusterTopologyListener listener : clientListeners)
       {
          listener.nodeUP(nodeID, connectorPair, last, distance);
       }
+
+      for (ClusterTopologyListener listener : clusterConnectionListeners)
+      {
+         listener.nodeUP(nodeID, connectorPair, last, distance);
+      }
    }
    
    public boolean isStarted()
@@ -260,7 +296,6 @@
    public synchronized void addClusterTopologyListener(final ClusterTopologyListener listener,
                                                      final boolean clusterConnection)
    {
-      System.out.println("ClusterManagerImpl.addClusterTopologyListener() on " + nodeUUID + " " + clusterConnection + " " + listener);
       if (clusterConnection)
       {
          this.clusterConnectionListeners.add(listener);
@@ -287,6 +322,11 @@
       }
    }
 
+   public Topology getTopology()
+   {
+      return topology;
+   }
+   
    // backup node becomes live
    public synchronized void activate()
    {
@@ -609,7 +649,7 @@
          serverLocator = null;
       }
 
-      ClusterConnection clusterConnection = new ClusterConnectionImpl(serverLocator,
+      ClusterConnectionImpl clusterConnection = new ClusterConnectionImpl(serverLocator,
                                                                       connector,
                                                                       new SimpleString(config.getName()),
                                                                       new SimpleString(config.getAddress()),

Deleted: branches/2_2_0_HA_Improvements/src/main/org/hornetq/core/server/cluster/impl/Topology.java
===================================================================
--- branches/2_2_0_HA_Improvements/src/main/org/hornetq/core/server/cluster/impl/Topology.java	2010-08-18 19:29:30 UTC (rev 9564)
+++ branches/2_2_0_HA_Improvements/src/main/org/hornetq/core/server/cluster/impl/Topology.java	2010-08-19 11:42:35 UTC (rev 9565)
@@ -1,76 +0,0 @@
-/*
- * Copyright 2009 Red Hat, Inc.
- *  Red Hat licenses this file to you under the Apache License, version
- *  2.0 (the "License"); you may not use this file except in compliance
- *  with the License.  You may obtain a copy of the License at
- *     http://www.apache.org/licenses/LICENSE-2.0
- *  Unless required by applicable law or agreed to in writing, software
- *  distributed under the License is distributed on an "AS IS" BASIS,
- *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
- *  implied.  See the License for the specific language governing
- *  permissions and limitations under the License.
- */
-package org.hornetq.core.server.cluster.impl;
-
-import org.hornetq.api.core.Pair;
-import org.hornetq.api.core.TransportConfiguration;
-import org.hornetq.api.core.client.ClusterTopologyListener;
-
-import java.lang.reflect.Array;
-import java.util.Collection;
-import java.util.HashMap;
-import java.util.Map;
-
-/**
- * @author <a href="mailto:andy.taylor at jboss.org">Andy Taylor</a>
- *         Created Aug 16, 2010
- */
-public class Topology
-{
-   /*
-    * topology describes the other cluster nodes that this server knows about:
-    *
-    * keys are node IDs
-    * values are a pair of live/backup transport configurations
-    */
-   private Map<String, TopologyMember> topology = new HashMap<String, TopologyMember>();
-
-   public synchronized void addMember(String nodeId, TopologyMember member)
-   {
-      topology.put(nodeId, member);
-   }
-
-   public synchronized void removeMember(String nodeId)
-   {
-      topology.remove(nodeId);
-   }
-
-   public synchronized void fireListeners(ClusterTopologyListener listener)
-   {
-      int count = 0;
-      for (Map.Entry<String, TopologyMember> entry : topology.entrySet())
-      {
-         listener.nodeUP(entry.getKey(), entry.getValue().getConnector(), ++count == topology.size(), entry.getValue().getDistance());
-      }
-   }
-
-   public TopologyMember getMember(String nodeID)
-   {
-      return topology.get(nodeID);
-   }
-
-   public boolean isEmpty()
-   {
-      return topology.isEmpty();
-   }
-
-   public Collection<TopologyMember> getMembers()
-   {
-      return topology.values();
-   }
-
-   public int size()
-   {
-      return topology.size();
-   }
-}

Deleted: branches/2_2_0_HA_Improvements/src/main/org/hornetq/core/server/cluster/impl/TopologyMember.java
===================================================================
--- branches/2_2_0_HA_Improvements/src/main/org/hornetq/core/server/cluster/impl/TopologyMember.java	2010-08-18 19:29:30 UTC (rev 9564)
+++ branches/2_2_0_HA_Improvements/src/main/org/hornetq/core/server/cluster/impl/TopologyMember.java	2010-08-19 11:42:35 UTC (rev 9565)
@@ -1,42 +0,0 @@
-/*
- * Copyright 2009 Red Hat, Inc.
- *  Red Hat licenses this file to you under the Apache License, version
- *  2.0 (the "License"); you may not use this file except in compliance
- *  with the License.  You may obtain a copy of the License at
- *     http://www.apache.org/licenses/LICENSE-2.0
- *  Unless required by applicable law or agreed to in writing, software
- *  distributed under the License is distributed on an "AS IS" BASIS,
- *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
- *  implied.  See the License for the specific language governing
- *  permissions and limitations under the License.
- */
-package org.hornetq.core.server.cluster.impl;
-
-import org.hornetq.api.core.Pair;
-import org.hornetq.api.core.TransportConfiguration;
-
-/**
- * @author <a href="mailto:andy.taylor at jboss.org">Andy Taylor</a>
- *         Created Aug 16, 2010
- */
-public class TopologyMember
-{
-   private final Pair<TransportConfiguration, TransportConfiguration> connector;
-
-      private final int distance;
-      public TopologyMember(Pair<TransportConfiguration, TransportConfiguration> connector, int distance)
-      {
-         this.connector = connector;
-         this.distance = distance;
-      }
-
-      public Pair<TransportConfiguration, TransportConfiguration> getConnector()
-      {
-         return connector;
-      }
-
-      public int getDistance()
-      {
-         return distance;
-      }
-}

Modified: branches/2_2_0_HA_Improvements/tests/src/org/hornetq/tests/integration/cluster/distribution/ClusterTestBase.java
===================================================================
--- branches/2_2_0_HA_Improvements/tests/src/org/hornetq/tests/integration/cluster/distribution/ClusterTestBase.java	2010-08-18 19:29:30 UTC (rev 9564)
+++ branches/2_2_0_HA_Improvements/tests/src/org/hornetq/tests/integration/cluster/distribution/ClusterTestBase.java	2010-08-19 11:42:35 UTC (rev 9565)
@@ -760,6 +760,8 @@
             out += cc.description() + "\n";
          }
       }
+      out += "\n\nfull topology:";
+      out += clusterManager.getTopology().describe();
       return out + br;
    }
 

Modified: branches/2_2_0_HA_Improvements/tests/src/org/hornetq/tests/integration/cluster/distribution/SymmetricClusterTest.java
===================================================================
--- branches/2_2_0_HA_Improvements/tests/src/org/hornetq/tests/integration/cluster/distribution/SymmetricClusterTest.java	2010-08-18 19:29:30 UTC (rev 9564)
+++ branches/2_2_0_HA_Improvements/tests/src/org/hornetq/tests/integration/cluster/distribution/SymmetricClusterTest.java	2010-08-19 11:42:35 UTC (rev 9565)
@@ -189,6 +189,12 @@
       waitForBindings(3, "queues.testaddress", 1, 1, true);
       waitForBindings(4, "queues.testaddress", 1, 1, true);
 
+      System.out.println(clusterDescription(servers[0]));
+      System.out.println(clusterDescription(servers[1]));
+      System.out.println(clusterDescription(servers[2]));
+      System.out.println(clusterDescription(servers[3]));
+      System.out.println(clusterDescription(servers[4]));
+
       waitForBindings(0, "queues.testaddress", 4, 4, false);
       waitForBindings(1, "queues.testaddress", 4, 4, false);
       waitForBindings(2, "queues.testaddress", 4, 4, false);
@@ -1326,6 +1332,11 @@
 
    public void testStartStopServers() throws Exception
    {
+      doTestStartStopServers(1, 3000);
+   }
+   
+   public void doTestStartStopServers(long pauseBeforeServerRestarts, long pauseAfterServerRestarts) throws Exception
+   {
       setupCluster();
 
       startServers();
@@ -1412,6 +1423,15 @@
       waitForBindings(3, "queues.testaddress", 6, 6, true);
       waitForBindings(4, "queues.testaddress", 7, 7, true);
 
+      Thread.sleep(2000);
+      System.out.println("#####################################");
+      System.out.println(clusterDescription(servers[0]));
+      System.out.println(clusterDescription(servers[1]));
+      System.out.println(clusterDescription(servers[2]));
+      System.out.println(clusterDescription(servers[3]));
+      System.out.println(clusterDescription(servers[4]));
+      System.out.println("#####################################");
+      
       waitForBindings(0, "queues.testaddress", 23, 23, false);
       waitForBindings(1, "queues.testaddress", 23, 23, false);
       waitForBindings(2, "queues.testaddress", 23, 23, false);
@@ -1455,9 +1475,11 @@
       System.out.println(clusterDescription(servers[4]));
       System.out.println("#####################################");
       
+      Thread.sleep(pauseBeforeServerRestarts);
+      
       startServers(3, 0);
 
-      Thread.sleep(3000);
+      Thread.sleep(pauseAfterServerRestarts);
 
       setupSessionFactory(0, isNetty());
       setupSessionFactory(3, isNetty());

Modified: branches/2_2_0_HA_Improvements/tests/src/org/hornetq/tests/integration/cluster/distribution/SymmetricClusterWithDiscoveryTest.java
===================================================================
--- branches/2_2_0_HA_Improvements/tests/src/org/hornetq/tests/integration/cluster/distribution/SymmetricClusterWithDiscoveryTest.java	2010-08-18 19:29:30 UTC (rev 9564)
+++ branches/2_2_0_HA_Improvements/tests/src/org/hornetq/tests/integration/cluster/distribution/SymmetricClusterWithDiscoveryTest.java	2010-08-19 11:42:35 UTC (rev 9565)
@@ -98,193 +98,7 @@
     */
    public void testStartStopServersWithPauseBeforeRestarting() throws Exception
    {
-      setupCluster();
-
-      startServers();
-
-      setupSessionFactory(0, isNetty());
-      setupSessionFactory(1, isNetty());
-      setupSessionFactory(2, isNetty());
-      setupSessionFactory(3, isNetty());
-      setupSessionFactory(4, isNetty());
-
-      createQueue(0, "queues.testaddress", "queue0", null, false);
-      createQueue(1, "queues.testaddress", "queue1", null, false);
-      createQueue(2, "queues.testaddress", "queue2", null, false);
-      createQueue(3, "queues.testaddress", "queue3", null, false);
-      createQueue(4, "queues.testaddress", "queue4", null, false);
-
-      createQueue(0, "queues.testaddress", "queue5", null, false);
-      createQueue(1, "queues.testaddress", "queue6", null, false);
-      createQueue(2, "queues.testaddress", "queue7", null, false);
-      createQueue(3, "queues.testaddress", "queue8", null, false);
-      createQueue(4, "queues.testaddress", "queue9", null, false);
-
-      createQueue(0, "queues.testaddress", "queue10", null, false);
-      createQueue(1, "queues.testaddress", "queue11", null, false);
-      createQueue(2, "queues.testaddress", "queue12", null, false);
-      createQueue(3, "queues.testaddress", "queue13", null, false);
-      createQueue(4, "queues.testaddress", "queue14", null, false);
-
-      createQueue(0, "queues.testaddress", "queue15", null, false);
-      createQueue(1, "queues.testaddress", "queue15", null, false);
-      createQueue(2, "queues.testaddress", "queue15", null, false);
-      createQueue(3, "queues.testaddress", "queue15", null, false);
-      createQueue(4, "queues.testaddress", "queue15", null, false);
-
-      createQueue(2, "queues.testaddress", "queue16", null, false);
-      createQueue(3, "queues.testaddress", "queue16", null, false);
-      createQueue(4, "queues.testaddress", "queue16", null, false);
-
-      createQueue(0, "queues.testaddress", "queue17", null, false);
-      createQueue(1, "queues.testaddress", "queue17", null, false);
-      createQueue(4, "queues.testaddress", "queue17", null, false);
-
-      createQueue(3, "queues.testaddress", "queue18", null, false);
-      createQueue(4, "queues.testaddress", "queue18", null, false);
-
-      addConsumer(0, 0, "queue0", null);
-      addConsumer(1, 1, "queue1", null);
-      addConsumer(2, 2, "queue2", null);
-      addConsumer(3, 3, "queue3", null);
-      addConsumer(4, 4, "queue4", null);
-
-      addConsumer(5, 0, "queue5", null);
-      addConsumer(6, 1, "queue6", null);
-      addConsumer(7, 2, "queue7", null);
-      addConsumer(8, 3, "queue8", null);
-      addConsumer(9, 4, "queue9", null);
-
-      addConsumer(10, 0, "queue10", null);
-      addConsumer(11, 1, "queue11", null);
-      addConsumer(12, 2, "queue12", null);
-      addConsumer(13, 3, "queue13", null);
-      addConsumer(14, 4, "queue14", null);
-
-      addConsumer(15, 0, "queue15", null);
-      addConsumer(16, 1, "queue15", null);
-      addConsumer(17, 2, "queue15", null);
-      addConsumer(18, 3, "queue15", null);
-      addConsumer(19, 4, "queue15", null);
-
-      addConsumer(20, 2, "queue16", null);
-      addConsumer(21, 3, "queue16", null);
-      addConsumer(22, 4, "queue16", null);
-
-      addConsumer(23, 0, "queue17", null);
-      addConsumer(24, 1, "queue17", null);
-      addConsumer(25, 4, "queue17", null);
-
-      addConsumer(26, 3, "queue18", null);
-      addConsumer(27, 4, "queue18", null);
-
-      waitForBindings(0, "queues.testaddress", 5, 5, true);
-      waitForBindings(1, "queues.testaddress", 5, 5, true);
-      waitForBindings(2, "queues.testaddress", 5, 5, true);
-      waitForBindings(3, "queues.testaddress", 6, 6, true);
-      waitForBindings(4, "queues.testaddress", 7, 7, true);
-
-      waitForBindings(0, "queues.testaddress", 23, 23, false);
-      waitForBindings(1, "queues.testaddress", 23, 23, false);
-      waitForBindings(2, "queues.testaddress", 23, 23, false);
-      waitForBindings(3, "queues.testaddress", 22, 22, false);
-      waitForBindings(4, "queues.testaddress", 21, 21, false);
-
-      send(0, "queues.testaddress", 10, false, null);
-
-      verifyReceiveAll(10, 0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14);
-
-      verifyReceiveRoundRobinInSomeOrder(10, 15, 16, 17, 18, 19);
-
-      verifyReceiveRoundRobinInSomeOrder(10, 20, 21, 22);
-
-      verifyReceiveRoundRobinInSomeOrder(10, 23, 24, 25);
-
-      verifyReceiveRoundRobinInSomeOrder(10, 26, 27);
-
-      removeConsumer(0);
-      removeConsumer(5);
-      removeConsumer(10);
-      removeConsumer(15);
-      removeConsumer(23);
-      removeConsumer(3);
-      removeConsumer(8);
-      removeConsumer(13);
-      removeConsumer(18);
-      removeConsumer(21);
-      removeConsumer(26);
-
-      closeSessionFactory(0);
-      closeSessionFactory(3);
-
-      stopServers(0, 3);
-
-      Thread.sleep(10000);
-
-      startServers(3, 0);
-
-      setupSessionFactory(0, isNetty());
-      setupSessionFactory(3, isNetty());
-
-      createQueue(0, "queues.testaddress", "queue0", null, false);
-      createQueue(3, "queues.testaddress", "queue3", null, false);
-
-      createQueue(0, "queues.testaddress", "queue5", null, false);
-      createQueue(3, "queues.testaddress", "queue8", null, false);
-
-      createQueue(0, "queues.testaddress", "queue10", null, false);
-      createQueue(3, "queues.testaddress", "queue13", null, false);
-
-      createQueue(0, "queues.testaddress", "queue15", null, false);
-      createQueue(3, "queues.testaddress", "queue15", null, false);
-
-      createQueue(3, "queues.testaddress", "queue16", null, false);
-
-      createQueue(0, "queues.testaddress", "queue17", null, false);
-
-      createQueue(3, "queues.testaddress", "queue18", null, false);
-
-      addConsumer(0, 0, "queue0", null);
-      addConsumer(3, 3, "queue3", null);
-
-      addConsumer(5, 0, "queue5", null);
-      addConsumer(8, 3, "queue8", null);
-
-      addConsumer(10, 0, "queue10", null);
-      addConsumer(13, 3, "queue13", null);
-
-      addConsumer(15, 0, "queue15", null);
-      addConsumer(18, 3, "queue15", null);
-
-      addConsumer(21, 3, "queue16", null);
-
-      addConsumer(23, 0, "queue17", null);
-
-      addConsumer(26, 3, "queue18", null);
-
-      waitForBindings(0, "queues.testaddress", 5, 5, true);
-      waitForBindings(1, "queues.testaddress", 5, 5, true);
-      waitForBindings(2, "queues.testaddress", 5, 5, true);
-      waitForBindings(3, "queues.testaddress", 6, 6, true);
-      waitForBindings(4, "queues.testaddress", 7, 7, true);
-
-      waitForBindings(0, "queues.testaddress", 23, 23, false);
-      waitForBindings(1, "queues.testaddress", 23, 23, false);
-      waitForBindings(2, "queues.testaddress", 23, 23, false);
-      waitForBindings(3, "queues.testaddress", 22, 22, false);
-      waitForBindings(4, "queues.testaddress", 21, 21, false);
-
-      send(0, "queues.testaddress", 10, false, null);
-
-      verifyReceiveAll(10, 0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14);
-
-      verifyReceiveRoundRobinInSomeOrder(10, 15, 16, 17, 18, 19);
-
-      verifyReceiveRoundRobinInSomeOrder(10, 20, 21, 22);
-
-      verifyReceiveRoundRobinInSomeOrder(10, 23, 24, 25);
-
-      verifyReceiveRoundRobinInSomeOrder(10, 26, 27);
+      doTestStartStopServers(10000, 3000);
    }
 
 }

Modified: branches/2_2_0_HA_Improvements/tests/src/org/hornetq/tests/integration/cluster/distribution/TwoWayTwoNodeClusterTest.java
===================================================================
--- branches/2_2_0_HA_Improvements/tests/src/org/hornetq/tests/integration/cluster/distribution/TwoWayTwoNodeClusterTest.java	2010-08-18 19:29:30 UTC (rev 9564)
+++ branches/2_2_0_HA_Improvements/tests/src/org/hornetq/tests/integration/cluster/distribution/TwoWayTwoNodeClusterTest.java	2010-08-19 11:42:35 UTC (rev 9565)
@@ -92,7 +92,39 @@
 
       stopServers(0, 1);
    }
+   
+   public void testStartPauseStartOther() throws Exception
+   {
 
+      startServers(0);
+
+      setupSessionFactory(0, isNetty());
+      createQueue(0, "queues", "queue0", null, false);
+      addConsumer(0, 0, "queue0", null);
+      
+      // we let the discovery initial timeout expire, 
+      // #0 will be alone in the cluster
+      Thread.sleep(12000);
+      
+      startServers(1);
+      setupSessionFactory(1, isNetty());
+      createQueue(1, "queues", "queue0", null, false);
+
+      addConsumer(1, 1, "queue0", null);
+
+      waitForBindings(0, "queues", 1, 1, true);
+      waitForBindings(1, "queues", 1, 1, true);
+
+      waitForBindings(0, "queues", 1, 1, false);
+      waitForBindings(1, "queues", 1, 1, false);
+
+      send(0, "queues", 10, false, null);
+      verifyReceiveRoundRobin(10, 0, 1);
+      verifyNotReceive(0, 1);
+
+      stopServers(0, 1);
+   }
+
    public void testStopStart() throws Exception
    {
       startServers(0, 1);
@@ -127,7 +159,12 @@
       System.out.println(clusterDescription(servers[0]));
 
       startServers(1);
+      
+      Thread.sleep(3000);
 
+      System.out.println(clusterDescription(servers[0]));
+      System.out.println(clusterDescription(servers[1]));
+
       setupSessionFactory(1, isNetty());
 
       createQueue(1, "queues", "queue0", null, false);
@@ -137,9 +174,6 @@
       waitForBindings(0, "queues", 1, 1, true);
       waitForBindings(1, "queues", 1, 1, true);
 
-      System.out.println(clusterDescription(servers[0]));
-      System.out.println(clusterDescription(servers[1]));
-      
       waitForBindings(1, "queues", 1, 1, false);
       waitForBindings(0, "queues", 1, 1, false);
 



More information about the hornetq-commits mailing list