[hornetq-commits] JBoss hornetq SVN: r10895 - branches/Branch_2_2_EAP_cluster_clean2/tests/src/org/hornetq/tests/integration/cluster/distribution.

do-not-reply at jboss.org do-not-reply at jboss.org
Tue Jun 28 21:07:24 EDT 2011


Author: clebert.suconic at jboss.com
Date: 2011-06-28 21:07:24 -0400 (Tue, 28 Jun 2011)
New Revision: 10895

Modified:
   branches/Branch_2_2_EAP_cluster_clean2/tests/src/org/hornetq/tests/integration/cluster/distribution/ClusterTestBase.java
   branches/Branch_2_2_EAP_cluster_clean2/tests/src/org/hornetq/tests/integration/cluster/distribution/SimpleSymmetricClusterTest.java
   branches/Branch_2_2_EAP_cluster_clean2/tests/src/org/hornetq/tests/integration/cluster/distribution/SymmetricClusterTest.java
Log:
tweaks on clustering

Modified: branches/Branch_2_2_EAP_cluster_clean2/tests/src/org/hornetq/tests/integration/cluster/distribution/ClusterTestBase.java
===================================================================
--- branches/Branch_2_2_EAP_cluster_clean2/tests/src/org/hornetq/tests/integration/cluster/distribution/ClusterTestBase.java	2011-06-29 00:59:58 UTC (rev 10894)
+++ branches/Branch_2_2_EAP_cluster_clean2/tests/src/org/hornetq/tests/integration/cluster/distribution/ClusterTestBase.java	2011-06-29 01:07:24 UTC (rev 10895)
@@ -37,6 +37,7 @@
 import org.hornetq.api.core.client.ClientSessionFactory;
 import org.hornetq.api.core.client.HornetQClient;
 import org.hornetq.api.core.client.ServerLocator;
+import org.hornetq.core.client.impl.Topology;
 import org.hornetq.core.config.BroadcastGroupConfiguration;
 import org.hornetq.core.config.ClusterConnectionConfiguration;
 import org.hornetq.core.config.Configuration;
@@ -84,7 +85,7 @@
                                        TransportConstants.DEFAULT_PORT + 8,
                                        TransportConstants.DEFAULT_PORT + 9, };
 
-   private static final long WAIT_TIMEOUT = 5000;
+   private static final long WAIT_TIMEOUT = 10000;
 
    @Override
    protected void setUp() throws Exception
@@ -243,7 +244,37 @@
 
       throw new IllegalStateException(msg);
    }
+   
+   protected void waitForTopology(final HornetQServer server, final int nodes) throws Exception
+   {
+      log.debug("waiting for " + nodes + " on the topology for server = " + server);
 
+
+      long start = System.currentTimeMillis();
+      
+      Topology topology = server.getClusterManager().getTopology();
+
+      do
+      {
+         if (nodes == topology.getMembers().size())
+         {
+            return;
+         }
+
+         Thread.sleep(10);
+      }
+      while (System.currentTimeMillis() - start < ClusterTestBase.WAIT_TIMEOUT);
+      
+      String msg = "Timed out waiting for cluster topology of " + nodes + " (received " + topology.getMembers().size() + ") nodes on server = " + server + ")\n Current topology:" + topology.describe();
+
+      ClusterTestBase.log.error(msg);
+      
+      throw new Exception (msg);
+
+
+      
+   }
+
    protected void waitForBindings(final int node,
                                   final String address,
                                   final int expectedBindingCount,
@@ -287,14 +318,6 @@
          {
             if (binding instanceof LocalQueueBinding && local || binding instanceof RemoteQueueBinding && !local)
             {
-               if (local)
-               {
-                  log.debug("found binding " + binding +  " on node " + server);
-               }
-               else
-               {
-                  log.debug("found remote binding " + binding + " on node " + server);
-               }
                QueueBinding qBinding = (QueueBinding)binding;
 
                bindingCount++;
@@ -1995,7 +2018,7 @@
    {
       for (int node : nodes)
       {
-         if (servers[node].isStarted())
+         if (servers[node] != null && servers[node].isStarted())
          {
             try
             {

Modified: branches/Branch_2_2_EAP_cluster_clean2/tests/src/org/hornetq/tests/integration/cluster/distribution/SimpleSymmetricClusterTest.java
===================================================================
--- branches/Branch_2_2_EAP_cluster_clean2/tests/src/org/hornetq/tests/integration/cluster/distribution/SimpleSymmetricClusterTest.java	2011-06-29 00:59:58 UTC (rev 10894)
+++ branches/Branch_2_2_EAP_cluster_clean2/tests/src/org/hornetq/tests/integration/cluster/distribution/SimpleSymmetricClusterTest.java	2011-06-29 01:07:24 UTC (rev 10895)
@@ -18,7 +18,6 @@
 import org.hornetq.api.core.TransportConfiguration;
 import org.hornetq.core.config.ClusterConnectionConfiguration;
 import org.hornetq.core.logging.Logger;
-import org.hornetq.core.server.HornetQServer;
 
 /**
  * A SimpleSymmetricClusterTest
@@ -78,7 +77,8 @@
 
    public void tearDown() throws Exception
    {
-      stopServers(0, 1, 2);
+      log.info("#test tearDown " + loopNumber);
+      stopServers(0, 1, 2, 3, 4);
       super.tearDown();
    }
 
@@ -114,8 +114,6 @@
 
       // startServers(3, 4, 5, 0, 1, 2);
       startServers(0, 1, 2, 3, 4, 5);
-      
-      Thread.sleep(1000);
 
       log.info("");
       for (int i = 0; i <= 5; i++)
@@ -148,7 +146,7 @@
 
    }
    
-
+   
    public void testSimple() throws Exception
    {
       setupServer(0, true, isNetty());
@@ -157,12 +155,10 @@
 
       setupClusterConnection("cluster0", "queues", false, 1, isNetty(), 0, 1, 2);
       setupClusterConnection("cluster1", "queues", false, 1, isNetty(), 1, 2, 0);
-      setupClusterConnection("cluster1", "queues", false, 1, isNetty(), 2, 0, 1);
+      setupClusterConnection("cluster2", "queues", false, 1, isNetty(), 2, 0, 1);
 
       startServers(0, 1, 2);
 
-      Thread.sleep(1000);
-
       for (int i = 0; i < 10; i++)
          log.info("****************************");
       for (int i = 0; i <= 2; i++)
@@ -202,7 +198,84 @@
       waitForBindings(2, "queues.testaddress", 2, 2, false);
 
    }
+   static int loopNumber;
+   public void _testLoop() throws Throwable
+   {
+      for (int i = 0 ; i < 1000; i++)
+      {
+         loopNumber = i;
+         log.info("#test " + i);
+         testSimple2();
+         if (i + 1  < 1000)
+         {
+            tearDown();
+            setUp();
+         }
+      }
+   }
+
+
    
+
+   public void testSimple2() throws Exception
+   {
+      setupServer(0, true, isNetty());
+      setupServer(1, true, isNetty());
+      setupServer(2, true, isNetty());
+      setupServer(3, true, isNetty());
+      setupServer(4, true, isNetty());
+
+      setupClusterConnection("cluster0", "queues", false, 1, isNetty(), 0, 1, 2, 3, 4);
+
+      setupClusterConnection("cluster1", "queues", false, 1, isNetty(), 1, 0, 2, 3, 4);
+
+      setupClusterConnection("cluster2", "queues", false, 1, isNetty(), 2, 0, 1, 3, 4);
+
+      setupClusterConnection("cluster3", "queues", false, 1, isNetty(), 3, 0, 1, 2, 4);
+
+      setupClusterConnection("cluster4", "queues", false, 1, isNetty(), 4, 0, 1, 2, 3);
+
+      startServers(0, 1, 2, 3, 4);
+      
+      for (int i = 0 ; i <= 4; i++)
+      {
+         waitForTopology(servers[i], 5);
+      }
+      
+      log.info("All the servers have been started already!");
+
+      for (int i = 0; i <= 4; i++)
+      {
+         log.info("*************************************\n " + servers[i] +
+                  " topology:\n" +
+                  servers[i].getClusterManager().getTopology().describe());
+      }
+      
+      for (int i = 0; i <= 4; i++)
+      {
+         setupSessionFactory(i, isNetty());
+      }
+
+      for (int i = 0 ; i <= 4; i++)
+      {
+         createQueue(i, "queues.testaddress", "queue0", null, false);
+      }
+
+      for (int i = 0 ; i <= 4; i++)
+      {
+         addConsumer(i, i, "queue0", null);
+      }
+
+      waitForBindings(0, "queues.testaddress", 1, 1, true);
+      waitForBindings(1, "queues.testaddress", 1, 1, true);
+      waitForBindings(2, "queues.testaddress", 1, 1, true);
+
+      waitForBindings(0, "queues.testaddress", 4, 4, false);
+      waitForBindings(1, "queues.testaddress", 4, 4, false);
+      waitForBindings(2, "queues.testaddress", 4, 4, false);
+
+   }
+   
    public void testSimpleRoundRobbin() throws Exception
    {
       setupServer(0, true, isNetty());

Modified: branches/Branch_2_2_EAP_cluster_clean2/tests/src/org/hornetq/tests/integration/cluster/distribution/SymmetricClusterTest.java
===================================================================
--- branches/Branch_2_2_EAP_cluster_clean2/tests/src/org/hornetq/tests/integration/cluster/distribution/SymmetricClusterTest.java	2011-06-29 00:59:58 UTC (rev 10894)
+++ branches/Branch_2_2_EAP_cluster_clean2/tests/src/org/hornetq/tests/integration/cluster/distribution/SymmetricClusterTest.java	2011-06-29 01:07:24 UTC (rev 10895)
@@ -42,6 +42,7 @@
    @Override
    protected void tearDown() throws Exception
    {
+      log.info("#test tearDown");
       stopServers();
 
       super.tearDown();
@@ -51,6 +52,20 @@
    {
       return false;
    }
+   
+   public void _testLoop() throws Throwable
+   {
+      for (int i = 0 ; i < 1000; i++)
+      {
+         log.info("#test " + i);
+         testStopAllStartAll();
+         if (i + 1  < 1000)
+         {
+            tearDown();
+            setUp();
+         }
+      }
+   }
 
    public void testStopAllStartAll() throws Throwable
    {



More information about the hornetq-commits mailing list