[hornetq-commits] JBoss hornetq SVN: r8102 - in branches/hornetq_grouping: examples/javaee and 2 other directories.

do-not-reply at jboss.org do-not-reply at jboss.org
Wed Oct 14 05:13:40 EDT 2009


Author: ataylor
Date: 2009-10-14 05:13:40 -0400 (Wed, 14 Oct 2009)
New Revision: 8102

Modified:
   branches/hornetq_grouping/examples/javaee/hornetq-javaee-examples.iml
   branches/hornetq_grouping/hornetq.iml
   branches/hornetq_grouping/src/main/org/hornetq/core/postoffice/impl/BindingsImpl.java
   branches/hornetq_grouping/tests/src/org/hornetq/tests/integration/cluster/distribution/ClusterTestBase.java
   branches/hornetq_grouping/tests/src/org/hornetq/tests/integration/cluster/distribution/ClusteredGroupingTest.java
Log:
fixes

Modified: branches/hornetq_grouping/examples/javaee/hornetq-javaee-examples.iml
===================================================================
--- branches/hornetq_grouping/examples/javaee/hornetq-javaee-examples.iml	2009-10-14 08:59:08 UTC (rev 8101)
+++ branches/hornetq_grouping/examples/javaee/hornetq-javaee-examples.iml	2009-10-14 09:13:40 UTC (rev 8102)
@@ -1,7 +1,6 @@
 <?xml version="1.0" encoding="UTF-8"?>
 <module relativePaths="true" type="JAVA_MODULE" version="4">
   <component name="NewModuleRootManager" inherit-compiler-output="true">
-    <output url="file://$MODULE_DIR$/output/classes" />
     <exclude-output />
     <content url="file://$MODULE_DIR$">
       <sourceFolder url="file://$MODULE_DIR$/ejb-jms-transaction/src" isTestSource="false" />

Modified: branches/hornetq_grouping/hornetq.iml
===================================================================
--- branches/hornetq_grouping/hornetq.iml	2009-10-14 08:59:08 UTC (rev 8101)
+++ branches/hornetq_grouping/hornetq.iml	2009-10-14 09:13:40 UTC (rev 8102)
@@ -1,7 +1,6 @@
 <?xml version="1.0" encoding="UTF-8"?>
 <module relativePaths="true" type="JAVA_MODULE" version="4">
   <component name="NewModuleRootManager" LANGUAGE_LEVEL="JDK_1_5" inherit-compiler-output="true">
-    <output url="file://$MODULE_DIR$/output/classes" />
     <exclude-output />
     <content url="file://$MODULE_DIR$">
       <sourceFolder url="file://$MODULE_DIR$/build/src" isTestSource="false" />

Modified: branches/hornetq_grouping/src/main/org/hornetq/core/postoffice/impl/BindingsImpl.java
===================================================================
--- branches/hornetq_grouping/src/main/org/hornetq/core/postoffice/impl/BindingsImpl.java	2009-10-14 08:59:08 UTC (rev 8101)
+++ branches/hornetq_grouping/src/main/org/hornetq/core/postoffice/impl/BindingsImpl.java	2009-10-14 09:13:40 UTC (rev 8102)
@@ -166,7 +166,7 @@
       {
          bindable.route(message, tx);
       }
-      
+
       return true;
    }
 
@@ -284,7 +284,7 @@
          {
             routed = routeFromCluster(message, tx);
          }
-         else if(groupingGroupingHandler != null && message.getProperty(MessageImpl.HDR_GROUP_ID)!= null)
+         else if (groupingGroupingHandler != null && message.getProperty(MessageImpl.HDR_GROUP_ID) != null)
          {
             routeUsingStrictOrdering(message, tx, groupingGroupingHandler);
          }
@@ -305,127 +305,133 @@
                   continue;
                }
 
-               Integer ipos = routingNamePositions.get(routingName);
+               Binding theBinding = getNextBinding(message, routingName, bindings);
 
-               int pos = ipos != null ? ipos.intValue() : 0;
 
-               int length = bindings.size();
-
-               int startPos = pos;
-
-               Binding theBinding = null;
-
-               int lastLowPriorityBinding = -1;
-
-               while (true)
+               if (theBinding != null)
                {
-                  Binding binding;
-                  try
-                  {
-                     binding = bindings.get(pos);
-                  }
-                  catch (IndexOutOfBoundsException e)
-                  {
-                     // This can occur if binding is removed while in route
-                     if (!bindings.isEmpty())
-                     {
-                        pos = 0;
-                        startPos = 0;
-                        length = bindings.size();
+                  theBinding.willRoute(message);
 
-                        continue;
-                     }
-                     else
-                     {
-                        break;
-                     }
-                  }
+                  chosen.add(theBinding.getBindable());
+               }
+            }
 
-            Filter filter = binding.getFilter();
+            // TODO refactor to do this is one iteration
 
-            if (filter == null || filter.match(message))
+            for (Bindable bindable : chosen)
             {
-               // bindings.length == 1 ==> only a local queue so we don't check for matching consumers (it's an
-               // unnecessary overhead)
-               if (length == 1 || routeWhenNoConsumers || binding.isHighAcceptPriority(message))
-               {
-                  theBinding = binding;
+               bindable.preroute(message, tx);
+            }
 
-                  pos = incrementPos(pos, length);
+            for (Bindable bindable : chosen)
+            {
+               bindable.route(message, tx);
 
-                  break;
-               }
-               else
-               {
-                  if (lastLowPriorityBinding == -1)
-                  {
-                     lastLowPriorityBinding = pos;
-                  }
-               }
+               routed = true;
             }
+         }
+      }
 
-            pos = incrementPos(pos, length);
+      return routed;
+   }
 
-            if (pos == startPos)
-            {
-               if (lastLowPriorityBinding != -1)
-               {
-                  try
-                  {
-                     theBinding = bindings.get(pos);
-                  }
-                  catch (IndexOutOfBoundsException e)
-                  {
-                     // This can occur if binding is removed while in route
-                     if (!bindings.isEmpty())
-                     {
-                        pos = 0;
+   private Binding getNextBinding(ServerMessage message, SimpleString routingName, List<Binding> bindings)
+   {
+      Integer ipos = routingNamePositions.get(routingName);
 
-                        lastLowPriorityBinding = -1;
+      int pos = ipos != null ? ipos : 0;
 
-                        continue;
-                     }
-                     else
-                     {
-                        break;
-                     }
-                  }
+      int length = bindings.size();
 
-                  pos = lastLowPriorityBinding;
+      int startPos = pos;
 
-                  pos = incrementPos(pos, length);
-               }
+      Binding theBinding = null;
+
+      int lastLowPriorityBinding = -1;
+
+      while (true)
+      {
+         Binding binding;
+         try
+         {
+            binding = bindings.get(pos);
+         }
+         catch (IndexOutOfBoundsException e)
+         {
+            // This can occur if binding is removed while in route
+            if (!bindings.isEmpty())
+            {
+               pos = 0;
+               startPos = 0;
+               length = bindings.size();
+
+               continue;
+            }
+            else
+            {
                break;
             }
          }
 
-         if (theBinding != null)
+         Filter filter = binding.getFilter();
+
+         if (filter == null || filter.match(message))
          {
-            theBinding.willRoute(message);
+            // bindings.length == 1 ==> only a local queue so we don't check for matching consumers (it's an
+            // unnecessary overhead)
+            if (length == 1 || routeWhenNoConsumers || binding.isHighAcceptPriority(message))
+            {
+               theBinding = binding;
 
-            chosen.add(theBinding.getBindable());
-         }
+               pos = incrementPos(pos, length);
 
-         routingNamePositions.put(routingName, pos);
+               break;
+            }
+            else
+            {
+               if (lastLowPriorityBinding == -1)
+               {
+                  lastLowPriorityBinding = pos;
+               }
+            }
          }
 
-            // TODO refactor to do this is one iteration
+         pos = incrementPos(pos, length);
 
-            for (Bindable bindable : chosen)
+         if (pos == startPos)
+         {
+            if (lastLowPriorityBinding != -1)
             {
-               bindable.preroute(message, tx);
-            }
+               try
+               {
+                  theBinding = bindings.get(pos);
+               }
+               catch (IndexOutOfBoundsException e)
+               {
+                  // This can occur if binding is removed while in route
+                  if (!bindings.isEmpty())
+                  {
+                     pos = 0;
 
-            for (Bindable bindable : chosen)
-            {               
-               bindable.route(message, tx);
-               
-               routed = true;
+                     lastLowPriorityBinding = -1;
+
+                     continue;
+                  }
+                  else
+                  {
+                     break;
+                  }
+               }
+
+               pos = lastLowPriorityBinding;
+
+               pos = incrementPos(pos, length);
             }
+            break;
          }
       }
-      
-      return routed;
+      routingNamePositions.put(routingName, pos);
+      return theBinding;
    }
 
    private void routeUsingStrictOrdering(ServerMessage message, Transaction tx, GroupingHandler groupingGroupingHandler)
@@ -433,36 +439,27 @@
    {
       SimpleString groupId = (SimpleString) message.getProperty(MessageImpl.HDR_GROUP_ID);
       Response resp = groupingGroupingHandler.propose(new Proposal(groupId, null));
-      if(resp == null)
+      if (resp == null)
       {
          for (Map.Entry<SimpleString, List<Binding>> entry : routingNameBindingMap.entrySet())
          {
             SimpleString routingName = entry.getKey();
 
             List<Binding> bindings = entry.getValue();
-            Binding chosen = null;
-            Binding lowestPriorityBinding = null;
-            int lowestPriority = Integer.MAX_VALUE;
-            for (Binding binding : bindings)
+
+            if (bindings == null)
             {
-               boolean bindingIsHighAcceptPriority = binding.isHighAcceptPriority(message);
-               int distance = binding.getDistance();
-               if((distance < lowestPriority))
-               {
-                  lowestPriorityBinding = binding;
-                  lowestPriority = distance;
-                  if(bindingIsHighAcceptPriority)
-                  {
-                     chosen = binding;
-                  }
-               }
+               // The value can become null if it's concurrently removed while we're iterating - this is expected
+               // ConcurrentHashMap behaviour!
+               continue;
             }
-            if(chosen == null)
-            {
-               chosen = lowestPriorityBinding;
-            }
+
+
+            Binding chosen = getNextBinding(message, routingName, bindings);
+            
             resp = groupingGroupingHandler.propose(new Proposal(groupId, chosen.getClusterName()));
-            if(!resp.getChosen().equals(chosen.getClusterName()))
+            
+            if (!resp.getChosen().equals(chosen.getClusterName()))
             {
                for (Binding binding : bindings)
                {
@@ -474,7 +471,7 @@
                }
             }
 
-            if( chosen != null )
+            if (chosen != null)
             {
                chosen.willRoute(message);
                chosen.getBindable().preroute(message, tx);
@@ -492,13 +489,13 @@
             Binding chosen = null;
             for (Binding binding : bindings)
             {
-               if(binding.getClusterName().equals(resp.getChosen()))
+               if (binding.getClusterName().equals(resp.getChosen()))
                {
                   chosen = binding;
                   break;
                }
             }
-            if( chosen != null)
+            if (chosen != null)
             {
                chosen.willRoute(message);
                chosen.getBindable().preroute(message, tx);

Modified: branches/hornetq_grouping/tests/src/org/hornetq/tests/integration/cluster/distribution/ClusterTestBase.java
===================================================================
--- branches/hornetq_grouping/tests/src/org/hornetq/tests/integration/cluster/distribution/ClusterTestBase.java	2009-10-14 08:59:08 UTC (rev 8101)
+++ branches/hornetq_grouping/tests/src/org/hornetq/tests/integration/cluster/distribution/ClusterTestBase.java	2009-10-14 09:13:40 UTC (rev 8102)
@@ -51,6 +51,7 @@
 import org.hornetq.core.server.group.GroupingHandler;
 import org.hornetq.core.server.cluster.ClusterConnection;
 import org.hornetq.core.server.cluster.RemoteQueueBinding;
+import org.hornetq.core.message.impl.MessageImpl;
 import org.hornetq.integration.transports.netty.TransportConstants;
 import org.hornetq.tests.util.ServiceTestBase;
 import org.hornetq.utils.Pair;
@@ -481,6 +482,117 @@
       verifyReceiveAllInRangeNotBefore(false, -1, msgStart, msgEnd, consumerIDs);
    }
 
+   protected void verifyReceiveAllWithGroupIDRoundRobin(
+                                                int msgStart,
+                                                int msgEnd,
+                                                int... consumerIDs) throws Exception
+   {
+      verifyReceiveAllWithGroupIDRoundRobin(true, -1, msgStart, msgEnd, consumerIDs);   
+   }
+
+   protected int verifyReceiveAllOnSingleConsumer(int msgStart,
+                                                int msgEnd,
+                                                int... consumerIDs) throws Exception
+   {
+      return verifyReceiveAllOnSingleConsumer(true, msgStart, msgEnd, consumerIDs);  
+   }
+
+   protected void verifyReceiveAllWithGroupIDRoundRobin(boolean ack,
+                                                long firstReceiveTime,
+                                                int msgStart,
+                                                int msgEnd,
+                                                int... consumerIDs) throws Exception
+   {
+      boolean outOfOrder = false;
+      HashMap<SimpleString, Integer> groupIdsReceived = new HashMap<SimpleString, Integer>();
+      for (int i = 0; i < consumerIDs.length; i++)
+      {
+         ConsumerHolder holder = consumers[consumerIDs[i]];
+
+         if (holder == null)
+         {
+            throw new IllegalArgumentException("No consumer at " + consumerIDs[i]);
+         }
+
+         for (int j = msgStart; j < msgEnd; j++)
+         {
+            ClientMessage message = holder.consumer.receive(2000);
+
+            if (message == null)
+            {
+               log.info("*** dumping consumers:");
+
+               dumpConsumers();
+
+               assertNotNull("consumer " + consumerIDs[i] + " did not receive message " + j, message);
+            }
+
+            if (ack)
+            {
+               message.acknowledge();
+            }
+
+            if (firstReceiveTime != -1)
+            {
+               assertTrue("Message received too soon", System.currentTimeMillis() >= firstReceiveTime);
+            }
+
+            SimpleString id = (SimpleString) message.getProperty(MessageImpl.HDR_GROUP_ID);
+            if(groupIdsReceived.get(id) == null)
+            {
+               groupIdsReceived.put(id, i);
+            }
+            else if (groupIdsReceived.get(id) != i)
+            {
+               fail("consumer " + groupIdsReceived.get(id) + " already bound to groupid " + id + " received on consumer " + i);
+            }
+
+         }
+
+      }
+
+
+   }
+
+   protected int verifyReceiveAllOnSingleConsumer(boolean ack,
+                                                int msgStart,
+                                                int msgEnd,
+                                                int... consumerIDs) throws Exception
+   {
+      int groupIdsReceived = -1;
+      for (int i = 0; i < consumerIDs.length; i++)
+      {
+         ConsumerHolder holder = consumers[consumerIDs[i]];
+
+         if (holder == null)
+         {
+            throw new IllegalArgumentException("No consumer at " + consumerIDs[i]);
+         }
+         ClientMessage message = holder.consumer.receive(2000);
+         if (message != null)
+         {
+            groupIdsReceived = i;
+            for (int j = msgStart + 1; j < msgEnd; j++)
+            {
+               message = holder.consumer.receive(2000);
+
+               if (message == null)
+               {
+                  fail("consumer " + i + " did not receive all messages");
+               }
+
+               if (ack)
+               {
+                  message.acknowledge();
+               }
+            }
+         }
+
+      }
+      return groupIdsReceived;
+
+   }
+
    protected void verifyReceiveAllInRangeNotBefore(boolean ack,
                                                    long firstReceiveTime,
                                                    int msgStart,

Modified: branches/hornetq_grouping/tests/src/org/hornetq/tests/integration/cluster/distribution/ClusteredGroupingTest.java
===================================================================
--- branches/hornetq_grouping/tests/src/org/hornetq/tests/integration/cluster/distribution/ClusteredGroupingTest.java	2009-10-14 08:59:08 UTC (rev 8101)
+++ branches/hornetq_grouping/tests/src/org/hornetq/tests/integration/cluster/distribution/ClusteredGroupingTest.java	2009-10-14 09:13:40 UTC (rev 8102)
@@ -228,17 +228,15 @@
          createQueue(1, "queues.testaddress", "queue0", null, false);
          createQueue(2, "queues.testaddress", "queue0", null, false);
 
-         addConsumer(0, 0, "queue0", null);
          addConsumer(1, 1, "queue0", null);
-         addConsumer(2, 2, "queue0", null);
 
-         waitForBindings(0, "queues.testaddress", 1, 1, true);
+         waitForBindings(0, "queues.testaddress", 1, 0, true);
          waitForBindings(1, "queues.testaddress", 1, 1, true);
-         waitForBindings(2, "queues.testaddress", 1, 1, true);
+         waitForBindings(2, "queues.testaddress", 1, 0, true);
 
-         waitForBindings(0, "queues.testaddress", 2, 2, false);
-         waitForBindings(1, "queues.testaddress", 2, 2, false);
-         waitForBindings(2, "queues.testaddress", 2, 2, false);
+         waitForBindings(0, "queues.testaddress", 2, 1, false);
+         waitForBindings(1, "queues.testaddress", 2, 0, false);
+         waitForBindings(2, "queues.testaddress", 2, 1, false);
 
          sendInRange(1, "queues.testaddress", 0, 10, false, MessageImpl.HDR_GROUP_ID, new SimpleString("id1"));
 
@@ -324,7 +322,9 @@
       }
    }
 
-   public void testGroupingSendTo3queuesNoConsumersDeliveredToLocalQueue() throws Exception
+
+
+   public void testGroupingRoundRobin() throws Exception
    {
       setupServer(0, isFileStorage(), isNetty());
       setupServer(1, isFileStorage(), isNetty());
@@ -352,26 +352,21 @@
          createQueue(1, "queues.testaddress", "queue0", null, false);
          createQueue(2, "queues.testaddress", "queue0", null, false);
 
-         waitForBindings(0, "queues.testaddress", 1, 0, true);
-         waitForBindings(1, "queues.testaddress", 1, 0, true);
-         waitForBindings(2, "queues.testaddress", 1, 0, true);
+         addConsumer(0, 0, "queue0", null);
+         addConsumer(1, 1, "queue0", null);
+         addConsumer(2, 2, "queue0", null);
 
-         waitForBindings(0, "queues.testaddress", 2, 0, false);
-         waitForBindings(1, "queues.testaddress", 2, 0, false);
-         waitForBindings(2, "queues.testaddress", 2, 0, false);
+         waitForBindings(0, "queues.testaddress", 1, 1, true);
+         waitForBindings(1, "queues.testaddress", 1, 1, true);
+         waitForBindings(2, "queues.testaddress", 1, 1, true);
 
-         sendInRange(1, "queues.testaddress", 0, 10, false, MessageImpl.HDR_GROUP_ID, new SimpleString("id1"));
 
-         sendInRange(2, "queues.testaddress", 10, 20, false, MessageImpl.HDR_GROUP_ID, new SimpleString("id1"));
+         sendInRange(0, "queues.testaddress", 0, 10, false, MessageImpl.HDR_GROUP_ID, new SimpleString("id1"));
+         sendInRange(0, "queues.testaddress", 10, 20, false, MessageImpl.HDR_GROUP_ID, new SimpleString("id2"));
+         sendInRange(0, "queues.testaddress", 20, 30, false, MessageImpl.HDR_GROUP_ID, new SimpleString("id3"));
+         verifyReceiveAllWithGroupIDRoundRobin(0, 10, 0, 1, 2);
 
-         sendInRange(0, "queues.testaddress", 20, 30, false, MessageImpl.HDR_GROUP_ID, new SimpleString("id1"));
 
-         addConsumer(0, 0, "queue0", null);
-         addConsumer(1, 1, "queue0", null);
-         addConsumer(2, 2, "queue0", null);
-
-         verifyReceiveAllInRange(0, 30, 1 );
-
          System.out.println("*****************************************************************************");
       }
       finally
@@ -384,6 +379,7 @@
       }
    }
 
+
    public void testGroupingSendTo3queuesQueueRemoved() throws Exception
    {
       setupServer(0, isFileStorage(), isNetty());
@@ -498,49 +494,257 @@
          setupSessionFactory(1, isNetty());
          setupSessionFactory(2, isNetty());
 
-         createQueue(0, "queues.testaddress", "queue0", null, false);
-         createQueue(1, "queues.testaddress", "queue0", null, false);
-         createQueue(2, "queues.testaddress", "queue0", null, false);
+         createQueue(0, "queues.testaddress", "queue0", null, true);
+         createQueue(1, "queues.testaddress", "queue0", null, true);
+         createQueue(2, "queues.testaddress", "queue0", null, true);
 
-         addConsumer(0, 0, "queue0", null);
+         addConsumer(0, 1, "queue0", null);
+
+         waitForBindings(0, "queues.testaddress", 1, 0, true);
+         waitForBindings(1, "queues.testaddress", 1, 1, true);
+         waitForBindings(2, "queues.testaddress", 1, 0, true);
+
+         waitForBindings(0, "queues.testaddress", 2, 1, false);
+         waitForBindings(1, "queues.testaddress", 2, 0, false);
+         waitForBindings(2, "queues.testaddress", 2, 1, false);
+
+         sendInRange(1, "queues.testaddress", 0, 10, false, MessageImpl.HDR_GROUP_ID, new SimpleString("id1"));
+
+         verifyReceiveAllInRange(0, 10, 0);
+
+         stopServers(1);
+
+         sendInRange(2, "queues.testaddress", 10, 20, false, MessageImpl.HDR_GROUP_ID, new SimpleString("id1"));
+
+
+         sendInRange(0, "queues.testaddress", 20, 30, false, MessageImpl.HDR_GROUP_ID, new SimpleString("id1"));
+
+         startServers(1);
+
          addConsumer(1, 1, "queue0", null);
-         addConsumer(2, 2, "queue0", null);
+         waitForBindings(1, "queues.testaddress", 1, 1, true);
+         
 
-         waitForBindings(0, "queues.testaddress", 1, 1, true);
+         verifyReceiveAllInRange(10, 20, 1);
+         verifyReceiveAllInRange(20, 30, 1);
+
+         System.out.println("*****************************************************************************");
+      }
+      finally
+      {
+         //closeAllConsumers();
+
+         closeAllSessionFactories();
+
+         stopServers(0, 1, 2);
+      }
+   }
+
+   public void testGroupingSendTo3queuesPinnedNodeGoesDownSendBeforeStop() throws Exception
+      {
+         setupServer(0, isFileStorage(), isNetty());
+         setupServer(1, isFileStorage(), isNetty());
+         setupServer(2, isFileStorage(), isNetty());
+
+         setupClusterConnection("cluster0", "queues", false, 1, isNetty(), 0, 1, 2);
+
+         setupClusterConnection("cluster1", "queues", false, 1, isNetty(), 1, 0, 2);
+
+         setupClusterConnection("cluster2", "queues", false, 1, isNetty(), 2, 0, 1);
+
+         startServers(0, 1, 2);
+
+         try
+         {
+            setUpGroupArbitrator(GroupingHandlerConfiguration.TYPE.LOCAL, 0);
+            setUpGroupArbitrator(GroupingHandlerConfiguration.TYPE.REMOTE, 1);
+            setUpGroupArbitrator(GroupingHandlerConfiguration.TYPE.REMOTE, 2);
+
+            setupSessionFactory(0, isNetty());
+            setupSessionFactory(1, isNetty());
+            setupSessionFactory(2, isNetty());
+
+            createQueue(0, "queues.testaddress", "queue0", null, true);
+            createQueue(1, "queues.testaddress", "queue0", null, true);
+            createQueue(2, "queues.testaddress", "queue0", null, true);
+
+            addConsumer(0, 1, "queue0", null);
+
+            waitForBindings(0, "queues.testaddress", 1, 0, true);
+            waitForBindings(1, "queues.testaddress", 1, 1, true);
+            waitForBindings(2, "queues.testaddress", 1, 0, true);
+
+            waitForBindings(0, "queues.testaddress", 2, 1, false);
+            waitForBindings(1, "queues.testaddress", 2, 0, false);
+            waitForBindings(2, "queues.testaddress", 2, 1, false);
+
+            sendInRange(1, "queues.testaddress", 0, 10, true, MessageImpl.HDR_GROUP_ID, new SimpleString("id1"));
+
+            verifyReceiveAllInRange(true, 0, 10, 0);
+
+            closeAllConsumers();
+
+            sendInRange(2, "queues.testaddress", 10, 20, true, MessageImpl.HDR_GROUP_ID, new SimpleString("id1"));
+
+
+            sendInRange(0, "queues.testaddress", 20, 30, true, MessageImpl.HDR_GROUP_ID, new SimpleString("id1"));
+
+            stopServers(1);
+
+            startServers(1);
+
+            addConsumer(1, 1, "queue0", null);
+
+            waitForBindings(1, "queues.testaddress", 1, 1, true);
+
+
+            verifyReceiveAllInRange(10, 30, 1);
+
+
+            System.out.println("*****************************************************************************");
+         }
+         finally
+         {
+            //closeAllConsumers();
+
+            closeAllSessionFactories();
+
+            stopServers(0, 1, 2);
+         }
+      }
+
+
+   public void testGroupingSendTo3queuesPinnedNodeGoesDownSendAfterRestart() throws Exception
+   {
+      setupServer(0, isFileStorage(), isNetty());
+      setupServer(1, isFileStorage(), isNetty());
+      setupServer(2, isFileStorage(), isNetty());
+
+      setupClusterConnection("cluster0", "queues", false, 1, isNetty(), 0, 1, 2);
+
+      setupClusterConnection("cluster1", "queues", false, 1, isNetty(), 1, 0, 2);
+
+      setupClusterConnection("cluster2", "queues", false, 1, isNetty(), 2, 0, 1);
+
+      startServers(0, 1, 2);
+
+      try
+      {
+         setUpGroupArbitrator(GroupingHandlerConfiguration.TYPE.LOCAL, 0);
+         setUpGroupArbitrator(GroupingHandlerConfiguration.TYPE.REMOTE, 1);
+         setUpGroupArbitrator(GroupingHandlerConfiguration.TYPE.REMOTE, 2);
+
+         setupSessionFactory(0, isNetty());
+         setupSessionFactory(1, isNetty());
+         setupSessionFactory(2, isNetty());
+
+         createQueue(0, "queues.testaddress", "queue0", null, true);
+         createQueue(1, "queues.testaddress", "queue0", null, true);
+         createQueue(2, "queues.testaddress", "queue0", null, true);
+
+         addConsumer(0, 1, "queue0", null);
+
+         waitForBindings(0, "queues.testaddress", 1, 0, true);
          waitForBindings(1, "queues.testaddress", 1, 1, true);
-         waitForBindings(2, "queues.testaddress", 1, 1, true);
+         waitForBindings(2, "queues.testaddress", 1, 0, true);
 
-         waitForBindings(0, "queues.testaddress", 2, 2, false);
-         waitForBindings(1, "queues.testaddress", 2, 2, false);
-         waitForBindings(2, "queues.testaddress", 2, 2, false);
+         waitForBindings(0, "queues.testaddress", 2, 1, false);
+         waitForBindings(1, "queues.testaddress", 2, 0, false);
+         waitForBindings(2, "queues.testaddress", 2, 1, false);
 
          sendInRange(1, "queues.testaddress", 0, 10, false, MessageImpl.HDR_GROUP_ID, new SimpleString("id1"));
 
-         verifyReceiveAllInRange(0, 10, 1);
+         verifyReceiveAllInRange(0, 10, 0);
 
-         stopClusterConnections(1);
-
          stopServers(1);
 
+
+
+         startServers(1);
+
          sendInRange(2, "queues.testaddress", 10, 20, false, MessageImpl.HDR_GROUP_ID, new SimpleString("id1"));
 
 
          sendInRange(0, "queues.testaddress", 20, 30, false, MessageImpl.HDR_GROUP_ID, new SimpleString("id1"));
-         startServers(1);          
-         setupSessionFactory(1, isNetty());
-         createQueue(1, "queues.testaddress", "queue0", null, false);
+
          addConsumer(1, 1, "queue0", null);
          waitForBindings(1, "queues.testaddress", 1, 1, true);
-         waitForBindings(1, "queues.testaddress", 2, 2, false);
+
+
          verifyReceiveAllInRange(10, 20, 1);
          verifyReceiveAllInRange(20, 30, 1);
 
          System.out.println("*****************************************************************************");
       }
-      catch(Exception e)
+      finally
       {
-         e.printStackTrace();
+         //closeAllConsumers();
+
+         closeAllSessionFactories();
+
+         stopServers(0, 1, 2);
       }
+   }
+
+   public void testGroupingSendTo3queuesSendingNodeGoesDown() throws Exception
+   {
+      setupServer(0, isFileStorage(), isNetty());
+      setupServer(1, isFileStorage(), isNetty());
+      setupServer(2, isFileStorage(), isNetty());
+
+      setupClusterConnection("cluster0", "queues", false, 1, isNetty(), 0, 1, 2);
+
+      setupClusterConnection("cluster1", "queues", false, 1, isNetty(), 1, 0, 2);
+
+      setupClusterConnection("cluster2", "queues", false, 1, isNetty(), 2, 0, 1);
+
+      startServers(0, 1, 2);
+
+      try
+      {
+         setUpGroupArbitrator(GroupingHandlerConfiguration.TYPE.LOCAL, 0);
+         setUpGroupArbitrator(GroupingHandlerConfiguration.TYPE.REMOTE, 1);
+         setUpGroupArbitrator(GroupingHandlerConfiguration.TYPE.REMOTE, 2);
+
+         setupSessionFactory(0, isNetty());
+         setupSessionFactory(1, isNetty());
+         setupSessionFactory(2, isNetty());
+
+         createQueue(0, "queues.testaddress", "queue0", null, true);
+         createQueue(1, "queues.testaddress", "queue0", null, true);
+         createQueue(2, "queues.testaddress", "queue0", null, true);
+
+         addConsumer(0, 1, "queue0", null);
+
+         waitForBindings(0, "queues.testaddress", 1, 0, true);
+         waitForBindings(1, "queues.testaddress", 1, 1, true);
+         waitForBindings(2, "queues.testaddress", 1, 0, true);
+
+         waitForBindings(0, "queues.testaddress", 2, 1, false);
+         waitForBindings(1, "queues.testaddress", 2, 0, false);
+         waitForBindings(2, "queues.testaddress", 2, 1, false);
+
+         sendInRange(1, "queues.testaddress", 0, 10, false, MessageImpl.HDR_GROUP_ID, new SimpleString("id1"));
+
+         verifyReceiveAllInRange(0, 10, 0);
+
+         stopServers(0);
+
+         sendInRange(2, "queues.testaddress", 10, 20, false, MessageImpl.HDR_GROUP_ID, new SimpleString("id1"));
+
+
+         
+
+         startServers(0);
+
+         waitForBindings(0, "queues.testaddress", 1, 0, true);
+         sendInRange(0, "queues.testaddress", 20, 30, false, MessageImpl.HDR_GROUP_ID, new SimpleString("id1"));
+
+         verifyReceiveAllInRange(10, 20, 0);
+         verifyReceiveAllInRange(20, 30, 0);
+
+         System.out.println("*****************************************************************************");
+      }
       finally
       {
          //closeAllConsumers();
@@ -551,6 +755,71 @@
       }
    }
 
+   public void testGroupingMultipleQueuesOnAddress() throws Exception
+      {
+         setupServer(0, isFileStorage(), isNetty());
+         setupServer(1, isFileStorage(), isNetty());
+         setupServer(2, isFileStorage(), isNetty());
+
+         setupClusterConnection("cluster0", "queues", false, 1, isNetty(), 0, 1, 2);
+
+         setupClusterConnection("cluster1", "queues", false, 1, isNetty(), 1, 0, 2);
+
+         setupClusterConnection("cluster2", "queues", false, 1, isNetty(), 2, 0, 1);
+
+         startServers(0, 1, 2);
+
+         try
+         {
+            setUpGroupArbitrator(GroupingHandlerConfiguration.TYPE.LOCAL, 0);
+            setUpGroupArbitrator(GroupingHandlerConfiguration.TYPE.REMOTE, 1);
+            setUpGroupArbitrator(GroupingHandlerConfiguration.TYPE.REMOTE, 2);
+
+            setupSessionFactory(0, isNetty());
+            setupSessionFactory(1, isNetty());
+            setupSessionFactory(2, isNetty());
+
+            createQueue(0, "queues.testaddress", "queue0", null, false);
+            createQueue(1, "queues.testaddress", "queue0", null, false);
+            createQueue(2, "queues.testaddress", "queue0", null, false);
+
+            createQueue(0, "queues.testaddress", "queue1", null, false);
+            createQueue(1, "queues.testaddress", "queue1", null, false);
+            createQueue(2, "queues.testaddress", "queue1", null, false);
+
+            addConsumer(0, 0, "queue0", null);
+            addConsumer(1, 1, "queue0", null);
+            addConsumer(2, 2, "queue0", null);
+            
+            addConsumer(3, 0, "queue0", null);
+            addConsumer(4, 1, "queue0", null);
+            addConsumer(5, 2, "queue0", null);
+
+            waitForBindings(0, "queues.testaddress", 2, 2, true);
+            waitForBindings(1, "queues.testaddress", 2, 2, true);
+            waitForBindings(2, "queues.testaddress", 2, 2, true);
+
+            waitForBindings(0, "queues.testaddress", 4, 4, false);
+            waitForBindings(1, "queues.testaddress", 4, 4, false);
+            waitForBindings(2, "queues.testaddress", 4, 4, false);
+
+
+            sendWithProperty(0, "queues.testaddress", 10, false, MessageImpl.HDR_GROUP_ID, new SimpleString("id1"));
+
+            verifyReceiveAll(10, 0);
+
+            System.out.println("*****************************************************************************");
+         }
+         finally
+         {
+            closeAllConsumers();
+
+            closeAllSessionFactories();
+
+            stopServers(0, 1, 2);
+         }
+      }
+
    public boolean isNetty()
    {
       return true;
@@ -558,7 +827,7 @@
 
    public boolean isFileStorage()
    {
-      return false;
+      return true;
    }
 }
 



More information about the hornetq-commits mailing list