[hornetq-commits] JBoss hornetq SVN: r8110 - branches/hornetq_grouping/tests/src/org/hornetq/tests/integration/cluster/distribution.

do-not-reply at jboss.org do-not-reply at jboss.org
Wed Oct 14 09:42:12 EDT 2009


Author: ataylor
Date: 2009-10-14 09:42:12 -0400 (Wed, 14 Oct 2009)
New Revision: 8110

Modified:
   branches/hornetq_grouping/tests/src/org/hornetq/tests/integration/cluster/distribution/ClusterTestBase.java
   branches/hornetq_grouping/tests/src/org/hornetq/tests/integration/cluster/distribution/ClusteredGroupingTest.java
Log:
new test

Modified: branches/hornetq_grouping/tests/src/org/hornetq/tests/integration/cluster/distribution/ClusterTestBase.java
===================================================================
--- branches/hornetq_grouping/tests/src/org/hornetq/tests/integration/cluster/distribution/ClusterTestBase.java	2009-10-14 12:59:50 UTC (rev 8109)
+++ branches/hornetq_grouping/tests/src/org/hornetq/tests/integration/cluster/distribution/ClusterTestBase.java	2009-10-14 13:42:12 UTC (rev 8110)
@@ -444,7 +444,6 @@
 
          message.putStringProperty(key, val);
          message.putIntProperty(COUNT_PROP, i);
-         System.out.println("i = " + i);
          producer.send(message);
       }
 
@@ -511,7 +510,6 @@
                                                 int msgEnd,
                                                 int... consumerIDs) throws Exception
    {
-      boolean outOfOrder = false;
       HashMap<SimpleString, Integer> groupIdsReceived = new HashMap<SimpleString, Integer>();
       for (int i = 0; i < consumerIDs.length; i++)
       {
@@ -546,6 +544,7 @@
             }
 
             SimpleString id = (SimpleString) message.getProperty(MessageImpl.HDR_GROUP_ID);
+               System.out.println("received " + id + " on consumer " + i);
             if(groupIdsReceived.get(id) == null)
             {
                groupIdsReceived.put(id, i);

Modified: branches/hornetq_grouping/tests/src/org/hornetq/tests/integration/cluster/distribution/ClusteredGroupingTest.java
===================================================================
--- branches/hornetq_grouping/tests/src/org/hornetq/tests/integration/cluster/distribution/ClusteredGroupingTest.java	2009-10-14 12:59:50 UTC (rev 8109)
+++ branches/hornetq_grouping/tests/src/org/hornetq/tests/integration/cluster/distribution/ClusteredGroupingTest.java	2009-10-14 13:42:12 UTC (rev 8110)
@@ -21,6 +21,9 @@
 import org.hornetq.core.management.Notification;
 import org.hornetq.utils.SimpleString;
 
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+
 /**
  * @author <a href="mailto:andy.taylor at jboss.org">Andy Taylor</a>
  */
@@ -128,7 +131,7 @@
 
             public void onNotification(Notification notification)
             {
-               
+
             }
          }, 0);
          setUpGroupHandler(GroupingHandlerConfiguration.TYPE.REMOTE, 1, 1);
@@ -421,7 +424,6 @@
    }
 
 
-
    public void testGroupingRoundRobin() throws Exception
    {
       setupServer(0, isFileStorage(), isNetty());
@@ -612,8 +614,8 @@
 
          addConsumer(1, 1, "queue0", null);
          waitForBindings(1, "queues.testaddress", 1, 1, true);
-         
 
+
          verifyReceiveAllInRange(10, 20, 1);
 
          System.out.println("*****************************************************************************");
@@ -629,77 +631,77 @@
    }
 
    public void testGroupingSendTo3queuesPinnedNodeGoesDownSendBeforeStop() throws Exception
-      {
-         setupServer(0, isFileStorage(), isNetty());
-         setupServer(1, isFileStorage(), isNetty());
-         setupServer(2, isFileStorage(), isNetty());
+   {
+      setupServer(0, isFileStorage(), isNetty());
+      setupServer(1, isFileStorage(), isNetty());
+      setupServer(2, isFileStorage(), isNetty());
 
-         setupClusterConnection("cluster0", "queues", false, 1, isNetty(), 0, 1, 2);
+      setupClusterConnection("cluster0", "queues", false, 1, isNetty(), 0, 1, 2);
 
-         setupClusterConnection("cluster1", "queues", false, 1, isNetty(), 1, 0, 2);
+      setupClusterConnection("cluster1", "queues", false, 1, isNetty(), 1, 0, 2);
 
-         setupClusterConnection("cluster2", "queues", false, 1, isNetty(), 2, 0, 1);
+      setupClusterConnection("cluster2", "queues", false, 1, isNetty(), 2, 0, 1);
 
-         startServers(0, 1, 2);
+      startServers(0, 1, 2);
 
-         try
-         {
-            setUpGroupHandler(GroupingHandlerConfiguration.TYPE.LOCAL, 0);
-            setUpGroupHandler(GroupingHandlerConfiguration.TYPE.REMOTE, 1);
-            setUpGroupHandler(GroupingHandlerConfiguration.TYPE.REMOTE, 2);
+      try
+      {
+         setUpGroupHandler(GroupingHandlerConfiguration.TYPE.LOCAL, 0);
+         setUpGroupHandler(GroupingHandlerConfiguration.TYPE.REMOTE, 1);
+         setUpGroupHandler(GroupingHandlerConfiguration.TYPE.REMOTE, 2);
 
-            setupSessionFactory(0, isNetty());
-            setupSessionFactory(1, isNetty());
-            setupSessionFactory(2, isNetty());
+         setupSessionFactory(0, isNetty());
+         setupSessionFactory(1, isNetty());
+         setupSessionFactory(2, isNetty());
 
-            createQueue(0, "queues.testaddress", "queue0", null, true);
-            createQueue(1, "queues.testaddress", "queue0", null, true);
-            createQueue(2, "queues.testaddress", "queue0", null, true);
+         createQueue(0, "queues.testaddress", "queue0", null, true);
+         createQueue(1, "queues.testaddress", "queue0", null, true);
+         createQueue(2, "queues.testaddress", "queue0", null, true);
 
-            addConsumer(0, 1, "queue0", null);
+         addConsumer(0, 1, "queue0", null);
 
-            waitForBindings(0, "queues.testaddress", 1, 0, true);
-            waitForBindings(1, "queues.testaddress", 1, 1, true);
-            waitForBindings(2, "queues.testaddress", 1, 0, true);
+         waitForBindings(0, "queues.testaddress", 1, 0, true);
+         waitForBindings(1, "queues.testaddress", 1, 1, true);
+         waitForBindings(2, "queues.testaddress", 1, 0, true);
 
-            waitForBindings(0, "queues.testaddress", 2, 1, false);
-            waitForBindings(1, "queues.testaddress", 2, 0, false);
-            waitForBindings(2, "queues.testaddress", 2, 1, false);
+         waitForBindings(0, "queues.testaddress", 2, 1, false);
+         waitForBindings(1, "queues.testaddress", 2, 0, false);
+         waitForBindings(2, "queues.testaddress", 2, 1, false);
 
-            sendInRange(1, "queues.testaddress", 0, 10, true, MessageImpl.HDR_GROUP_ID, new SimpleString("id1"));
+         sendInRange(1, "queues.testaddress", 0, 10, true, MessageImpl.HDR_GROUP_ID, new SimpleString("id1"));
 
-            verifyReceiveAllInRange(true, 0, 10, 0);
+         verifyReceiveAllInRange(true, 0, 10, 0);
 
-            closeAllConsumers();
+         closeAllConsumers();
 
-            sendInRange(2, "queues.testaddress", 10, 20, true, MessageImpl.HDR_GROUP_ID, new SimpleString("id1"));
+         sendInRange(2, "queues.testaddress", 10, 20, true, MessageImpl.HDR_GROUP_ID, new SimpleString("id1"));
 
 
-            sendInRange(0, "queues.testaddress", 20, 30, true, MessageImpl.HDR_GROUP_ID, new SimpleString("id1"));
+         sendInRange(0, "queues.testaddress", 20, 30, true, MessageImpl.HDR_GROUP_ID, new SimpleString("id1"));
 
-            stopServers(1);
+         stopServers(1);
 
-            startServers(1);
+         startServers(1);
 
-            addConsumer(1, 1, "queue0", null);
+         addConsumer(1, 1, "queue0", null);
 
-            waitForBindings(1, "queues.testaddress", 1, 1, true);
+         waitForBindings(1, "queues.testaddress", 1, 1, true);
 
 
-            verifyReceiveAllInRange(10, 30, 1);
+         verifyReceiveAllInRange(10, 30, 1);
 
 
-            System.out.println("*****************************************************************************");
-         }
-         finally
-         {
-            //closeAllConsumers();
+         System.out.println("*****************************************************************************");
+      }
+      finally
+      {
+         //closeAllConsumers();
 
-            closeAllSessionFactories();
+         closeAllSessionFactories();
 
-            stopServers(0, 1, 2);
-         }
+         stopServers(0, 1, 2);
       }
+   }
 
 
    public void testGroupingSendTo3queuesPinnedNodeGoesDownSendAfterRestart() throws Exception
@@ -747,7 +749,6 @@
          stopServers(1);
 
 
-
          startServers(1);
 
          sendInRange(2, "queues.testaddress", 10, 20, false, MessageImpl.HDR_GROUP_ID, new SimpleString("id1"));
@@ -822,8 +823,6 @@
          sendInRange(2, "queues.testaddress", 10, 20, false, MessageImpl.HDR_GROUP_ID, new SimpleString("id1"));
 
 
-         
-
          startServers(0);
 
          waitForBindings(0, "queues.testaddress", 1, 0, true);
@@ -845,70 +844,137 @@
    }
 
    public void testGroupingMultipleQueuesOnAddress() throws Exception
+   {
+      setupServer(0, isFileStorage(), isNetty());
+      setupServer(1, isFileStorage(), isNetty());
+      setupServer(2, isFileStorage(), isNetty());
+
+      setupClusterConnection("cluster0", "queues", false, 1, isNetty(), 0, 1, 2);
+
+      setupClusterConnection("cluster1", "queues", false, 1, isNetty(), 1, 0, 2);
+
+      setupClusterConnection("cluster2", "queues", false, 1, isNetty(), 2, 0, 1);
+
+      startServers(0, 1, 2);
+
+      try
       {
-         setupServer(0, isFileStorage(), isNetty());
-         setupServer(1, isFileStorage(), isNetty());
-         setupServer(2, isFileStorage(), isNetty());
+         setUpGroupHandler(GroupingHandlerConfiguration.TYPE.LOCAL, 0);
+         setUpGroupHandler(GroupingHandlerConfiguration.TYPE.REMOTE, 1);
+         setUpGroupHandler(GroupingHandlerConfiguration.TYPE.REMOTE, 2);
 
-         setupClusterConnection("cluster0", "queues", false, 1, isNetty(), 0, 1, 2);
+         setupSessionFactory(0, isNetty());
+         setupSessionFactory(1, isNetty());
+         setupSessionFactory(2, isNetty());
 
-         setupClusterConnection("cluster1", "queues", false, 1, isNetty(), 1, 0, 2);
+         createQueue(0, "queues.testaddress", "queue0", null, false);
+         createQueue(1, "queues.testaddress", "queue0", null, false);
+         createQueue(2, "queues.testaddress", "queue0", null, false);
 
-         setupClusterConnection("cluster2", "queues", false, 1, isNetty(), 2, 0, 1);
+         createQueue(0, "queues.testaddress", "queue1", null, false);
+         createQueue(1, "queues.testaddress", "queue1", null, false);
+         createQueue(2, "queues.testaddress", "queue1", null, false);
 
-         startServers(0, 1, 2);
+         addConsumer(0, 0, "queue0", null);
+         addConsumer(1, 1, "queue0", null);
+         addConsumer(2, 2, "queue0", null);
 
-         try
-         {
-            setUpGroupHandler(GroupingHandlerConfiguration.TYPE.LOCAL, 0);
-            setUpGroupHandler(GroupingHandlerConfiguration.TYPE.REMOTE, 1);
-            setUpGroupHandler(GroupingHandlerConfiguration.TYPE.REMOTE, 2);
+         addConsumer(3, 0, "queue0", null);
+         addConsumer(4, 1, "queue0", null);
+         addConsumer(5, 2, "queue0", null);
 
-            setupSessionFactory(0, isNetty());
-            setupSessionFactory(1, isNetty());
-            setupSessionFactory(2, isNetty());
+         waitForBindings(0, "queues.testaddress", 2, 2, true);
+         waitForBindings(1, "queues.testaddress", 2, 2, true);
+         waitForBindings(2, "queues.testaddress", 2, 2, true);
 
-            createQueue(0, "queues.testaddress", "queue0", null, false);
-            createQueue(1, "queues.testaddress", "queue0", null, false);
-            createQueue(2, "queues.testaddress", "queue0", null, false);
+         waitForBindings(0, "queues.testaddress", 4, 4, false);
+         waitForBindings(1, "queues.testaddress", 4, 4, false);
+         waitForBindings(2, "queues.testaddress", 4, 4, false);
 
-            createQueue(0, "queues.testaddress", "queue1", null, false);
-            createQueue(1, "queues.testaddress", "queue1", null, false);
-            createQueue(2, "queues.testaddress", "queue1", null, false);
 
-            addConsumer(0, 0, "queue0", null);
-            addConsumer(1, 1, "queue0", null);
-            addConsumer(2, 2, "queue0", null);
-            
-            addConsumer(3, 0, "queue0", null);
-            addConsumer(4, 1, "queue0", null);
-            addConsumer(5, 2, "queue0", null);
+         sendWithProperty(0, "queues.testaddress", 10, false, MessageImpl.HDR_GROUP_ID, new SimpleString("id1"));
 
-            waitForBindings(0, "queues.testaddress", 2, 2, true);
-            waitForBindings(1, "queues.testaddress", 2, 2, true);
-            waitForBindings(2, "queues.testaddress", 2, 2, true);
+         verifyReceiveAll(10, 0);
 
-            waitForBindings(0, "queues.testaddress", 4, 4, false);
-            waitForBindings(1, "queues.testaddress", 4, 4, false);
-            waitForBindings(2, "queues.testaddress", 4, 4, false);
+         System.out.println("*****************************************************************************");
+      }
+      finally
+      {
+         closeAllConsumers();
 
+         closeAllSessionFactories();
 
-            sendWithProperty(0, "queues.testaddress", 10, false, MessageImpl.HDR_GROUP_ID, new SimpleString("id1"));
+         stopServers(0, 1, 2);
+      }
+   }
 
-            verifyReceiveAll(10, 0);
+   public void testGroupingMultipleSending() throws Exception
+   {
+      setupServer(0, isFileStorage(), isNetty());
+      setupServer(1, isFileStorage(), isNetty());
+      setupServer(2, isFileStorage(), isNetty());
 
-            System.out.println("*****************************************************************************");
+      setupClusterConnection("cluster0", "queues", false, 1, isNetty(), 0, 1, 2);
+
+      setupClusterConnection("cluster1", "queues", false, 1, isNetty(), 1, 0, 2);
+
+      setupClusterConnection("cluster2", "queues", false, 1, isNetty(), 2, 0, 1);
+
+      startServers(0, 1, 2);
+
+      try
+      {
+         setUpGroupHandler(GroupingHandlerConfiguration.TYPE.LOCAL, 0);
+         setUpGroupHandler(GroupingHandlerConfiguration.TYPE.REMOTE, 1);
+         setUpGroupHandler(GroupingHandlerConfiguration.TYPE.REMOTE, 2);
+
+         setupSessionFactory(0, isNetty());
+         setupSessionFactory(1, isNetty());
+         setupSessionFactory(2, isNetty());
+
+         createQueue(0, "queues.testaddress", "queue0", null, false);
+         createQueue(1, "queues.testaddress", "queue0", null, false);
+         createQueue(2, "queues.testaddress", "queue0", null, false);
+
+         addConsumer(0, 0, "queue0", null);
+         addConsumer(1, 1, "queue0", null);
+         addConsumer(2, 2, "queue0", null);
+
+         waitForBindings(0, "queues.testaddress", 1, 1, true);
+         waitForBindings(1, "queues.testaddress", 1, 1, true);
+         waitForBindings(2, "queues.testaddress", 1, 1, true);
+
+         waitForBindings(0, "queues.testaddress", 2, 2, false);
+         waitForBindings(1, "queues.testaddress", 2, 2, false);
+         waitForBindings(2, "queues.testaddress", 2, 2, false);
+
+         CountDownLatch latch = new CountDownLatch(1);
+         Thread[] threads = new Thread[9];
+         int range = 0;
+         for(int i = 0 ; i < 9; i++,range+=10)
+         {
+            threads[i] = new Thread(new ThreadSender(range, range+10, 1, new SimpleString("id" + i), latch, i < 8));
          }
-         finally
+         for (Thread thread : threads)
          {
-            closeAllConsumers();
+            thread.start();
+         }
 
-            closeAllSessionFactories();
+         verifyReceiveAllWithGroupIDRoundRobin(0, 30, 0, 1, 2);
 
-            stopServers(0, 1, 2);
-         }
+         System.out.println("*****************************************************************************");
       }
+      finally
+      {
+         closeAllConsumers();
 
+         closeAllSessionFactories();
+
+         stopServers(0, 1, 2);
+      }
+   }
+
+
    public boolean isNetty()
    {
       return true;
@@ -918,5 +984,52 @@
    {
       return true;
    }
+
+   class ThreadSender implements Runnable
+   {
+      private int msgStart;
+      private int msgEnd;
+      private SimpleString id;
+      private CountDownLatch latch;
+      private boolean wait;
+      private int node;
+
+      public ThreadSender(int msgStart, int msgEnd, int node, SimpleString id, CountDownLatch latch, boolean wait)
+      {
+         this.msgStart = msgStart;
+         this.msgEnd = msgEnd;
+         this.node = node;
+         this.id = id;
+         this.latch = latch;
+         this.wait = wait;
+      }
+
+      public void run()
+      {
+         if (wait)
+         {
+            try
+            {
+               latch.await(5, TimeUnit.SECONDS);
+            }
+            catch (InterruptedException e)
+            {
+               e.printStackTrace();  //To change body of catch statement use File | Settings | File Templates.
+            }
+         }
+         else
+         {
+            latch.countDown();
+         }
+         try
+         {
+            sendInRange(node, "queues.testaddress", msgStart, msgEnd, false, MessageImpl.HDR_GROUP_ID, id);
+         }
+         catch (Exception e)
+         {
+            e.printStackTrace();  //To change body of catch statement use File | Settings | File Templates.
+         }
+      }
+   }
 }
 



More information about the hornetq-commits mailing list