[hornetq-commits] JBoss hornetq SVN: r10876 - in branches/Branch_2_2_EAP_cluster_clean2: tests/src/org/hornetq/tests/integration/cluster/distribution and 1 other directory.

do-not-reply at jboss.org do-not-reply at jboss.org
Tue Jun 21 18:43:06 EDT 2011


Author: clebert.suconic at jboss.com
Date: 2011-06-21 18:43:06 -0400 (Tue, 21 Jun 2011)
New Revision: 10876

Modified:
   branches/Branch_2_2_EAP_cluster_clean2/src/main/org/hornetq/core/server/cluster/impl/BridgeImpl.java
   branches/Branch_2_2_EAP_cluster_clean2/src/main/org/hornetq/core/server/cluster/impl/ClusterConnectionImpl.java
   branches/Branch_2_2_EAP_cluster_clean2/tests/src/org/hornetq/tests/integration/cluster/distribution/ClusterTestBase.java
   branches/Branch_2_2_EAP_cluster_clean2/tests/src/org/hornetq/tests/integration/cluster/distribution/SimpleSymmetricClusterTest.java
Log:
Tweaks

Modified: branches/Branch_2_2_EAP_cluster_clean2/src/main/org/hornetq/core/server/cluster/impl/BridgeImpl.java
===================================================================
--- branches/Branch_2_2_EAP_cluster_clean2/src/main/org/hornetq/core/server/cluster/impl/BridgeImpl.java	2011-06-21 21:57:15 UTC (rev 10875)
+++ branches/Branch_2_2_EAP_cluster_clean2/src/main/org/hornetq/core/server/cluster/impl/BridgeImpl.java	2011-06-21 22:43:06 UTC (rev 10876)
@@ -549,7 +549,7 @@
    public void beforeReconnect(final HornetQException exception)
    {
       log.warn(name + "::Connection failed before reconnect ", exception);
-      fail(true);
+      fail(false);
    }
 
    // Package protected ---------------------------------------------

Modified: branches/Branch_2_2_EAP_cluster_clean2/src/main/org/hornetq/core/server/cluster/impl/ClusterConnectionImpl.java
===================================================================
--- branches/Branch_2_2_EAP_cluster_clean2/src/main/org/hornetq/core/server/cluster/impl/ClusterConnectionImpl.java	2011-06-21 21:57:15 UTC (rev 10875)
+++ branches/Branch_2_2_EAP_cluster_clean2/src/main/org/hornetq/core/server/cluster/impl/ClusterConnectionImpl.java	2011-06-21 22:43:06 UTC (rev 10876)
@@ -508,7 +508,7 @@
       {
          String ClusterTestBase = "receiving nodeUP for nodeID=";
          log.debug(this + ClusterTestBase + nodeID + 
-                   " connectionPair=" + connectorPair, new Exception ("trace"));
+                   " connectionPair=" + connectorPair);
       }
       // discard notifications about ourselves unless its from our backup
 

Modified: branches/Branch_2_2_EAP_cluster_clean2/tests/src/org/hornetq/tests/integration/cluster/distribution/ClusterTestBase.java
===================================================================
--- branches/Branch_2_2_EAP_cluster_clean2/tests/src/org/hornetq/tests/integration/cluster/distribution/ClusterTestBase.java	2011-06-21 21:57:15 UTC (rev 10875)
+++ branches/Branch_2_2_EAP_cluster_clean2/tests/src/org/hornetq/tests/integration/cluster/distribution/ClusterTestBase.java	2011-06-21 22:43:06 UTC (rev 10876)
@@ -312,7 +312,7 @@
       }
       while (System.currentTimeMillis() - start < ClusterTestBase.WAIT_TIMEOUT);
 
-      String msg = "Timed out waiting for bindings (bindingCount = " + bindingCount + " (expecting " + expectedConsumerCount + ") "+
+      String msg = "Timed out waiting for bindings (bindingCount = " + bindingCount + " (expecting " + expectedBindingCount + ") "+
                    ", totConsumers = " +
                    totConsumers + " (expecting " + expectedConsumerCount + ")" + 
                    ")";
@@ -951,21 +951,32 @@
 
       for (int i = 0; i < numMessages; i++)
       {
-         ConsumerHolder holder = consumers[consumerIDs[count]];
+         // We may use a negative number in some tests to ignore the consumer, in case we know the server is down
+         if (consumerIDs[count] >= 0)
+         {
+            ConsumerHolder holder = consumers[consumerIDs[count]];
+   
+            if (holder == null)
+            {
+               throw new IllegalArgumentException("No consumer at " + consumerIDs[i]);
+            }
+   
+            ClientMessage message = holder.consumer.receive(WAIT_TIMEOUT);
+            
+            message.acknowledge();
+            
+            consumers[consumerIDs[count]].session.commit();
+            
+            System.out.println("Msg: " + message);
+   
+            Assert.assertNotNull("consumer " + consumerIDs[count] + " did not receive message " + i, message);
+   
+            Assert.assertEquals("consumer " + consumerIDs[count] + " message " + i,
+                                i,
+                                message.getObjectProperty(ClusterTestBase.COUNT_PROP));
 
-         if (holder == null)
-         {
-            throw new IllegalArgumentException("No consumer at " + consumerIDs[i]);
          }
-
-         ClientMessage message = holder.consumer.receive(WAIT_TIMEOUT);
-
-         Assert.assertNotNull("consumer " + consumerIDs[count] + " did not receive message " + i, message);
-
-         Assert.assertEquals("consumer " + consumerIDs[count] + " message " + i,
-                             i,
-                             message.getObjectProperty(ClusterTestBase.COUNT_PROP));
-
+         
          count++;
 
          if (count == consumerIDs.length)

Modified: branches/Branch_2_2_EAP_cluster_clean2/tests/src/org/hornetq/tests/integration/cluster/distribution/SimpleSymmetricClusterTest.java
===================================================================
--- branches/Branch_2_2_EAP_cluster_clean2/tests/src/org/hornetq/tests/integration/cluster/distribution/SimpleSymmetricClusterTest.java	2011-06-21 21:57:15 UTC (rev 10875)
+++ branches/Branch_2_2_EAP_cluster_clean2/tests/src/org/hornetq/tests/integration/cluster/distribution/SimpleSymmetricClusterTest.java	2011-06-21 22:43:06 UTC (rev 10876)
@@ -140,7 +140,7 @@
       waitForBindings(2, "queues.testaddress", 2, 2, false);
 
    }
-
+   
    public void testSimpleRoundRobbin() throws Exception
    {
       setupServer(0, true, isNetty());
@@ -187,8 +187,11 @@
 
       stopServers(2);
 
-      Thread.sleep(5000);
 
+      waitForBindings(0, "queues.testaddress", 1, 1, false);
+      waitForBindings(1, "queues.testaddress", 1, 1, false);
+
+
       send(0, "queues.testaddress", 100, true, null);
 
       verifyReceiveRoundRobin(100, 0, 1);
@@ -220,6 +223,90 @@
 
    }
 
+   
+   public void testSimpleRoundRobbinNoFailure() throws Exception
+   {
+      setupServer(0, true, isNetty());
+      setupServer(1, true, isNetty());
+      setupServer(2, true, isNetty());
+
+      setupClusterConnection("cluster0", "queues", false, 1, -1, 1000, isNetty(), 0, 1, 2);
+      setupClusterConnection("cluster1", "queues", false, 1, -1, 1000, isNetty(), 1, 2, 0);
+      setupClusterConnection("cluster1", "queues", false, 1, -1, 1000, isNetty(), 2, 0, 1);
+
+      startServers(0, 1, 2);
+
+      setupSessionFactory(0, isNetty());
+      setupSessionFactory(1, isNetty());
+      setupSessionFactory(2, isNetty());
+
+      // Thread.sleep(1500);
+
+      createQueue(0, "queues.testaddress", "queue0", null, true);
+      // Thread.sleep(1500);
+      createQueue(1, "queues.testaddress", "queue0", null, true);
+      // Thread.sleep(1500);
+      createQueue(2, "queues.testaddress", "queue0", null, true);
+      // Thread.sleep(1500);
+
+      addConsumer(0, 0, "queue0", null);
+      // Thread.sleep(1500);
+      addConsumer(1, 1, "queue0", null);
+      // Thread.sleep(1500);
+      addConsumer(2, 2, "queue0", null);
+      // Thread.sleep(1500);
+
+      waitForBindings(0, "queues.testaddress", 1, 1, true);
+      waitForBindings(1, "queues.testaddress", 1, 1, true);
+      waitForBindings(2, "queues.testaddress", 1, 1, true);
+
+      waitForBindings(0, "queues.testaddress", 2, 2, false);
+      waitForBindings(1, "queues.testaddress", 2, 2, false);
+      waitForBindings(2, "queues.testaddress", 2, 2, false);
+
+      send(0, "queues.testaddress", 33, true, null);
+
+      verifyReceiveRoundRobin(33, 0, 1, 2);
+
+      Thread.sleep(1000);
+      
+      // TODO: need to make sure the shutdown won't be sent, as that would affect the test
+      stopServers(2);
+//      
+//      Thread.sleep(5000);
+//
+//      waitForBindings(0, "queues.testaddress", 2, 2, false);
+//      waitForBindings(1, "queues.testaddress", 2, 2, false);
+
+
+      send(0, "queues.testaddress", 100, true, null);
+      
+      verifyReceiveRoundRobin(100, 0, 1, -1);
+      
+      sfs[2] = null;
+      consumers[2] = null;
+      
+      startServers(2);
+      
+      setupSessionFactory(2, isNetty());
+
+      addConsumer(2, 2, "queue0", null);
+      
+      waitForBindings(0, "queues.testaddress", 1, 1, true);
+      waitForBindings(1, "queues.testaddress", 1, 1, true);
+      waitForBindings(2, "queues.testaddress", 1, 1, true);
+
+      waitForBindings(0, "queues.testaddress", 2, 2, false);
+      waitForBindings(1, "queues.testaddress", 2, 2, false);
+      waitForBindings(2, "queues.testaddress", 2, 2, false);
+
+      verifyReceiveRoundRobin(100, -1, -1, 2);
+
+      
+      
+
+   }
+
    // Package protected ---------------------------------------------
 
    // Protected -----------------------------------------------------



More information about the hornetq-commits mailing list