[hornetq-commits] JBoss hornetq SVN: r9611 - in branches/2_2_0_HA_Improvements: src/main/org/hornetq/core/management/impl and 4 other directories.

do-not-reply at jboss.org do-not-reply at jboss.org
Mon Aug 30 10:18:39 EDT 2010


Author: jmesnil
Date: 2010-08-30 10:18:38 -0400 (Mon, 30 Aug 2010)
New Revision: 9611

Added:
   branches/2_2_0_HA_Improvements/tests/src/org/hornetq/tests/integration/cluster/topology/
   branches/2_2_0_HA_Improvements/tests/src/org/hornetq/tests/integration/cluster/topology/HAClientTopologyTest.java
   branches/2_2_0_HA_Improvements/tests/src/org/hornetq/tests/integration/cluster/topology/HAClientTopologyWithDiscoveryTest.java
   branches/2_2_0_HA_Improvements/tests/src/org/hornetq/tests/integration/cluster/topology/NettyHAClientTopologyTest.java
   branches/2_2_0_HA_Improvements/tests/src/org/hornetq/tests/integration/cluster/topology/NettyHAClientTopologyWithDiscoveryTest.java
   branches/2_2_0_HA_Improvements/tests/src/org/hornetq/tests/integration/cluster/topology/TopologyClusterTestBase.java
Modified:
   branches/2_2_0_HA_Improvements/src/main/org/hornetq/core/client/impl/ServerLocatorImpl.java
   branches/2_2_0_HA_Improvements/src/main/org/hornetq/core/management/impl/ClusterConnectionControlImpl.java
   branches/2_2_0_HA_Improvements/src/main/org/hornetq/core/protocol/core/impl/CoreProtocolManager.java
   branches/2_2_0_HA_Improvements/src/main/org/hornetq/core/protocol/core/impl/RemotingConnectionImpl.java
   branches/2_2_0_HA_Improvements/src/main/org/hornetq/core/server/cluster/impl/ClusterConnectionBridge.java
Log:
add tests to check topology updates on HA clients

Modified: branches/2_2_0_HA_Improvements/src/main/org/hornetq/core/client/impl/ServerLocatorImpl.java
===================================================================
--- branches/2_2_0_HA_Improvements/src/main/org/hornetq/core/client/impl/ServerLocatorImpl.java	2010-08-30 08:13:19 UTC (rev 9610)
+++ branches/2_2_0_HA_Improvements/src/main/org/hornetq/core/client/impl/ServerLocatorImpl.java	2010-08-30 14:18:38 UTC (rev 9611)
@@ -579,6 +579,11 @@
                      throw new HornetQException(HornetQException.NOT_CONNECTED,
                            "Cannot connect to server(s). Tried with all available servers.");
                   }
+                  if (topologyArray == null && initialConnectors != null && attempts == initialConnectors.length)
+                  {
+                     throw new HornetQException(HornetQException.NOT_CONNECTED,
+                     "Cannot connect to server(s). Tried with all available servers.");
+                  }
 
                   retry = true;
                }

Modified: branches/2_2_0_HA_Improvements/src/main/org/hornetq/core/management/impl/ClusterConnectionControlImpl.java
===================================================================
--- branches/2_2_0_HA_Improvements/src/main/org/hornetq/core/management/impl/ClusterConnectionControlImpl.java	2010-08-30 08:13:19 UTC (rev 9610)
+++ branches/2_2_0_HA_Improvements/src/main/org/hornetq/core/management/impl/ClusterConnectionControlImpl.java	2010-08-30 14:18:38 UTC (rev 9611)
@@ -143,7 +143,14 @@
       clearIO();
       try
       {
+         if (configuration.getStaticConnectors() == null)
+         {
+            return null;
+         }
+         else
+         {
          return configuration.getStaticConnectors().toArray(new String[0]);                 
+         }
       }
       finally
       {

Modified: branches/2_2_0_HA_Improvements/src/main/org/hornetq/core/protocol/core/impl/CoreProtocolManager.java
===================================================================
--- branches/2_2_0_HA_Improvements/src/main/org/hornetq/core/protocol/core/impl/CoreProtocolManager.java	2010-08-30 08:13:19 UTC (rev 9610)
+++ branches/2_2_0_HA_Improvements/src/main/org/hornetq/core/protocol/core/impl/CoreProtocolManager.java	2010-08-30 14:18:38 UTC (rev 9611)
@@ -68,7 +68,8 @@
                                                                    interceptors,
                                                                    config.isAsyncConnectionExecutionEnabled() ? server.getExecutorFactory()
                                                                                                                       .getExecutor()
-                                                                                                             : null);
+                                                                                                             : null,
+                                                                                                             server.getNodeID());
 
       Channel channel1 = rc.getChannel(1, -1);
 

Modified: branches/2_2_0_HA_Improvements/src/main/org/hornetq/core/protocol/core/impl/RemotingConnectionImpl.java
===================================================================
--- branches/2_2_0_HA_Improvements/src/main/org/hornetq/core/protocol/core/impl/RemotingConnectionImpl.java	2010-08-30 08:13:19 UTC (rev 9610)
+++ branches/2_2_0_HA_Improvements/src/main/org/hornetq/core/protocol/core/impl/RemotingConnectionImpl.java	2010-08-30 14:18:38 UTC (rev 9611)
@@ -25,10 +25,12 @@
 import org.hornetq.api.core.HornetQBuffer;
 import org.hornetq.api.core.HornetQException;
 import org.hornetq.api.core.Interceptor;
+import org.hornetq.api.core.SimpleString;
 import org.hornetq.core.logging.Logger;
 import org.hornetq.core.protocol.core.Channel;
 import org.hornetq.core.protocol.core.CoreRemotingConnection;
 import org.hornetq.core.protocol.core.Packet;
+import org.hornetq.core.protocol.core.impl.wireformat.ClusterTopologyChangeMessage;
 import org.hornetq.core.remoting.CloseListener;
 import org.hornetq.core.remoting.FailureListener;
 import org.hornetq.spi.core.remoting.BufferHandler;
@@ -88,6 +90,8 @@
    private final Executor executor;
    
    private volatile boolean executing;
+   
+   private final SimpleString nodeID;
 
    // Constructors
    // ---------------------------------------------------------------------------------
@@ -99,7 +103,7 @@
                                  final long blockingCallTimeout,
                                  final List<Interceptor> interceptors)
    {
-      this(transportConnection, blockingCallTimeout, interceptors, true, null);
+      this(transportConnection, blockingCallTimeout, interceptors, true, null, null);
    }
 
    /*
@@ -107,17 +111,19 @@
     */
    public RemotingConnectionImpl(final Connection transportConnection,
                                  final List<Interceptor> interceptors,
-                                 final Executor executor)
+                                 final Executor executor,
+                                 final SimpleString nodeID)
 
    {
-      this(transportConnection, -1, interceptors, false, executor);
+      this(transportConnection, -1, interceptors, false, executor, nodeID);
    }
 
    private RemotingConnectionImpl(final Connection transportConnection,
                                   final long blockingCallTimeout,
                                   final List<Interceptor> interceptors,
                                   final boolean client,
-                                  final Executor executor)
+                                  final Executor executor,
+                                  final SimpleString nodeID)
 
    {
       this.transportConnection = transportConnection;
@@ -129,6 +135,8 @@
       this.client = client;
 
       this.executor = executor;
+      
+      this.nodeID = nodeID;
    }
 
    // RemotingConnection implementation
@@ -299,6 +307,10 @@
          channel.flushConfirmations();
       }
 
+      if (nodeID != null)
+      {
+         channel0.send(new ClusterTopologyChangeMessage(nodeID.toString()));
+      }
       channel0.sendAndFlush(new PacketImpl(PacketImpl.DISCONNECT));
    }
 

Modified: branches/2_2_0_HA_Improvements/src/main/org/hornetq/core/server/cluster/impl/ClusterConnectionBridge.java
===================================================================
--- branches/2_2_0_HA_Improvements/src/main/org/hornetq/core/server/cluster/impl/ClusterConnectionBridge.java	2010-08-30 08:13:19 UTC (rev 9610)
+++ branches/2_2_0_HA_Improvements/src/main/org/hornetq/core/server/cluster/impl/ClusterConnectionBridge.java	2010-08-30 14:18:38 UTC (rev 9611)
@@ -227,53 +227,4 @@
       
       return serverLocator.createSessionFactory(connector);      
    }
-
-   @Override
-   public void connectionFailed(HornetQException me)
-   {
-      if (!session.isClosed())
-      {
-         try
-         {
-            session.cleanUp(false);
-         }
-         catch (Exception e)
-         {
-            log.warn("Unable to clean up the session after a connection failure", e);
-         }
-         serverLocator.notifyNodeDown(targetNodeID);
-         if (serverLocator.getDiscoveryAddress() == null)
-         {
-            executor.execute(new Runnable()
-            {
-               
-               public void run()
-               {
-                  ClientSessionFactory sf = null;
-                  do
-                  {
-                     try
-                     {
-                        sf = serverLocator.createSessionFactory(connector);
-                     }
-                     catch (HornetQException e)
-                     {
-                        if (e.getCode() == HornetQException.NOT_CONNECTED)
-                        {
-                           continue;
-                        }
-                     }
-                     catch (Exception e)
-                     {
-                        break;
-                     }
-                  }
-                  while (sf == null);
-               }
-            });
-         }
-      }
-      super.connectionFailed(me);
-   }
-
 }

Added: branches/2_2_0_HA_Improvements/tests/src/org/hornetq/tests/integration/cluster/topology/HAClientTopologyTest.java
===================================================================
--- branches/2_2_0_HA_Improvements/tests/src/org/hornetq/tests/integration/cluster/topology/HAClientTopologyTest.java	                        (rev 0)
+++ branches/2_2_0_HA_Improvements/tests/src/org/hornetq/tests/integration/cluster/topology/HAClientTopologyTest.java	2010-08-30 14:18:38 UTC (rev 9611)
@@ -0,0 +1,65 @@
+/*
+ * Copyright 2009 Red Hat, Inc.
+ * Red Hat licenses this file to you under the Apache License, version
+ * 2.0 (the "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+ * implied.  See the License for the specific language governing
+ * permissions and limitations under the License.
+ */
+
+package org.hornetq.tests.integration.cluster.topology;
+
+import org.hornetq.api.core.TransportConfiguration;
+import org.hornetq.api.core.client.HornetQClient;
+import org.hornetq.api.core.client.ServerLocator;
+import org.hornetq.core.logging.Logger;
+
+/**
+ * @author <a href="mailto:jmesnil at redhat.com">Jeff Mesnil</a>
+ */
+public class HAClientTopologyTest extends TopologyClusterTestBase
+{
+   private static final Logger log = Logger.getLogger(HAClientTopologyTest.class);
+
+   protected boolean isNetty()
+   {
+      return false;
+   }
+
+   protected void setupCluster() throws Exception
+   {
+      setupCluster(false);
+   }
+
+   protected void setupCluster(final boolean forwardWhenNoConsumers) throws Exception
+   {
+      setupClusterConnection("cluster0", "queues", forwardWhenNoConsumers, 1, isNetty(), 0, 1, 2, 3, 4);
+      setupClusterConnection("cluster1", "queues", forwardWhenNoConsumers, 1, isNetty(), 1, 0, 2, 3, 4);
+      setupClusterConnection("cluster2", "queues", forwardWhenNoConsumers, 1, isNetty(), 2, 0, 1, 3, 4);
+      setupClusterConnection("cluster3", "queues", forwardWhenNoConsumers, 1, isNetty(), 3, 0, 1, 2, 4);
+      setupClusterConnection("cluster4", "queues", forwardWhenNoConsumers, 1, isNetty(), 4, 0, 1, 2, 3);
+   }
+
+   protected void setupServers() throws Exception
+   {
+      setupServer(0, isFileStorage(), isNetty());
+      setupServer(1, isFileStorage(), isNetty());
+      setupServer(2, isFileStorage(), isNetty());
+      setupServer(3, isFileStorage(), isNetty());
+      setupServer(4, isFileStorage(), isNetty());
+   }
+
+   @Override
+   protected ServerLocator createHAServerLocator()
+   {
+      TransportConfiguration tc = createTransportConfiguration(isNetty(), false, generateParams(0, isNetty()));
+      ServerLocator locator = HornetQClient.createServerLocatorWithHA(tc);
+      locator.setBlockOnNonDurableSend(true);
+      locator.setBlockOnDurableSend(true);
+      return locator;
+   }
+}

Added: branches/2_2_0_HA_Improvements/tests/src/org/hornetq/tests/integration/cluster/topology/HAClientTopologyWithDiscoveryTest.java
===================================================================
--- branches/2_2_0_HA_Improvements/tests/src/org/hornetq/tests/integration/cluster/topology/HAClientTopologyWithDiscoveryTest.java	                        (rev 0)
+++ branches/2_2_0_HA_Improvements/tests/src/org/hornetq/tests/integration/cluster/topology/HAClientTopologyWithDiscoveryTest.java	2010-08-30 14:18:38 UTC (rev 9611)
@@ -0,0 +1,67 @@
+/*
+ * Copyright 2009 Red Hat, Inc.
+ * Red Hat licenses this file to you under the Apache License, version
+ * 2.0 (the "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+ * implied.  See the License for the specific language governing
+ * permissions and limitations under the License.
+ */
+
+package org.hornetq.tests.integration.cluster.topology;
+
+import org.hornetq.api.core.client.HornetQClient;
+import org.hornetq.api.core.client.ServerLocator;
+import org.hornetq.core.logging.Logger;
+
+/**
+ * @author <a href="mailto:jmesnil at redhat.com">Jeff Mesnil</a>
+ */
+public class HAClientTopologyWithDiscoveryTest extends TopologyClusterTestBase
+{
+   private static final Logger log = Logger.getLogger(HAClientTopologyWithDiscoveryTest.class);
+
+   protected static final String groupAddress = "230.1.2.3";
+
+   protected static final int groupPort = 6745;
+
+   protected boolean isNetty()
+   {
+      return false;
+   }
+
+   protected void setupCluster() throws Exception
+   {
+      setupCluster(false);
+   }
+
+   protected void setupCluster(final boolean forwardWhenNoConsumers) throws Exception
+   {
+      setupDiscoveryClusterConnection("cluster0", 0, "dg1", "queues", forwardWhenNoConsumers, 1, isNetty());
+      setupDiscoveryClusterConnection("cluster1", 1, "dg1", "queues", forwardWhenNoConsumers, 1, isNetty());
+      setupDiscoveryClusterConnection("cluster2", 2, "dg1", "queues", forwardWhenNoConsumers, 1, isNetty());
+      setupDiscoveryClusterConnection("cluster3", 3, "dg1", "queues", forwardWhenNoConsumers, 1, isNetty());
+      setupDiscoveryClusterConnection("cluster4", 4, "dg1", "queues", forwardWhenNoConsumers, 1, isNetty());
+   }
+
+   protected void setupServers() throws Exception
+   {
+      setupServerWithDiscovery(0, groupAddress, groupPort, isFileStorage(), isNetty(), false);
+      setupServerWithDiscovery(1, groupAddress, groupPort, isFileStorage(), isNetty(), false);
+      setupServerWithDiscovery(2, groupAddress, groupPort, isFileStorage(), isNetty(), false);
+      setupServerWithDiscovery(3, groupAddress, groupPort, isFileStorage(), isNetty(), false);
+      setupServerWithDiscovery(4, groupAddress, groupPort, isFileStorage(), isNetty(), false);
+   }
+
+   @Override
+   protected ServerLocator createHAServerLocator()
+   {
+      ServerLocator locator = HornetQClient.createServerLocatorWithHA(groupAddress, groupPort);
+      locator.setBlockOnNonDurableSend(true);
+      locator.setBlockOnDurableSend(true);
+      return locator;
+   }
+}

Added: branches/2_2_0_HA_Improvements/tests/src/org/hornetq/tests/integration/cluster/topology/NettyHAClientTopologyTest.java
===================================================================
--- branches/2_2_0_HA_Improvements/tests/src/org/hornetq/tests/integration/cluster/topology/NettyHAClientTopologyTest.java	                        (rev 0)
+++ branches/2_2_0_HA_Improvements/tests/src/org/hornetq/tests/integration/cluster/topology/NettyHAClientTopologyTest.java	2010-08-30 14:18:38 UTC (rev 9611)
@@ -0,0 +1,50 @@
+/*
+ * Copyright 2010 Red Hat, Inc.
+ * Red Hat licenses this file to you under the Apache License, version
+ * 2.0 (the "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+ * implied.  See the License for the specific language governing
+ * permissions and limitations under the License.
+ */
+
+package org.hornetq.tests.integration.cluster.topology;
+
+/**
+ * A NettyHAClientTopologyTest
+ *
+ * @author <a href="mailto:jmesnil at redhat.com">Jeff Mesnil</a>
+ *
+ *
+ */
+public class NettyHAClientTopologyTest extends HAClientTopologyTest
+{
+
+   // Constants -----------------------------------------------------
+
+   // Attributes ----------------------------------------------------
+
+   // Static --------------------------------------------------------
+
+   // Constructors --------------------------------------------------
+
+   // Public --------------------------------------------------------
+
+   // Package protected ---------------------------------------------
+
+   // Protected -----------------------------------------------------
+
+   @Override
+   protected boolean isNetty()
+   {
+      return true;
+   }
+
+   // Private -------------------------------------------------------
+
+   // Inner classes -------------------------------------------------
+
+}

Added: branches/2_2_0_HA_Improvements/tests/src/org/hornetq/tests/integration/cluster/topology/NettyHAClientTopologyWithDiscoveryTest.java
===================================================================
--- branches/2_2_0_HA_Improvements/tests/src/org/hornetq/tests/integration/cluster/topology/NettyHAClientTopologyWithDiscoveryTest.java	                        (rev 0)
+++ branches/2_2_0_HA_Improvements/tests/src/org/hornetq/tests/integration/cluster/topology/NettyHAClientTopologyWithDiscoveryTest.java	2010-08-30 14:18:38 UTC (rev 9611)
@@ -0,0 +1,50 @@
+/*
+ * Copyright 2010 Red Hat, Inc.
+ * Red Hat licenses this file to you under the Apache License, version
+ * 2.0 (the "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+ * implied.  See the License for the specific language governing
+ * permissions and limitations under the License.
+ */
+
+package org.hornetq.tests.integration.cluster.topology;
+
+/**
+ * A NettyHAClientTopologyWithDiscoveryTest
+ *
+ * @author <a href="mailto:jmesnil at redhat.com">Jeff Mesnil</a>
+ *
+ *
+ */
+public class NettyHAClientTopologyWithDiscoveryTest extends HAClientTopologyWithDiscoveryTest
+{
+
+   // Constants -----------------------------------------------------
+
+   // Attributes ----------------------------------------------------
+
+   // Static --------------------------------------------------------
+
+   // Constructors --------------------------------------------------
+
+   // Public --------------------------------------------------------
+
+   // Package protected ---------------------------------------------
+
+   // Protected -----------------------------------------------------
+
+   @Override
+   protected boolean isNetty()
+   {
+      return true;
+   }
+   
+   // Private -------------------------------------------------------
+
+   // Inner classes -------------------------------------------------
+
+}

Added: branches/2_2_0_HA_Improvements/tests/src/org/hornetq/tests/integration/cluster/topology/TopologyClusterTestBase.java
===================================================================
--- branches/2_2_0_HA_Improvements/tests/src/org/hornetq/tests/integration/cluster/topology/TopologyClusterTestBase.java	                        (rev 0)
+++ branches/2_2_0_HA_Improvements/tests/src/org/hornetq/tests/integration/cluster/topology/TopologyClusterTestBase.java	2010-08-30 14:18:38 UTC (rev 9611)
@@ -0,0 +1,409 @@
+/*
+ * Copyright 2010 Red Hat, Inc.
+ * Red Hat licenses this file to you under the Apache License, version
+ * 2.0 (the "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+ * implied.  See the License for the specific language governing
+ * permissions and limitations under the License.
+ */
+
+package org.hornetq.tests.integration.cluster.topology;
+
+import static java.util.concurrent.TimeUnit.SECONDS;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.CountDownLatch;
+
+import org.hornetq.api.core.HornetQException;
+import org.hornetq.api.core.Pair;
+import org.hornetq.api.core.TransportConfiguration;
+import org.hornetq.api.core.client.ClientSession;
+import org.hornetq.api.core.client.ClientSessionFactory;
+import org.hornetq.api.core.client.ClusterTopologyListener;
+import org.hornetq.api.core.client.ServerLocator;
+import org.hornetq.core.logging.Logger;
+import org.hornetq.core.server.HornetQServer;
+import org.hornetq.core.server.cluster.ClusterConnection;
+import org.hornetq.core.server.cluster.ClusterManager;
+import org.hornetq.tests.integration.cluster.distribution.ClusterTestBase;
+import org.hornetq.tests.util.RandomUtil;
+
+/**
+ * A TopologyClusterTestBase
+ *
+ * @author <a href="mailto:jmesnil at redhat.com">Jeff Mesnil</a>
+ *
+ *
+ */
+public abstract class TopologyClusterTestBase extends ClusterTestBase
+{
+
+   // Constants -----------------------------------------------------
+
+   // Attributes ----------------------------------------------------
+
+   // Static --------------------------------------------------------
+
+   // Constructors --------------------------------------------------
+
+   // Public --------------------------------------------------------
+
+   // Package protected ---------------------------------------------
+
+   // Protected -----------------------------------------------------
+
+   private static final Logger log = Logger.getLogger(TopologyClusterTestBase.class);
+
+   private static final long WAIT_TIMEOUT = 30000;
+
+   abstract protected ServerLocator createHAServerLocator();
+
+   abstract protected void setupServers() throws Exception;
+
+   abstract protected void setupCluster() throws Exception;
+
+   abstract protected boolean isNetty() throws Exception;
+
+   @Override
+   protected void setUp() throws Exception
+   {
+      super.setUp();
+
+      setupServers();
+
+      setupCluster();
+   }
+
+   @Override
+   protected void tearDown() throws Exception
+   {
+      stopServers(0, 1, 2, 3, 4);
+
+      super.tearDown();
+   }
+
+   /**
+    * Check that the actual list of received nodeIDs corresponds to the expected order of nodes
+    */
+   protected void checkOrder(int[] expected, String[] nodeIDs, List<String> actual)
+   {
+      assertEquals(expected.length, actual.size());
+      for (int i = 0; i < expected.length; i++)
+      {
+         assertEquals("did not receive expected nodeID at " + i, nodeIDs[expected[i]], actual.get(i));
+      }
+   }
+
+   protected void checkContains(int[] expected, String[] nodeIDs, List<String> actual)
+   { // Polls until 'actual' holds exactly the node IDs selected by 'expected' (order ignored); fails the test otherwise.
+      long start = System.currentTimeMillis();
+      do
+      {
+         if (expected.length != actual.size())
+         {
+            continue; // in a do/while this jumps to the loop condition, so the timeout is still checked
+         }
+         boolean ok = true;
+         for (int i = 0; i < expected.length; i++)
+         {
+            ok = (ok && actual.contains(nodeIDs[expected[i]])); // 'expected' holds indices into nodeIDs
+         }
+         if (ok) 
+         {
+            return;
+         }
+      } while(System.currentTimeMillis() - start < 5000); // retries for up to 5000 ms with no sleep between iterations
+      fail("did not contain all expected node ID: " + actual);
+   }
+
+   protected String[] getNodeIDs(int... nodes)
+   { // Returns the node ID string of each requested server index, in argument order.
+      String[] nodeIDs = new String[nodes.length];
+      for (int i = 0; i < nodes.length; i++)
+      {
+         nodeIDs[i] = servers[nodes[i]].getNodeID().toString(); // look up the requested node, not the loop position
+      }
+      return nodeIDs;
+   }
+
+   protected ClientSession checkSessionOrReconnect(ClientSession session, ServerLocator locator) throws Exception
+   { // Returns 'session' if it is still usable, otherwise a new session obtained through 'locator'.
+      try
+      {
+         String rand = RandomUtil.randomString();
+         session.createQueue(rand, rand); // probe: create then delete a randomly named queue
+         session.deleteQueue(rand);
+         return session;
+      }
+      catch (HornetQException e)
+      {
+         if (e.getCode() == HornetQException.OBJECT_CLOSED || e.getCode() == HornetQException.UNBLOCKED)
+         { // only these two error codes trigger a reconnect
+         ClientSessionFactory sf = locator.createSessionFactory();
+         return sf.createSession();
+         }
+         else
+         {
+            throw e; // any other HornetQException code is propagated to the caller
+         }
+      }
+   }
+
+   protected void waitForClusterConnections(final int node, final int count) throws Exception
+   {
+      HornetQServer server = servers[node];
+
+      if (server == null)
+      {
+         throw new IllegalArgumentException("No server at " + node);
+      }
+
+      ClusterManager clusterManager = server.getClusterManager();
+
+      long start = System.currentTimeMillis();
+
+      do
+      {
+         int nodesCount = 0;
+
+         for (ClusterConnection clusterConn : clusterManager.getClusterConnections())
+         {
+            nodesCount += clusterConn.getNodes().size();
+         }
+
+         if (nodesCount == count)
+         {
+            return;
+         }
+
+         Thread.sleep(10);
+      }
+      while (System.currentTimeMillis() - start < WAIT_TIMEOUT);
+ 
+      log.error(clusterDescription(servers[node]));
+      throw new IllegalStateException("Timed out waiting for cluster connections ");
+   }
+   
+   public void testReceiveNotificationsWhenOtherNodesAreStartedAndStopped() throws Throwable
+   {
+      startServers(0);
+
+      ServerLocator locator = createHAServerLocator();
+
+      final List<String> nodes = new ArrayList<String>();
+      final CountDownLatch upLatch = new CountDownLatch(5);
+      final CountDownLatch downLatch = new CountDownLatch(4);
+
+      locator.addClusterTopologyListener(new ClusterTopologyListener()
+      {
+         public void nodeUP(String nodeID,
+                            Pair<TransportConfiguration, TransportConfiguration> connectorPair,
+                            boolean last,
+                            int distance)
+         {
+            if(!nodes.contains(nodeID))
+            {
+               nodes.add(nodeID);
+               upLatch.countDown();
+            }
+         }
+
+         public void nodeDown(String nodeID)
+         {
+            if (nodes.contains(nodeID))
+            {
+               nodes.remove(nodeID);
+               downLatch.countDown();
+            }
+         }
+      });
+
+      ClientSessionFactory sf = locator.createSessionFactory();
+
+      startServers(1, 4, 3, 2);
+      String[] nodeIDs = getNodeIDs(0, 1, 2, 3, 4);
+
+      assertTrue("Was not notified that all servers are UP", upLatch.await(10, SECONDS));
+      checkContains(new int[] { 0, 1, 4, 3, 2 }, nodeIDs, nodes);
+
+      waitForClusterConnections(0, 4);
+      waitForClusterConnections(1, 4);
+      waitForClusterConnections(2, 4);
+      waitForClusterConnections(3, 4);
+      waitForClusterConnections(4, 4);
+
+      stopServers(2, 3, 1, 4);
+
+      waitForClusterConnections(0, 0);
+
+      assertTrue("Was not notified that all servers are DOWN", downLatch.await(10, SECONDS));
+      checkContains(new int[] { 0 }, nodeIDs, nodes);
+
+      sf.close();
+      
+      stopServers(0);
+   }
+
+   public void testReceiveNotifications() throws Throwable
+   {
+      startServers(0, 1, 2, 3, 4);
+      String[] nodeIDs = getNodeIDs(0, 1, 2, 3, 4);
+
+      ServerLocator locator = createHAServerLocator();
+
+      final List<String> nodes = new ArrayList<String>();
+      final CountDownLatch upLatch = new CountDownLatch(5);
+      final CountDownLatch downLatch = new CountDownLatch(4);
+
+      locator.addClusterTopologyListener(new ClusterTopologyListener()
+      {
+         public void nodeUP(String nodeID,
+                            Pair<TransportConfiguration, TransportConfiguration> connectorPair,
+                            boolean last,
+                            int distance)
+         {
+            if (!nodes.contains(nodeID))
+            {
+               nodes.add(nodeID);
+               upLatch.countDown();
+            }
+         }
+
+         public void nodeDown(String nodeID)
+         {
+            if (nodes.contains(nodeID))
+            {
+               nodes.remove(nodeID);
+               downLatch.countDown();
+            }
+         }
+      });
+
+      ClientSessionFactory sf = locator.createSessionFactory();
+
+      assertTrue("Was not notified that all servers are UP", upLatch.await(10, SECONDS));
+      checkContains(new int[] { 0, 1, 2, 3, 4 }, nodeIDs, nodes);
+
+      ClientSession session = sf.createSession();
+      
+      waitForClusterConnections(0, 4);
+      waitForClusterConnections(1, 4);
+      waitForClusterConnections(2, 4);
+      waitForClusterConnections(3, 4);
+      waitForClusterConnections(4, 4);
+
+      stopServers(0);
+      session = checkSessionOrReconnect(session, locator);
+      checkContains(new int[] { 1, 2, 3, 4 }, nodeIDs, nodes);
+
+      stopServers(2);
+      session = checkSessionOrReconnect(session, locator);
+      checkContains(new int[] { 1, 3, 4 }, nodeIDs, nodes);
+
+      stopServers(4);
+      session = checkSessionOrReconnect(session, locator);
+      checkContains(new int[] { 1, 3 }, nodeIDs, nodes);
+
+      stopServers(3);
+      session = checkSessionOrReconnect(session, locator);
+      checkContains(new int[] { 1 }, nodeIDs, nodes);
+
+      stopServers(1);
+      
+      assertTrue("Was not notified that all servers are DOWN", downLatch.await(10, SECONDS));
+      checkContains(new int[] {}, nodeIDs, nodes);
+
+      sf.close();
+   }
+
+   public void testStopNodes() throws Throwable
+   {
+      startServers(0, 1, 2, 3, 4);
+      String[] nodeIDs = getNodeIDs(0, 1, 2, 3, 4);
+
+      ServerLocator locator = createHAServerLocator();
+
+      final List<String> nodes = new ArrayList<String>();
+      final CountDownLatch upLatch = new CountDownLatch(5);
+
+      locator.addClusterTopologyListener(new ClusterTopologyListener()
+      {
+         public void nodeUP(String nodeID,
+                            Pair<TransportConfiguration, TransportConfiguration> connectorPair,
+                            boolean last,
+                            int distance)
+         {
+            if (!nodes.contains(nodeID))
+            {
+               nodes.add(nodeID);
+               upLatch.countDown();
+            }
+         }
+
+         public void nodeDown(String nodeID)
+         {
+            if (nodes.contains(nodeID))
+            {
+               nodes.remove(nodeID);
+            }
+         }
+      });
+
+      ClientSessionFactory sf = locator.createSessionFactory();
+
+      assertTrue("Was not notified that all servers are UP", upLatch.await(10, SECONDS));
+      checkContains(new int[] { 0, 1, 2, 3, 4 }, nodeIDs, nodes);
+
+      waitForClusterConnections(0, 4);
+      waitForClusterConnections(1, 4);
+      waitForClusterConnections(2, 4);
+      waitForClusterConnections(3, 4);
+      waitForClusterConnections(4, 4);
+
+      ClientSession session = sf.createSession();
+      
+      stopServers(0);
+      assertFalse(servers[0].isStarted());
+      session = checkSessionOrReconnect(session, locator);
+      checkContains(new int[] { 1, 2, 3, 4 }, nodeIDs, nodes);
+
+      stopServers(2);
+      assertFalse(servers[2].isStarted());
+      session = checkSessionOrReconnect(session, locator);
+      checkContains(new int[] { 1, 3, 4 }, nodeIDs, nodes);
+
+      stopServers(4);
+      assertFalse(servers[4].isStarted());
+      session = checkSessionOrReconnect(session, locator);
+      checkContains(new int[] { 1, 3 }, nodeIDs, nodes);
+
+      stopServers(3);
+      assertFalse(servers[3].isStarted());
+
+      session = checkSessionOrReconnect(session, locator);
+      checkContains(new int[] { 1 }, nodeIDs, nodes);
+
+      stopServers(1);
+      assertFalse(servers[1].isStarted());
+      try
+      {
+         session = checkSessionOrReconnect(session, locator);
+         fail();
+      }
+      catch (Exception e)
+      {
+
+      }
+   }
+
+   
+   // Private -------------------------------------------------------
+
+   // Inner classes -------------------------------------------------
+
+}



More information about the hornetq-commits mailing list