[hornetq-commits] JBoss hornetq SVN: r10343 - branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/integration/cluster/distribution.

do-not-reply at jboss.org do-not-reply at jboss.org
Fri Mar 18 00:54:46 EDT 2011


Author: clebert.suconic at jboss.com
Date: 2011-03-18 00:54:45 -0400 (Fri, 18 Mar 2011)
New Revision: 10343

Modified:
   branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/integration/cluster/distribution/ClusterTestBase.java
   branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/integration/cluster/distribution/MessageRedistributionTest.java
Log:
https://issues.jboss.org/browse/JBPAPP-6130 Adding failing test

Modified: branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/integration/cluster/distribution/ClusterTestBase.java
===================================================================
--- branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/integration/cluster/distribution/ClusterTestBase.java	2011-03-18 00:11:19 UTC (rev 10342)
+++ branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/integration/cluster/distribution/ClusterTestBase.java	2011-03-18 04:54:45 UTC (rev 10343)
@@ -175,7 +175,7 @@
 
    private static final int MAX_SERVERS = 10;
 
-   private ConsumerHolder[] consumers;
+   protected ConsumerHolder[] consumers;
 
    protected HornetQServer[] servers;
 
@@ -1113,6 +1113,11 @@
 
    protected int[] getReceivedOrder(final int consumerID) throws Exception
    {
+      return getReceivedOrder(consumerID, false);
+   }
+
+   protected int[] getReceivedOrder(final int consumerID, final boolean ack) throws Exception
+   {
       ConsumerHolder consumer = consumers[consumerID];
 
       if (consumer == null)

Modified: branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/integration/cluster/distribution/MessageRedistributionTest.java
===================================================================
--- branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/integration/cluster/distribution/MessageRedistributionTest.java	2011-03-18 00:11:19 UTC (rev 10342)
+++ branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/integration/cluster/distribution/MessageRedistributionTest.java	2011-03-18 04:54:45 UTC (rev 10343)
@@ -14,6 +14,9 @@
 package org.hornetq.tests.integration.cluster.distribution;
 
 import org.hornetq.core.logging.Logger;
+import org.hornetq.core.server.cluster.ClusterConnection;
+import org.hornetq.core.server.cluster.MessageFlowRecord;
+import org.hornetq.core.server.cluster.impl.ClusterConnectionImpl;
 import org.hornetq.core.server.impl.QueueImpl;
 import org.hornetq.core.settings.impl.AddressSettings;
 
@@ -104,7 +107,96 @@
       MessageRedistributionTest.log.info("Test done");
    }
 
+   // https://issues.jboss.org/browse/JBPAPP-6130
+   // https://issues.jboss.org/browse/HORNETQ-654
+   public void testRedistributionWhenConsumerIsClosedAndRestart() throws Exception
+   {
+      setupCluster(false);
 
+      MessageRedistributionTest.log.info("Doing test");
+
+      startServers(0, 1, 2);
+
+      setupSessionFactory(0, isNetty());
+      setupSessionFactory(1, isNetty());
+      setupSessionFactory(2, isNetty());
+
+      createQueue(0, "queues.testaddress", "queue0", null, true);
+      createQueue(1, "queues.testaddress", "queue0", null, true);
+      createQueue(2, "queues.testaddress", "queue0", null, true);
+
+      addConsumer(0, 0, "queue0", null);
+      addConsumer(1, 1, "queue0", null);
+      addConsumer(2, 2, "queue0", null);
+
+      waitForBindings(0, "queues.testaddress", 1, 1, true);
+      waitForBindings(1, "queues.testaddress", 1, 1, true);
+      waitForBindings(2, "queues.testaddress", 1, 1, true);
+
+      waitForBindings(0, "queues.testaddress", 2, 2, false);
+      waitForBindings(1, "queues.testaddress", 2, 2, false);
+      waitForBindings(2, "queues.testaddress", 2, 2, false);
+
+      send(0, "queues.testaddress", 9, true, null);
+
+      getReceivedOrder(0, true);
+      int[] ids1 = getReceivedOrder(1, false);
+      getReceivedOrder(2, true);
+
+      for (ClusterConnection conn : servers[1].getClusterManager().getClusterConnections())
+      {
+         ClusterConnectionImpl impl = (ClusterConnectionImpl)conn;
+         for (MessageFlowRecord record : impl.getRecords().values())
+         {
+            if (record.getBridge() != null)
+            {
+               System.out.println("stop record bridge");
+               record.getBridge().stop();
+            }
+         }
+      }
+
+      removeConsumer(1);
+
+      for (int i = 0; i <= 2; i++)
+      {
+         servers[i].stop();
+         servers[i] = null;
+      }
+      
+      setupServers();
+      
+      setupCluster(false);
+      
+      startServers(0, 1, 2);
+      
+      for (int i = 0 ; i <= 2; i++)
+      {
+         consumers[i] = null;
+         sfs[i] = null;
+      }
+
+      setupSessionFactory(0, isNetty());
+      setupSessionFactory(1, isNetty());
+      setupSessionFactory(2, isNetty());
+
+      addConsumer(0, 0, "queue0", null);
+      addConsumer(1, 1, "queue0", null);
+      addConsumer(2, 2, "queue0", null);
+
+      waitForBindings(0, "queues.testaddress", 1, 1, true);
+      waitForBindings(1, "queues.testaddress", 1, 1, true);
+      waitForBindings(2, "queues.testaddress", 1, 1, true);
+
+      waitForBindings(0, "queues.testaddress", 2, 2, false);
+      waitForBindings(1, "queues.testaddress", 2, 2, false);
+      waitForBindings(2, "queues.testaddress", 2, 2, false);
+
+      verifyReceiveRoundRobinInSomeOrderWithCounts(false, ids1, 0, 2);
+
+      MessageRedistributionTest.log.info("Test done");
+   }
+
    public void testRedistributionWhenConsumerIsClosedNotConsumersOnAllNodes() throws Exception
    {
       setupCluster(false);
@@ -494,17 +586,16 @@
          removeConsumer(1);
 
          addConsumer(0, 0, QUEUE, null);
-         
+
          waitForMessages(1, ADDRESS, 0);
          waitForMessages(0, ADDRESS, 20);
-         
+
          removeConsumer(0);
          addConsumer(1, 1, QUEUE, null);
-         
+
          waitForMessages(1, ADDRESS, 20);
          waitForMessages(0, ADDRESS, 0);
-         
-         
+
          verifyReceiveAll(20, 1);
 
          stop();
@@ -663,7 +754,7 @@
 
       verifyReceiveAll(QueueImpl.REDISTRIBUTOR_BATCH_SIZE * 2, 1);
    }
-   
+
    /*
     * Start one node with no consumers and send some messages
     * Start another node add a consumer and verify all messages are redistribute
@@ -676,26 +767,26 @@
       startServers(0);
 
       setupSessionFactory(0, isNetty());
-      
+
       createQueue(0, "queues.testaddress", "queue0", null, false);
-      
+
       waitForBindings(0, "queues.testaddress", 1, 0, true);
-      
+
       send(0, "queues.testaddress", 20, false, null);
-      
-      //Now bring up node 1
-      
+
+      // Now bring up node 1
+
       startServers(1);
 
       setupSessionFactory(1, isNetty());
 
       createQueue(1, "queues.testaddress", "queue0", null, false);
-      
+
       waitForBindings(1, "queues.testaddress", 1, 0, true);
       waitForBindings(0, "queues.testaddress", 1, 0, false);
-      
+
       addConsumer(0, 1, "queue0", null);
-      
+
       verifyReceiveAll(20, 0);
       verifyNotReceive(0);
    }



More information about the hornetq-commits mailing list