[hornetq-commits] JBoss hornetq SVN: r11621 - in branches/Branch_2_2_EAP: src/main/org/hornetq/core/server/cluster and 2 other directories.

do-not-reply at jboss.org do-not-reply at jboss.org
Mon Oct 31 15:20:05 EDT 2011


Author: clebert.suconic
Date: 2011-10-31 15:20:04 -0400 (Mon, 31 Oct 2011)
New Revision: 11621

Modified:
   branches/Branch_2_2_EAP/src/main/org/hornetq/core/client/impl/ServerLocatorImpl.java
   branches/Branch_2_2_EAP/src/main/org/hornetq/core/server/cluster/Bridge.java
   branches/Branch_2_2_EAP/src/main/org/hornetq/core/server/cluster/ClusterConnection.java
   branches/Branch_2_2_EAP/src/main/org/hornetq/core/server/cluster/impl/BridgeImpl.java
   branches/Branch_2_2_EAP/src/main/org/hornetq/core/server/cluster/impl/ClusterConnectionBridge.java
   branches/Branch_2_2_EAP/src/main/org/hornetq/core/server/cluster/impl/ClusterConnectionImpl.java
   branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/integration/cluster/topology/HAClientTopologyWithDiscoveryTest.java
   branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/integration/cluster/topology/IsolatedTopologyTest.java
   branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/integration/cluster/topology/TopologyClusterTestBase.java
Log:
Fixing tests: TopologyClusterTestBase was wrong (needed to wait topology before start ServerLocator) and a few synchronization blocks were missing on Discovery initialization

Modified: branches/Branch_2_2_EAP/src/main/org/hornetq/core/client/impl/ServerLocatorImpl.java
===================================================================
--- branches/Branch_2_2_EAP/src/main/org/hornetq/core/client/impl/ServerLocatorImpl.java	2011-10-31 15:18:27 UTC (rev 11620)
+++ branches/Branch_2_2_EAP/src/main/org/hornetq/core/client/impl/ServerLocatorImpl.java	2011-10-31 19:20:04 UTC (rev 11621)
@@ -57,11 +57,15 @@
  */
 public class ServerLocatorImpl implements ServerLocatorInternal, DiscoveryListener, Serializable
 {
-    /*needed for backward compatibility*/
+   /*needed for backward compatibility*/
    private final Set<ClusterTopologyListener> topologyListeners = new HashSet<ClusterTopologyListener>();
+
    /*end of compatibility fixes*/
-   private enum STATE{ INITIALIZED, CLOSED, CLOSING};
-   
+   private enum STATE
+   {
+      INITIALIZED, CLOSED, CLOSING
+   };
+
    private static final long serialVersionUID = -1615857864410205260L;
 
    private static final Logger log = Logger.getLogger(ServerLocatorImpl.class);
@@ -75,6 +79,7 @@
    private transient String identity;
 
    private final Set<ClientSessionFactoryInternal> factories = new HashSet<ClientSessionFactoryInternal>();
+
    private final Set<ClientSessionFactoryInternal> connectingFactories = new HashSet<ClientSessionFactoryInternal>();
 
    private TransportConfiguration[] initialConnectors;
@@ -530,7 +535,7 @@
 
          return pair.getA();
       }
-      
+
       // Get from initialconnectors
 
       int pos = loadBalancingPolicy.select(initialConnectors.length);
@@ -601,20 +606,19 @@
    {
       return afterConnectListener;
    }
-   
+
    public ClientSessionFactory createSessionFactory(String nodeID) throws Exception
    {
       log.info(topology.describe("full topology"));
       TopologyMember topologyMember = topology.getMember(nodeID);
-      
+
       log.info("Creating connection factory towards " + nodeID + " = " + topologyMember);
-      
+
       if (topologyMember == null)
       {
          return null;
       }
-      else
-      if (topologyMember.getA() != null)
+      else if (topologyMember.getA() != null)
       {
          ClientSessionFactoryInternal factory = (ClientSessionFactoryInternal)createSessionFactory(topologyMember.getA());
          if (topologyMember.getB() != null)
@@ -641,7 +645,7 @@
       assertOpen();
 
       initialise();
-   
+
       ClientSessionFactoryInternal factory = new ClientSessionFactoryImpl(this,
                                                                           transportConfiguration,
                                                                           callTimeout,
@@ -1239,31 +1243,34 @@
 
       state = STATE.CLOSING;
 
-      if (discoveryGroup != null)
+      synchronized (this)
       {
-         try
+         if (discoveryGroup != null)
          {
-            discoveryGroup.stop();
+            try
+            {
+               discoveryGroup.stop();
+            }
+            catch (Exception e)
+            {
+               log.error("Failed to stop discovery group", e);
+            }
          }
-         catch (Exception e)
+         else
          {
-            log.error("Failed to stop discovery group", e);
+            staticConnector.disconnect();
          }
       }
-      else
-      {
-         staticConnector.disconnect();
-      }
-      
+
       synchronized (connectingFactories)
       {
          for (ClientSessionFactoryInternal csf : connectingFactories)
          {
-           csf.close();
+            csf.close();
          }
          connectingFactories.clear();
       }
-      
+
       synchronized (factories)
       {
          Set<ClientSessionFactoryInternal> clonedFactory = new HashSet<ClientSessionFactoryInternal>(factories);
@@ -1320,7 +1327,7 @@
       readOnly = false;
 
       state = STATE.CLOSED;
- 
+
    }
 
    /** This is directly called when the connection to the node is gone,

Modified: branches/Branch_2_2_EAP/src/main/org/hornetq/core/server/cluster/Bridge.java
===================================================================
--- branches/Branch_2_2_EAP/src/main/org/hornetq/core/server/cluster/Bridge.java	2011-10-31 15:18:27 UTC (rev 11620)
+++ branches/Branch_2_2_EAP/src/main/org/hornetq/core/server/cluster/Bridge.java	2011-10-31 19:20:04 UTC (rev 11621)
@@ -59,4 +59,6 @@
     * Basically this is for cluster bridges being disconnected
     */
    void disconnect();
+
+   boolean isConnected();
 }

Modified: branches/Branch_2_2_EAP/src/main/org/hornetq/core/server/cluster/ClusterConnection.java
===================================================================
--- branches/Branch_2_2_EAP/src/main/org/hornetq/core/server/cluster/ClusterConnection.java	2011-10-31 15:18:27 UTC (rev 11620)
+++ branches/Branch_2_2_EAP/src/main/org/hornetq/core/server/cluster/ClusterConnection.java	2011-10-31 19:20:04 UTC (rev 11621)
@@ -65,4 +65,6 @@
    void informTopology();
    
    void announceBackup();
+
+   boolean isNodeActive(String id);
 }

Modified: branches/Branch_2_2_EAP/src/main/org/hornetq/core/server/cluster/impl/BridgeImpl.java
===================================================================
--- branches/Branch_2_2_EAP/src/main/org/hornetq/core/server/cluster/impl/BridgeImpl.java	2011-10-31 15:18:27 UTC (rev 11620)
+++ branches/Branch_2_2_EAP/src/main/org/hornetq/core/server/cluster/impl/BridgeImpl.java	2011-10-31 19:20:04 UTC (rev 11621)
@@ -131,6 +131,8 @@
 
    private NotificationService notificationService;
 
+   private boolean stopping = false;
+
    // Static --------------------------------------------------------
 
    // Constructors --------------------------------------------------
@@ -198,7 +200,7 @@
    {
       this.notificationService = notificationService;
    }
-   
+
    public synchronized void start() throws Exception
    {
       if (started)
@@ -208,6 +210,8 @@
 
       started = true;
 
+      stopping = false;
+
       if (activated)
       {
          activate();
@@ -221,7 +225,7 @@
          notificationService.sendNotification(notification);
       }
    }
-   
+
    public String debug()
    {
       return toString();
@@ -304,20 +308,32 @@
       });
    }
 
-   /** The cluster manager needs to use the same executor to close the serverLocator, otherwise the stop will break. 
-    *  This method is intended to expose this executor to the ClusterManager */
+   public boolean isConnected()
+   {
+      return session != null;
+   }
+
+   /** The cluster manager needs to use the same executor to close the serverLocator, otherwise the stop will break.
+   *  This method is intended to expose this executor to the ClusterManager */
    public Executor getExecutor()
    {
       return executor;
    }
-   
+
    public void stop() throws Exception
    {
+      if (stopping)
+      {
+         return;
+      }
+      
+      stopping = true;
+      
       if (log.isDebugEnabled())
       {
          log.debug("Bridge " + this.name + " being stopped");
       }
-      
+
       if (futureScheduledReconnection != null)
       {
          futureScheduledReconnection.cancel(true);
@@ -470,7 +486,10 @@
          {
             if (log.isDebugEnabled())
             {
-               log.debug("The transformer " + transformer + " made a copy of the message " + message + " as transformedMessage");
+               log.debug("The transformer " + transformer +
+                         " made a copy of the message " +
+                         message +
+                         " as transformedMessage");
             }
          }
          return transformedMessage;
@@ -543,12 +562,12 @@
          // that this will throw a disconnect, we need to remove the message
          // from the acks so it will get resent, duplicate detection will cope
          // with any messages resent
-         
+
          if (log.isTraceEnabled())
          {
             log.trace("going to send message " + message);
          }
-         
+
          try
          {
             producer.send(dest, message);
@@ -579,7 +598,7 @@
          {
             producer.close();
          }
-         
+
          csf.cleanup();
       }
       catch (Throwable dontCare)
@@ -680,7 +699,6 @@
       return csf;
    }
 
-
    /* Hook for creating session factory */
    protected ClientSessionFactoryInternal createSessionFactory() throws Exception
    {
@@ -806,6 +824,12 @@
          return;
       }
 
+      if (stopping)
+      {
+         log.info("Bridge is stopping, will not retry");
+         return;
+      }
+
       if (reconnectAttemptsInUse >= 0 && retryCount > reconnectAttempts)
       {
          log.warn("Bridge " + this.name +
@@ -968,7 +992,10 @@
    {
       public synchronized void run()
       {
-         connect();
+         if (!stopping)
+         {
+            connect();
+         }
       }
    }
 }

Modified: branches/Branch_2_2_EAP/src/main/org/hornetq/core/server/cluster/impl/ClusterConnectionBridge.java
===================================================================
--- branches/Branch_2_2_EAP/src/main/org/hornetq/core/server/cluster/impl/ClusterConnectionBridge.java	2011-10-31 15:18:27 UTC (rev 11620)
+++ branches/Branch_2_2_EAP/src/main/org/hornetq/core/server/cluster/impl/ClusterConnectionBridge.java	2011-10-31 19:20:04 UTC (rev 11621)
@@ -71,7 +71,7 @@
    private final SimpleString idsHeaderName;
 
    private final String targetNodeID;
-   
+
    private final long targetNodeEventUID;
 
    private final ServerLocatorInternal discoveryLocator;
@@ -150,10 +150,11 @@
    protected ClientSessionFactoryInternal createSessionFactory() throws Exception
    {
       ClientSessionFactoryInternal factory = (ClientSessionFactoryInternal)serverLocator.createSessionFactory(targetNodeID);
-      
+
       if (factory == null)
       {
-         log.warn("NodeID=" + targetNodeID + " is not available on the topology. Retrying the connection to that node now");
+         log.warn("NodeID=" + targetNodeID +
+                  " is not available on the topology. Retrying the connection to that node now");
          return null;
       }
       factory.setReconnectAttempts(0);
@@ -181,7 +182,7 @@
       // nodes could have same queue ids
       // Note we must copy since same message may get routed to other nodes which require different headers
       ServerMessage messageCopy = message.copy();
-      
+
       if (log.isTraceEnabled())
       {
          log.trace("Clustered bridge  copied message " + message + " as " + messageCopy + " before delivery");
@@ -192,7 +193,7 @@
       Set<SimpleString> propNames = new HashSet<SimpleString>(messageCopy.getPropertyNames());
 
       byte[] queueIds = message.getBytesProperty(idsHeaderName);
-      
+
       if (queueIds == null)
       {
          // Sanity check only
@@ -215,7 +216,7 @@
       messageCopy.putBytesProperty(MessageImpl.HDR_ROUTE_TO_IDS, queueIds);
 
       messageCopy = super.beforeForward(messageCopy);
- 
+
       return messageCopy;
    }
 

Modified: branches/Branch_2_2_EAP/src/main/org/hornetq/core/server/cluster/impl/ClusterConnectionImpl.java
===================================================================
--- branches/Branch_2_2_EAP/src/main/org/hornetq/core/server/cluster/impl/ClusterConnectionImpl.java	2011-10-31 15:18:27 UTC (rev 11620)
+++ branches/Branch_2_2_EAP/src/main/org/hornetq/core/server/cluster/impl/ClusterConnectionImpl.java	2011-10-31 19:20:04 UTC (rev 11621)
@@ -142,15 +142,14 @@
    private final Set<TransportConfiguration> allowableConnections = new HashSet<TransportConfiguration>();
 
    private final ClusterManagerInternal manager;
-   
-   
+
    // Stuff that used to be on the ClusterManager
 
-   
    private final Topology topology = new Topology(this);
 
    private volatile ServerLocatorInternal backupServerLocator;
 
+   private boolean stopping = false;
 
    public ClusterConnectionImpl(final ClusterManagerInternal manager,
                                 final TransportConfiguration[] tcConfigs,
@@ -214,7 +213,7 @@
       this.executorFactory = executorFactory;
 
       this.executor = executorFactory.getExecutor();
-      
+
       this.topology.setExecutor(executor);
 
       this.server = server;
@@ -325,7 +324,7 @@
       this.executorFactory = executorFactory;
 
       this.executor = executorFactory.getExecutor();
-      
+
       this.topology.setExecutor(executor);
 
       this.server = server;
@@ -359,7 +358,7 @@
       this.manager = manager;
    }
 
-       public void start() throws Exception
+   public void start() throws Exception
    {
       synchronized (this)
       {
@@ -367,10 +366,10 @@
          {
             return;
          }
-         
-   
+
+         stopping = false;
          started = true;
-   
+
          if (!backup)
          {
             activate();
@@ -378,7 +377,7 @@
       }
 
    }
-   
+
    public void flushExecutor()
    {
       Future future = new Future();
@@ -395,7 +394,7 @@
       {
          return;
       }
-
+      stopping = true;
       if (log.isDebugEnabled())
       {
          log.debug(this + "::stopping ClusterConnection");
@@ -435,9 +434,7 @@
                                                       props);
          managementService.sendNotification(notification);
       }
-      
 
-
       executor.execute(new Runnable()
       {
          public void run()
@@ -463,36 +460,35 @@
       started = false;
    }
 
-   
    public void announceBackup()
    {
       executor.execute(new Runnable()
       {
          public void run()
          {
-             try
-             {
-                if (log.isDebugEnabled())
-                {
-                   log.debug(ClusterConnectionImpl.this + ":: announcing " + connector + " to " + backupServerLocator);
-                }
-                ClientSessionFactory backupSessionFactory = backupServerLocator.connect();
-                if (backupSessionFactory != null)
-                {
-                   backupSessionFactory.getConnection()
-                                       .getChannel(0, -1)
-                                       .send(new NodeAnnounceMessage(System.currentTimeMillis(),
-                                                                     nodeUUID.toString(),
-                                                                     true,
-                                                                     connector,
-                                                                     null));
-                   log.info("backup announced");
-                }
-             }
-             catch (Exception e)
-             {
-                log.warn("Unable to announce backup, retrying", e);
-             }
+            try
+            {
+               if (log.isDebugEnabled())
+               {
+                  log.debug(ClusterConnectionImpl.this + ":: announcing " + connector + " to " + backupServerLocator);
+               }
+               ClientSessionFactory backupSessionFactory = backupServerLocator.connect();
+               if (backupSessionFactory != null)
+               {
+                  backupSessionFactory.getConnection()
+                                      .getChannel(0, -1)
+                                      .send(new NodeAnnounceMessage(System.currentTimeMillis(),
+                                                                    nodeUUID.toString(),
+                                                                    true,
+                                                                    connector,
+                                                                    null));
+                  log.info("backup announced");
+               }
+            }
+            catch (Exception e)
+            {
+               log.warn("Unable to announce backup, retrying", e);
+            }
          }
       });
    }
@@ -501,7 +497,7 @@
    {
       return topology.getMember(manager.getNodeId());
    }
-   
+
    public void addClusterTopologyListener(final ClusterTopologyListener listener, final boolean clusterConnection)
    {
       topology.addClusterTopologyListener(listener);
@@ -519,7 +515,7 @@
    {
       return topology;
    }
-   
+
    public void nodeAnnounced(final long uniqueEventID,
                              final String nodeID,
                              final Pair<TransportConfiguration, TransportConfiguration> connectorPair,
@@ -581,6 +577,16 @@
       return server;
    }
 
+   public boolean isNodeActive(String nodeId)
+   {
+      MessageFlowRecord rec = records.get(nodeId);
+      if (rec == null)
+      {
+         return false;
+      }
+      return rec.getBridge().isConnected();
+   }
+
    public Map<String, String> getNodes()
    {
       synchronized (records)
@@ -610,7 +616,7 @@
       }
 
       backup = false;
-      
+
       topology.updateAsLive(manager.getNodeId(), new TopologyMember(connector, null));
 
       if (backupServerLocator != null)
@@ -627,8 +633,6 @@
          backupServerLocator = null;
       }
 
-
-
       serverLocator = clusterConnector.createServerLocator(true);
 
       if (serverLocator != null)
@@ -696,6 +700,10 @@
 
    public void nodeDown(final long eventUID, final String nodeID)
    {
+      if (stopping)
+      {
+         return;
+      }
       if (log.isDebugEnabled())
       {
          log.debug(this + " receiving nodeDown for nodeID=" + nodeID, new Exception("trace"));
@@ -731,6 +739,10 @@
                       final Pair<TransportConfiguration, TransportConfiguration> connectorPair,
                       final boolean last)
    {
+      if (stopping)
+      {
+         return;
+      }
       if (log.isDebugEnabled())
       {
          String ClusterTestBase = "receiving nodeUP for nodeID=";
@@ -831,13 +843,13 @@
          }
       }
    }
-   
+
    public synchronized void informTopology()
    {
       String nodeID = server.getNodeID().toString();
-      
+
       TopologyMember localMember;
-      
+
       if (backup)
       {
          localMember = new TopologyMember(null, connector);
@@ -850,7 +862,6 @@
       topology.updateAsLive(nodeID, localMember);
    }
 
-
    private void createNewRecord(final long eventUID,
                                 final String targetNodeID,
                                 final TransportConfiguration connector,
@@ -859,21 +870,21 @@
                                 final boolean start) throws Exception
    {
       final ServerLocatorInternal targetLocator = new ServerLocatorImpl(topology, true, connector);
-      
+
       String nodeId;
-      
+
       synchronized (this)
       {
          if (!started)
          {
             return;
          }
-         
+
          if (serverLocator == null)
          {
             return;
          }
-         
+
          nodeId = serverLocator.getNodeID();
       }
 
@@ -1514,8 +1525,9 @@
    @Override
    public String toString()
    {
-      return "ClusterConnectionImpl@" + System.identityHashCode(this)  + 
-             "[nodeUUID=" + nodeUUID +
+      return "ClusterConnectionImpl@" + System.identityHashCode(this) +
+             "[nodeUUID=" +
+             nodeUUID +
              ", connector=" +
              connector +
              ", address=" +
@@ -1565,7 +1577,7 @@
             {
                log.debug(ClusterConnectionImpl.this + "Creating a serverLocator for " + Arrays.toString(tcConfigs));
             }
-            ServerLocatorImpl locator = new ServerLocatorImpl(includeTopology?topology:null, true, tcConfigs);
+            ServerLocatorImpl locator = new ServerLocatorImpl(includeTopology ? topology : null, true, tcConfigs);
             locator.setClusterConnection(true);
             return locator;
          }
@@ -1597,7 +1609,7 @@
 
       public ServerLocatorInternal createServerLocator(boolean includeTopology)
       {
-         ServerLocatorImpl locator = new ServerLocatorImpl(includeTopology?topology:null, true, dg);
+         ServerLocatorImpl locator = new ServerLocatorImpl(includeTopology ? topology : null, true, dg);
          return locator;
 
       }

Modified: branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/integration/cluster/topology/HAClientTopologyWithDiscoveryTest.java
===================================================================
--- branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/integration/cluster/topology/HAClientTopologyWithDiscoveryTest.java	2011-10-31 15:18:27 UTC (rev 11620)
+++ branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/integration/cluster/topology/HAClientTopologyWithDiscoveryTest.java	2011-10-31 19:20:04 UTC (rev 11621)
@@ -33,7 +33,13 @@
    {
       return false;
    }
+   
+   protected boolean isFileStorage()
+   {
+      return false;
+   }
 
+
    protected void setupCluster() throws Exception
    {
       setupCluster(false);

Modified: branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/integration/cluster/topology/IsolatedTopologyTest.java
===================================================================
--- branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/integration/cluster/topology/IsolatedTopologyTest.java	2011-10-31 15:18:27 UTC (rev 11620)
+++ branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/integration/cluster/topology/IsolatedTopologyTest.java	2011-10-31 19:20:04 UTC (rev 11621)
@@ -28,7 +28,6 @@
 import org.hornetq.core.remoting.impl.netty.TransportConstants;
 import org.hornetq.core.server.HornetQServer;
 import org.hornetq.tests.util.ServiceTestBase;
-import org.hornetq.tests.util.UnitTestCase;
 
 /**
  * A IsolatedTopologyTest

Modified: branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/integration/cluster/topology/TopologyClusterTestBase.java
===================================================================
--- branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/integration/cluster/topology/TopologyClusterTestBase.java	2011-10-31 15:18:27 UTC (rev 11620)
+++ branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/integration/cluster/topology/TopologyClusterTestBase.java	2011-10-31 19:20:04 UTC (rev 11621)
@@ -17,6 +17,7 @@
 
 import java.util.ArrayList;
 import java.util.List;
+import java.util.Map;
 import java.util.concurrent.CountDownLatch;
 
 import org.hornetq.api.core.HornetQException;
@@ -70,6 +71,11 @@
 
    abstract protected boolean isNetty() throws Exception;
 
+   protected boolean isFileStorage()
+   {
+      return false;
+   }
+
    @Override
    protected void setUp() throws Exception
    {
@@ -114,11 +120,12 @@
          {
             ok = (ok && actual.contains(nodeIDs[expected[i]]));
          }
-         if (ok) 
+         if (ok)
          {
             return;
          }
-      } while(System.currentTimeMillis() - start < 5000);
+      }
+      while (System.currentTimeMillis() - start < 5000);
       fail("did not contain all expected node ID: " + actual);
    }
 
@@ -145,8 +152,8 @@
       {
          if (e.getCode() == HornetQException.OBJECT_CLOSED || e.getCode() == HornetQException.UNBLOCKED)
          {
-         ClientSessionFactory sf = locator.createSessionFactory();
-         return sf.createSession();
+            ClientSessionFactory sf = locator.createSessionFactory();
+            return sf.createSession();
          }
          else
          {
@@ -174,7 +181,14 @@
 
          for (ClusterConnection clusterConn : clusterManager.getClusterConnections())
          {
-            nodesCount += clusterConn.getNodes().size();
+            Map<String, String> nodes = clusterConn.getNodes();
+            for (String id : nodes.keySet())
+            {
+               if (clusterConn.isNodeActive(id))
+               {
+                  nodesCount++;
+               }
+            }
          }
 
          if (nodesCount == count)
@@ -185,85 +199,92 @@
          Thread.sleep(10);
       }
       while (System.currentTimeMillis() - start < WAIT_TIMEOUT);
- 
+
       log.error(clusterDescription(servers[node]));
       throw new IllegalStateException("Timed out waiting for cluster connections ");
    }
+
    
    public void testReceiveNotificationsWhenOtherNodesAreStartedAndStopped() throws Throwable
    {
       startServers(0);
 
       ServerLocator locator = createHAServerLocator();
-      
-      ((ServerLocatorImpl)locator).getTopology().setOwner("testReceive");
 
-      final List<String> nodes = new ArrayList<String>();
-      final CountDownLatch upLatch = new CountDownLatch(5);
-      final CountDownLatch downLatch = new CountDownLatch(4);
+      try
+      {
+         ((ServerLocatorImpl)locator).getTopology().setOwner("testReceive");
 
-      locator.addClusterTopologyListener(new ClusterTopologyListener()
-      {
-         public void nodeUP(final long uniqueEventID, 
-                            String nodeID,
-                            Pair<TransportConfiguration, TransportConfiguration> connectorPair,
-                            boolean last)
+         final List<String> nodes = new ArrayList<String>();
+         final CountDownLatch upLatch = new CountDownLatch(5);
+         final CountDownLatch downLatch = new CountDownLatch(4);
+
+         locator.addClusterTopologyListener(new ClusterTopologyListener()
          {
-            if(!nodes.contains(nodeID))
+            public void nodeUP(final long uniqueEventID,
+                               String nodeID,
+                               Pair<TransportConfiguration, TransportConfiguration> connectorPair,
+                               boolean last)
             {
-               System.out.println("Node UP " + nodeID + " added");
-               log.info("Node UP " + nodeID + " added");
-               nodes.add(nodeID);
-               upLatch.countDown();
+               if (!nodes.contains(nodeID))
+               {
+                  System.out.println("Node UP " + nodeID + " added");
+                  log.info("Node UP " + nodeID + " added");
+                  nodes.add(nodeID);
+                  upLatch.countDown();
+               }
+               else
+               {
+                  System.out.println("Node UP " + nodeID + " was already here");
+                  log.info("Node UP " + nodeID + " was already here");
+               }
             }
-            else
-            {
-               System.out.println("Node UP " + nodeID + " was already here");
-               log.info("Node UP " + nodeID + " was already here");
-            }
-         }
 
-         public void nodeDown(final long uniqueEventID, String nodeID)
-         {
-            if (nodes.contains(nodeID))
+            public void nodeDown(final long uniqueEventID, String nodeID)
             {
-               log.info("Node down " + nodeID + " accepted");
-               System.out.println("Node down " + nodeID + " accepted");
-               nodes.remove(nodeID);
-               downLatch.countDown();
+               if (nodes.contains(nodeID))
+               {
+                  log.info("Node down " + nodeID + " accepted");
+                  System.out.println("Node down " + nodeID + " accepted");
+                  nodes.remove(nodeID);
+                  downLatch.countDown();
+               }
+               else
+               {
+                  log.info("Node down " + nodeID + " already removed");
+                  System.out.println("Node down " + nodeID + " already removed");
+               }
             }
-            else
-            {
-               log.info("Node down " + nodeID + " already removed");
-               System.out.println("Node down " + nodeID + " already removed");
-            }
-         }
-      });
+         });
 
-      ClientSessionFactory sf = locator.createSessionFactory();
+         ClientSessionFactory sf = locator.createSessionFactory();
 
-      startServers(1, 4, 3, 2);
-      String[] nodeIDs = getNodeIDs(0, 1, 2, 3, 4);
+         startServers(1, 4, 3, 2);
+         String[] nodeIDs = getNodeIDs(0, 1, 2, 3, 4);
 
-      assertTrue("Was not notified that all servers are UP", upLatch.await(10, SECONDS));
-      checkContains(new int[] { 0, 1, 4, 3, 2 }, nodeIDs, nodes);
+         assertTrue("Was not notified that all servers are UP", upLatch.await(10, SECONDS));
+         checkContains(new int[] { 0, 1, 4, 3, 2 }, nodeIDs, nodes);
 
-      waitForClusterConnections(0, 4);
-      waitForClusterConnections(1, 4);
-      waitForClusterConnections(2, 4);
-      waitForClusterConnections(3, 4);
-      waitForClusterConnections(4, 4);
+         waitForClusterConnections(0, 4);
+         waitForClusterConnections(1, 4);
+         waitForClusterConnections(2, 4);
+         waitForClusterConnections(3, 4);
+         waitForClusterConnections(4, 4);
 
-      stopServers(2, 3, 1, 4);
+         stopServers(2, 3, 1, 4);
 
-      assertTrue("Was not notified that all servers are DOWN", downLatch.await(10, SECONDS));
-      checkContains(new int[] { 0 }, nodeIDs, nodes);
+         assertTrue("Was not notified that all servers are DOWN", downLatch.await(10, SECONDS));
+         checkContains(new int[] { 0 }, nodeIDs, nodes);
 
-      sf.close();
-      
-      locator.close();
-      
-      stopServers(0);
+         sf.close();
+      }
+      finally
+      {
+         locator.close();
+
+         stopServers(0);
+      }
+
    }
 
    public void testReceiveNotifications() throws Throwable
@@ -273,73 +294,81 @@
 
       ServerLocator locator = createHAServerLocator();
 
-      final List<String> nodes = new ArrayList<String>();
-      final CountDownLatch upLatch = new CountDownLatch(5);
-      final CountDownLatch downLatch = new CountDownLatch(4);
+      try
+      {
 
-      locator.addClusterTopologyListener(new ClusterTopologyListener()
-      {
-         public void nodeUP(final long uniqueEventID, 
-                            String nodeID,
-                            Pair<TransportConfiguration, TransportConfiguration> connectorPair,
-                            boolean last)
+         waitForClusterConnections(0, 4);
+         waitForClusterConnections(1, 4);
+         waitForClusterConnections(2, 4);
+         waitForClusterConnections(3, 4);
+         waitForClusterConnections(4, 4);
+
+         final List<String> nodes = new ArrayList<String>();
+         final CountDownLatch upLatch = new CountDownLatch(5);
+         final CountDownLatch downLatch = new CountDownLatch(4);
+
+         locator.addClusterTopologyListener(new ClusterTopologyListener()
          {
-            if (!nodes.contains(nodeID))
+            public void nodeUP(final long uniqueEventID,
+                               String nodeID,
+                               Pair<TransportConfiguration, TransportConfiguration> connectorPair,
+                               boolean last)
             {
-               nodes.add(nodeID);
-               upLatch.countDown();
+               if (!nodes.contains(nodeID))
+               {
+                  nodes.add(nodeID);
+                  upLatch.countDown();
+               }
             }
-         }
 
-         public void nodeDown(final long uniqueEventID, String nodeID)
-         {
-            if (nodes.contains(nodeID))
+            public void nodeDown(final long uniqueEventID, String nodeID)
             {
-               nodes.remove(nodeID);
-               downLatch.countDown();
+               if (nodes.contains(nodeID))
+               {
+                  nodes.remove(nodeID);
+                  downLatch.countDown();
+               }
             }
-         }
-      });
+         });
 
-      ClientSessionFactory sf = locator.createSessionFactory();
+         ClientSessionFactory sf = locator.createSessionFactory();
 
-      assertTrue("Was not notified that all servers are UP", upLatch.await(10, SECONDS));
-      checkContains(new int[] { 0, 1, 2, 3, 4 }, nodeIDs, nodes);
+         assertTrue("Was not notified that all servers are UP", upLatch.await(10, SECONDS));
+         checkContains(new int[] { 0, 1, 2, 3, 4 }, nodeIDs, nodes);
 
-      ClientSession session = sf.createSession();
-      
-      waitForClusterConnections(0, 4);
-      waitForClusterConnections(1, 4);
-      waitForClusterConnections(2, 4);
-      waitForClusterConnections(3, 4);
-      waitForClusterConnections(4, 4);
+         ClientSession session = sf.createSession();
 
-      stopServers(0);
-      session = checkSessionOrReconnect(session, locator);
-      checkContains(new int[] { 1, 2, 3, 4 }, nodeIDs, nodes);
+         stopServers(0);
+         session = checkSessionOrReconnect(session, locator);
+         checkContains(new int[] { 1, 2, 3, 4 }, nodeIDs, nodes);
 
-      stopServers(2);
-      session = checkSessionOrReconnect(session, locator);
-      checkContains(new int[] { 1, 3, 4 }, nodeIDs, nodes);
+         stopServers(2);
+         session = checkSessionOrReconnect(session, locator);
+         checkContains(new int[] { 1, 3, 4 }, nodeIDs, nodes);
 
-      stopServers(4);
-      session = checkSessionOrReconnect(session, locator);
-      checkContains(new int[] { 1, 3 }, nodeIDs, nodes);
+         stopServers(4);
+         session = checkSessionOrReconnect(session, locator);
+         checkContains(new int[] { 1, 3 }, nodeIDs, nodes);
 
-      stopServers(3);
-      session = checkSessionOrReconnect(session, locator);
-      checkContains(new int[] { 1 }, nodeIDs, nodes);
+         stopServers(3);
+         session = checkSessionOrReconnect(session, locator);
+         checkContains(new int[] { 1 }, nodeIDs, nodes);
 
-      stopServers(1);
-      
-      assertTrue("Was not notified that all servers are DOWN", downLatch.await(10, SECONDS));
-      checkContains(new int[] {}, nodeIDs, nodes);
+         stopServers(1);
 
-      sf.close();
-      
-      locator.close();
+         assertTrue("Was not notified that all servers are DOWN", downLatch.await(10, SECONDS));
+         checkContains(new int[] {}, nodeIDs, nodes);
+
+         sf.close();
+      }
+      finally
+      {
+         locator.close();
+      }
+
    }
 
+
    public void testStopNodes() throws Throwable
    {
       startServers(0, 1, 2, 3, 4);
@@ -347,80 +376,87 @@
 
       ServerLocator locator = createHAServerLocator();
 
-      final List<String> nodes = new ArrayList<String>();
-      final CountDownLatch upLatch = new CountDownLatch(5);
+      try
+      {
+         waitForClusterConnections(0, 4);
+         waitForClusterConnections(1, 4);
+         waitForClusterConnections(2, 4);
+         waitForClusterConnections(3, 4);
+         waitForClusterConnections(4, 4);
 
-      locator.addClusterTopologyListener(new ClusterTopologyListener()
-      {
-         public void nodeUP(final long uniqueEventID, String nodeID,
-                            Pair<TransportConfiguration, TransportConfiguration> connectorPair,
-                            boolean last)
+         final List<String> nodes = new ArrayList<String>();
+         final CountDownLatch upLatch = new CountDownLatch(5);
+
+         locator.addClusterTopologyListener(new ClusterTopologyListener()
          {
-            if (!nodes.contains(nodeID))
+            public void nodeUP(final long uniqueEventID,
+                               String nodeID,
+                               Pair<TransportConfiguration, TransportConfiguration> connectorPair,
+                               boolean last)
             {
-               nodes.add(nodeID);
-               upLatch.countDown();
+               if (!nodes.contains(nodeID))
+               {
+                  nodes.add(nodeID);
+                  upLatch.countDown();
+               }
             }
-         }
 
-         public void nodeDown(final long uniqueEventID, String nodeID)
-         {
-            if (nodes.contains(nodeID))
+            public void nodeDown(final long uniqueEventID, String nodeID)
             {
-               nodes.remove(nodeID);
+               if (nodes.contains(nodeID))
+               {
+                  nodes.remove(nodeID);
+               }
             }
-         }
-      });
+         });
 
-      ClientSessionFactory sf = locator.createSessionFactory();
+         ClientSessionFactory sf = locator.createSessionFactory();
 
-      assertTrue("Was not notified that all servers are UP", upLatch.await(10, SECONDS));
-      checkContains(new int[] { 0, 1, 2, 3, 4 }, nodeIDs, nodes);
+         assertTrue("Was not notified that all servers are UP", upLatch.await(10, SECONDS));
+         checkContains(new int[] { 0, 1, 2, 3, 4 }, nodeIDs, nodes);
 
-      waitForClusterConnections(0, 4);
-      waitForClusterConnections(1, 4);
-      waitForClusterConnections(2, 4);
-      waitForClusterConnections(3, 4);
-      waitForClusterConnections(4, 4);
+         ClientSession session = sf.createSession();
 
-      ClientSession session = sf.createSession();
-      
-      stopServers(0);
-      assertFalse(servers[0].isStarted());
-      session = checkSessionOrReconnect(session, locator);
-      checkContains(new int[] { 1, 2, 3, 4 }, nodeIDs, nodes);
+         stopServers(0);
+         assertFalse(servers[0].isStarted());
+         session = checkSessionOrReconnect(session, locator);
+         checkContains(new int[] { 1, 2, 3, 4 }, nodeIDs, nodes);
 
-      stopServers(2);
-      assertFalse(servers[2].isStarted());
-      session = checkSessionOrReconnect(session, locator);
-      checkContains(new int[] { 1, 3, 4 }, nodeIDs, nodes);
+         stopServers(2);
+         assertFalse(servers[2].isStarted());
+         session = checkSessionOrReconnect(session, locator);
+         checkContains(new int[] { 1, 3, 4 }, nodeIDs, nodes);
 
-      stopServers(4);
-      assertFalse(servers[4].isStarted());
-      session = checkSessionOrReconnect(session, locator);
-      checkContains(new int[] { 1, 3 }, nodeIDs, nodes);
+         stopServers(4);
+         assertFalse(servers[4].isStarted());
+         session = checkSessionOrReconnect(session, locator);
+         checkContains(new int[] { 1, 3 }, nodeIDs, nodes);
 
-      stopServers(3);
-      assertFalse(servers[3].isStarted());
+         stopServers(3);
+         assertFalse(servers[3].isStarted());
 
-      session = checkSessionOrReconnect(session, locator);
-      checkContains(new int[] { 1 }, nodeIDs, nodes);
+         session = checkSessionOrReconnect(session, locator);
+         checkContains(new int[] { 1 }, nodeIDs, nodes);
 
-      stopServers(1);
-      assertFalse(servers[1].isStarted());
-      try
+         stopServers(1);
+         assertFalse(servers[1].isStarted());
+         try
+         {
+            session = checkSessionOrReconnect(session, locator);
+            fail();
+         }
+         catch (Exception e)
+         {
+
+         }
+      }
+      finally
       {
-         session = checkSessionOrReconnect(session, locator);
-         fail();
+         locator.close();
       }
-      catch (Exception e)
-      {
 
-      }
-      
-      locator.close();
    }
-   
+
    public void testMultipleClientSessionFactories() throws Throwable
    {
       startServers(0, 1, 2, 3, 4);
@@ -428,67 +464,75 @@
 
       ServerLocator locator = createHAServerLocator();
 
-      final List<String> nodes = new ArrayList<String>();
-      final CountDownLatch upLatch = new CountDownLatch(5);
-      final CountDownLatch downLatch = new CountDownLatch(4);
+      try
+      {
 
-      locator.addClusterTopologyListener(new ClusterTopologyListener()
-      {
-         public void nodeUP(final long uniqueEventID, String nodeID, 
-                            Pair<TransportConfiguration, TransportConfiguration> connectorPair,
-                            boolean last)
+         waitForClusterConnections(0, 4);
+         waitForClusterConnections(1, 4);
+         waitForClusterConnections(2, 4);
+         waitForClusterConnections(3, 4);
+         waitForClusterConnections(4, 4);
+
+         final List<String> nodes = new ArrayList<String>();
+         final CountDownLatch upLatch = new CountDownLatch(5);
+         final CountDownLatch downLatch = new CountDownLatch(4);
+
+         locator.addClusterTopologyListener(new ClusterTopologyListener()
          {
-            if (!nodes.contains(nodeID))
+            public void nodeUP(final long uniqueEventID,
+                               String nodeID,
+                               Pair<TransportConfiguration, TransportConfiguration> connectorPair,
+                               boolean last)
             {
-               nodes.add(nodeID);
-               upLatch.countDown();
+               if (!nodes.contains(nodeID))
+               {
+                  nodes.add(nodeID);
+                  upLatch.countDown();
+               }
             }
-         }
 
-         public void nodeDown(final long uniqueEventID, String nodeID)
-         {
-            if (nodes.contains(nodeID))
+            public void nodeDown(final long uniqueEventID, String nodeID)
             {
-               nodes.remove(nodeID);
-               downLatch.countDown();
+               if (nodes.contains(nodeID))
+               {
+                  nodes.remove(nodeID);
+                  downLatch.countDown();
+               }
             }
-         }
-      });
+         });
 
-      ClientSessionFactory[] sfs = new ClientSessionFactory[] {
-                                                               locator.createSessionFactory(),
-                                                               locator.createSessionFactory(),
-                                                               locator.createSessionFactory(),
-                                                               locator.createSessionFactory(),
-                                                               locator.createSessionFactory() };
-      assertTrue("Was not notified that all servers are UP", upLatch.await(10, SECONDS));
-      checkContains(new int[] { 0, 1, 2, 3, 4 }, nodeIDs, nodes);
+         ClientSessionFactory[] sfs = new ClientSessionFactory[] { locator.createSessionFactory(),
+                                                                  locator.createSessionFactory(),
+                                                                  locator.createSessionFactory(),
+                                                                  locator.createSessionFactory(),
+                                                                  locator.createSessionFactory() };
+         assertTrue("Was not notified that all servers are UP", upLatch.await(10, SECONDS));
+         checkContains(new int[] { 0, 1, 2, 3, 4 }, nodeIDs, nodes);
 
-      waitForClusterConnections(0, 4);
-      waitForClusterConnections(1, 4);
-      waitForClusterConnections(2, 4);
-      waitForClusterConnections(3, 4);
-      waitForClusterConnections(4, 4);
-      //we cant close all of the servers, we need to leave one up to notify us
-      stopServers(4, 2, 3, 1);
+         // we cant close all of the servers, we need to leave one up to notify us
+         stopServers(4, 2, 3, 1);
 
-      boolean ok = downLatch.await(10, SECONDS);
-      if(!ok)
-      {
-         System.out.println("TopologyClusterTestBase.testMultipleClientSessionFactories");
+         boolean ok = downLatch.await(10, SECONDS);
+         if (!ok)
+         {
+            log.warn("TopologyClusterTestBase.testMultipleClientSessionFactories will fail");
+         }
+         assertTrue("Was not notified that all servers are Down", ok);
+         checkContains(new int[] { 0 }, nodeIDs, nodes);
+
+         for (int i = 0; i < sfs.length; i++)
+         {
+            ClientSessionFactory sf = sfs[i];
+            sf.close();
+         }
       }
-      assertTrue("Was not notified that all servers are Down", ok);
-      checkContains(new int[] { 0 }, nodeIDs, nodes);
-      
-      for (int i = 0; i < sfs.length; i++)
+      finally
       {
-         ClientSessionFactory sf = sfs[i];
-         sf.close();
+         locator.close();
+
+         stopServers(0);
       }
-      
-      locator.close();
 
-      stopServers(0);
    }
 
    // Private -------------------------------------------------------



More information about the hornetq-commits mailing list