[hornetq-commits] JBoss hornetq SVN: r7955 - in branches/hornetq_grouping: examples/core and 16 other directories.

do-not-reply at jboss.org do-not-reply at jboss.org
Fri Sep 11 10:22:37 EDT 2009


Author: ataylor
Date: 2009-09-11 10:22:36 -0400 (Fri, 11 Sep 2009)
New Revision: 7955

Added:
   branches/hornetq_grouping/src/main/org/hornetq/core/postoffice/BindingsFactory.java
   branches/hornetq_grouping/src/main/org/hornetq/core/server/group/
   branches/hornetq_grouping/src/main/org/hornetq/core/server/group/Arbitrator.java
   branches/hornetq_grouping/src/main/org/hornetq/core/server/group/ProposalHandler.java
   branches/hornetq_grouping/src/main/org/hornetq/core/server/group/impl/
   branches/hornetq_grouping/src/main/org/hornetq/core/server/group/impl/ArbitratorConfiguration.java
   branches/hornetq_grouping/src/main/org/hornetq/core/server/group/impl/LocalArbitrator.java
   branches/hornetq_grouping/src/main/org/hornetq/core/server/group/impl/Proposal.java
   branches/hornetq_grouping/src/main/org/hornetq/core/server/group/impl/RemoteArbitrator.java
   branches/hornetq_grouping/src/main/org/hornetq/core/server/group/impl/Response.java
   branches/hornetq_grouping/tests/src/org/hornetq/tests/integration/cluster/distribution/ClusteredGroupingTest.java
Modified:
   branches/hornetq_grouping/examples/core/hornetq-core-examples.iml
   branches/hornetq_grouping/examples/javaee/hornetq-javaee-examples.iml
   branches/hornetq_grouping/examples/jms/hornetq-jms-examples.iml
   branches/hornetq_grouping/hornetq.ipr
   branches/hornetq_grouping/src/main/org/hornetq/core/client/management/impl/ManagementHelper.java
   branches/hornetq_grouping/src/main/org/hornetq/core/management/NotificationType.java
   branches/hornetq_grouping/src/main/org/hornetq/core/postoffice/PostOffice.java
   branches/hornetq_grouping/src/main/org/hornetq/core/postoffice/impl/BindingsImpl.java
   branches/hornetq_grouping/src/main/org/hornetq/core/postoffice/impl/PostOfficeImpl.java
   branches/hornetq_grouping/src/main/org/hornetq/core/postoffice/impl/SimpleAddressManager.java
   branches/hornetq_grouping/src/main/org/hornetq/core/postoffice/impl/WildcardAddressManager.java
   branches/hornetq_grouping/src/main/org/hornetq/core/server/cluster/impl/BridgeImpl.java
   branches/hornetq_grouping/src/main/org/hornetq/core/server/cluster/impl/ClusterConnectionImpl.java
   branches/hornetq_grouping/tests/hornetq-tests.iml
   branches/hornetq_grouping/tests/jms-tests/hornetq-jms-tests.iml
   branches/hornetq_grouping/tests/joram-tests/hornetq-joram-tests.iml
   branches/hornetq_grouping/tests/src/org/hornetq/tests/integration/cluster/distribution/ClusterTestBase.java
   branches/hornetq_grouping/tests/src/org/hornetq/tests/unit/core/postoffice/impl/BindingImplTest.java
   branches/hornetq_grouping/tests/src/org/hornetq/tests/unit/core/server/impl/fakes/FakePostOffice.java
Log:
merged in JBM branch

Modified: branches/hornetq_grouping/examples/core/hornetq-core-examples.iml
===================================================================
--- branches/hornetq_grouping/examples/core/hornetq-core-examples.iml	2009-09-11 10:56:25 UTC (rev 7954)
+++ branches/hornetq_grouping/examples/core/hornetq-core-examples.iml	2009-09-11 14:22:36 UTC (rev 7955)
@@ -1,6 +1,8 @@
 <?xml version="1.0" encoding="UTF-8"?>
 <module relativePaths="true" type="JAVA_MODULE" version="4">
   <component name="NewModuleRootManager" inherit-compiler-output="true">
+    <output url="file:///production/hornetq-core-examples" />
+    <output-test url="file:///test/hornetq-core-examples" />
     <exclude-output />
     <content url="file://$MODULE_DIR$">
       <sourceFolder url="file://$MODULE_DIR$/embedded/src" isTestSource="false" />

Modified: branches/hornetq_grouping/examples/javaee/hornetq-javaee-examples.iml
===================================================================
--- branches/hornetq_grouping/examples/javaee/hornetq-javaee-examples.iml	2009-09-11 10:56:25 UTC (rev 7954)
+++ branches/hornetq_grouping/examples/javaee/hornetq-javaee-examples.iml	2009-09-11 14:22:36 UTC (rev 7955)
@@ -1,91 +1,6 @@
 <?xml version="1.0" encoding="UTF-8"?>
 <module relativePaths="true" type="JAVA_MODULE" version="4">
-  <component name="FacetManager">
-    <facet type="web" name="Web">
-      <configuration>
-        <descriptors>
-          <deploymentDescriptor name="web.xml" url="file://$MODULE_DIR$/servlet-transport/config/WEB-INF/web.xml" optional="false" version="2.5" />
-        </descriptors>
-        <webroots>
-          <root url="file://$MODULE_DIR$/servlet-transport/config/jms-servlet.war" relative="/" />
-        </webroots>
-        <sourceRoots>
-          <root url="file://$MODULE_DIR$/ejb-jms-transaction/src" />
-          <root url="file://$MODULE_DIR$/hajndi/src" />
-          <root url="file://$MODULE_DIR$/jca-config/src" />
-          <root url="file://$MODULE_DIR$/jms-bridge/src" />
-          <root url="file://$MODULE_DIR$/mdb/src" />
-          <root url="file://$MODULE_DIR$/servlet-transport/src" />
-          <root url="file://$MODULE_DIR$/xarecovery/src" />
-        </sourceRoots>
-        <building>
-          <setting name="EXPLODED_URL" value="file://" />
-          <setting name="EXPLODED_ENABLED" value="false" />
-          <setting name="JAR_URL" value="file://" />
-          <setting name="JAR_ENABLED" value="false" />
-          <setting name="EXCLUDE_EXPLODED_DIRECTORY" value="true" />
-        </building>
-        <packaging>
-          <containerElement type="module" name="messaging-javaee-examples">
-            <attribute name="method" value="1" />
-            <attribute name="URI" value="/WEB-INF/classes" />
-          </containerElement>
-        </packaging>
-      </configuration>
-    </facet>
-    <facet type="javaeeApplication" name="javaEEApplication">
-      <configuration>
-        <descriptors>
-          <deploymentDescriptor name="application.xml" url="file://$MODULE_DIR$/servlet-transport/config/META-INF/application.xml" optional="false" version="5" />
-        </descriptors>
-        <building>
-          <setting name="EXPLODED_URL" value="file://" />
-          <setting name="EXPLODED_ENABLED" value="false" />
-          <setting name="JAR_URL" value="file://" />
-          <setting name="JAR_ENABLED" value="false" />
-          <setting name="EXCLUDE_EXPLODED_DIRECTORY" value="true" />
-        </building>
-      </configuration>
-    </facet>
-    <facet type="javaeeApplication" name="javaEEApplication2">
-      <configuration>
-        <descriptors>
-          <deploymentDescriptor name="application.xml" url="file://$MODULE_DIR$/servlet-ssl-example/config/META-INF/application.xml" optional="false" version="5" />
-        </descriptors>
-        <building>
-          <setting name="EXPLODED_URL" value="file://" />
-          <setting name="EXPLODED_ENABLED" value="false" />
-          <setting name="JAR_URL" value="file://" />
-          <setting name="JAR_ENABLED" value="false" />
-          <setting name="EXCLUDE_EXPLODED_DIRECTORY" value="true" />
-        </building>
-      </configuration>
-    </facet>
-    <facet type="web" name="Web2">
-      <configuration>
-        <descriptors>
-          <deploymentDescriptor name="web.xml" url="file://$MODULE_DIR$/servlet-ssl-example/config/WEB-INF/web.xml" optional="false" version="2.5" />
-        </descriptors>
-        <webroots>
-          <root url="file://$MODULE_DIR$/servlet-ssl-example/config" relative="/" />
-        </webroots>
-        <building>
-          <setting name="EXPLODED_URL" value="file://" />
-          <setting name="EXPLODED_ENABLED" value="false" />
-          <setting name="JAR_URL" value="file://" />
-          <setting name="JAR_ENABLED" value="false" />
-          <setting name="EXCLUDE_EXPLODED_DIRECTORY" value="true" />
-        </building>
-        <packaging>
-          <containerElement type="module" name="messaging-javaee-examples">
-            <attribute name="method" value="1" />
-            <attribute name="URI" value="/WEB-INF/classes" />
-          </containerElement>
-        </packaging>
-      </configuration>
-    </facet>
-  </component>
-  <component name="NewModuleRootManager" inherit-compiler-output="false">
+  <component name="NewModuleRootManager" inherit-compiler-output="true">
     <output url="file://$MODULE_DIR$/output/classes" />
     <exclude-output />
     <content url="file://$MODULE_DIR$">

Modified: branches/hornetq_grouping/examples/jms/hornetq-jms-examples.iml
===================================================================
--- branches/hornetq_grouping/examples/jms/hornetq-jms-examples.iml	2009-09-11 10:56:25 UTC (rev 7954)
+++ branches/hornetq_grouping/examples/jms/hornetq-jms-examples.iml	2009-09-11 14:22:36 UTC (rev 7955)
@@ -1,6 +1,6 @@
 <?xml version="1.0" encoding="UTF-8"?>
 <module relativePaths="true" type="JAVA_MODULE" version="4">
-  <component name="NewModuleRootManager" inherit-compiler-output="false">
+  <component name="NewModuleRootManager" inherit-compiler-output="true">
     <output url="file://$MODULE_DIR$/output/classes" />
     <output-test url="file://$MODULE_DIR$/output" />
     <exclude-output />

Modified: branches/hornetq_grouping/hornetq.ipr
===================================================================
--- branches/hornetq_grouping/hornetq.ipr	2009-09-11 10:56:25 UTC (rev 7954)
+++ branches/hornetq_grouping/hornetq.ipr	2009-09-11 14:22:36 UTC (rev 7955)
@@ -336,11 +336,21 @@
       <facet-type id="javaeeApplication">
         <modules>
           <module name="messaging" />
+          <module name="hornetq-javaee-examples">
+            <files>
+              <file url="file://$PROJECT_DIR$/examples/javaee/servlet-transport/config/META-INF/application.xml" />
+            </files>
+          </module>
         </modules>
       </facet-type>
       <facet-type id="web">
         <modules>
           <module name="messaging" />
+          <module name="hornetq-javaee-examples">
+            <files>
+              <file url="file://$PROJECT_DIR$/examples/javaee/servlet-transport/config/WEB-INF/web.xml" />
+            </files>
+          </module>
         </modules>
       </facet-type>
     </autodetection-disabled>
@@ -579,7 +589,7 @@
   </component>
   <component name="ProjectFileVersion" converted="true" />
   <component name="ProjectKey">
-    <option name="state" value="https://svn.jboss.org/repos/hornetq/trunk/hornetq.ipr" />
+    <option name="state" value="https://svn.jboss.org/repos/hornetq/branches/hornetq_grouping/hornetq.ipr" />
   </component>
   <component name="ProjectModuleManager">
     <modules>
@@ -592,7 +602,9 @@
       <module fileurl="file://$PROJECT_DIR$/tests/hornetq-tests.iml" filepath="$PROJECT_DIR$/tests/hornetq-tests.iml" />
     </modules>
   </component>
-  <component name="ProjectRootManager" version="2" languageLevel="JDK_1_5" assert-keyword="true" jdk-15="true" project-jdk-name="1.5" project-jdk-type="JavaSDK" />
+  <component name="ProjectRootManager" version="2" languageLevel="JDK_1_5" assert-keyword="true" jdk-15="true" project-jdk-name="1.5" project-jdk-type="JavaSDK">
+    <output url="file://$PROJECT_DIR$/build/classes" />
+  </component>
   <component name="ResourceManagerContainer">
     <option name="myResourceBundles">
       <value>

Modified: branches/hornetq_grouping/src/main/org/hornetq/core/client/management/impl/ManagementHelper.java
===================================================================
--- branches/hornetq_grouping/src/main/org/hornetq/core/client/management/impl/ManagementHelper.java	2009-09-11 10:56:25 UTC (rev 7954)
+++ branches/hornetq_grouping/src/main/org/hornetq/core/client/management/impl/ManagementHelper.java	2009-09-11 14:22:36 UTC (rev 7955)
@@ -71,6 +71,12 @@
 
    public static final SimpleString HDR_CHECK_TYPE = new SimpleString("_HQ_CheckType");
 
+   public static final SimpleString HDR_PROPOSAL_TYPE = new SimpleString("_JBM_ProposalType");
+
+   public static final SimpleString HDR_PROPOSAL_VALUE = new SimpleString("_JBM_ProposalValue");
+
+   public static final SimpleString HDR_PROPOSAL_ALT_VALUE = new SimpleString("_JBM_ProposalAltValue");
+
    // Attributes ----------------------------------------------------
 
    // Static --------------------------------------------------------

Modified: branches/hornetq_grouping/src/main/org/hornetq/core/management/NotificationType.java
===================================================================
--- branches/hornetq_grouping/src/main/org/hornetq/core/management/NotificationType.java	2009-09-11 10:56:25 UTC (rev 7954)
+++ branches/hornetq_grouping/src/main/org/hornetq/core/management/NotificationType.java	2009-09-11 14:22:36 UTC (rev 7955)
@@ -30,7 +30,9 @@
    BROADCAST_GROUP_STARTED(10),
    BROADCAST_GROUP_STOPPED(11),
    BRIDGE_STARTED(12),
-   BRIDGE_STOPPED(13);
+   BRIDGE_STOPPED(13),
+   PROPOSAL(14),
+   PROPOSAL_RESPONSE(15);
 
    private final int value;
 

Added: branches/hornetq_grouping/src/main/org/hornetq/core/postoffice/BindingsFactory.java
===================================================================
--- branches/hornetq_grouping/src/main/org/hornetq/core/postoffice/BindingsFactory.java	                        (rev 0)
+++ branches/hornetq_grouping/src/main/org/hornetq/core/postoffice/BindingsFactory.java	2009-09-11 14:22:36 UTC (rev 7955)
@@ -0,0 +1,21 @@
+/*
+ * Copyright 2009 Red Hat, Inc.
+ *  Red Hat licenses this file to you under the Apache License, version
+ *  2.0 (the "License"); you may not use this file except in compliance
+ *  with the License.  You may obtain a copy of the License at
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+ *  implied.  See the License for the specific language governing
+ *  permissions and limitations under the License.
+ */
+package org.hornetq.core.postoffice;
+
+/**
+ * @author <a href="mailto:andy.taylor at jboss.org">Andy Taylor</a>
+ */
+public interface BindingsFactory
+{
+   Bindings createBindings();
+}

Modified: branches/hornetq_grouping/src/main/org/hornetq/core/postoffice/PostOffice.java
===================================================================
--- branches/hornetq_grouping/src/main/org/hornetq/core/postoffice/PostOffice.java	2009-09-11 10:56:25 UTC (rev 7954)
+++ branches/hornetq_grouping/src/main/org/hornetq/core/postoffice/PostOffice.java	2009-09-11 14:22:36 UTC (rev 7955)
@@ -19,6 +19,7 @@
 import org.hornetq.core.server.HornetQComponent;
 import org.hornetq.core.server.Queue;
 import org.hornetq.core.server.ServerMessage;
+import org.hornetq.core.server.group.Arbitrator;
 import org.hornetq.core.transaction.Transaction;
 import org.hornetq.utils.SimpleString;
 
@@ -66,5 +67,9 @@
    
    void sendQueueInfoToQueue(SimpleString queueName, SimpleString address) throws Exception;
    
-   Object getNotificationLock();     
+   Object getNotificationLock();
+
+   void addArbitrator(Arbitrator arbitrator);
+
+   Arbitrator getArbitrator();
 }

Modified: branches/hornetq_grouping/src/main/org/hornetq/core/postoffice/impl/BindingsImpl.java
===================================================================
--- branches/hornetq_grouping/src/main/org/hornetq/core/postoffice/impl/BindingsImpl.java	2009-09-11 10:56:25 UTC (rev 7954)
+++ branches/hornetq_grouping/src/main/org/hornetq/core/postoffice/impl/BindingsImpl.java	2009-09-11 14:22:36 UTC (rev 7955)
@@ -28,9 +28,13 @@
 import org.hornetq.core.message.impl.MessageImpl;
 import org.hornetq.core.postoffice.Binding;
 import org.hornetq.core.postoffice.Bindings;
+import org.hornetq.core.postoffice.PostOffice;
 import org.hornetq.core.server.Bindable;
 import org.hornetq.core.server.Queue;
 import org.hornetq.core.server.ServerMessage;
+import org.hornetq.core.server.group.impl.Proposal;
+import org.hornetq.core.server.group.impl.Response;
+import org.hornetq.core.server.group.Arbitrator;
 import org.hornetq.core.transaction.Transaction;
 import org.hornetq.utils.SimpleString;
 
@@ -38,10 +42,8 @@
  * A BindingsImpl
  *
  * @author <a href="mailto:tim.fox at jboss.com">Tim Fox</a>
- * 
- * Created 11 Dec 2008 08:34:33
- *
- *
+ *         <p/>
+ *         Created 11 Dec 2008 08:34:33
  */
 public class BindingsImpl implements Bindings
 {
@@ -57,6 +59,13 @@
 
    private volatile boolean routeWhenNoConsumers;
 
+   private final PostOffice postOffice;
+
+   public BindingsImpl(PostOffice postOffice)
+   {
+      this.postOffice = postOffice;
+   }
+
    public void setRouteWhenNoConsumers(final boolean routeWhenNoConsumers)
    {
       this.routeWhenNoConsumers = routeWhenNoConsumers;
@@ -125,7 +134,7 @@
 
    private void routeFromCluster(final ServerMessage message, final Transaction tx) throws Exception
    {
-      byte[] ids = (byte[])message.removeProperty(MessageImpl.HDR_ROUTE_TO_IDS);
+      byte[] ids = (byte[]) message.removeProperty(MessageImpl.HDR_ROUTE_TO_IDS);
 
       ByteBuffer buff = ByteBuffer.wrap(ids);
 
@@ -167,7 +176,7 @@
       {
          return false;
       }
-      
+
       SimpleString routingName = originatingQueue.getName();
 
       List<Binding> bindings = routingNameBindingMap.get(routingName);
@@ -251,10 +260,14 @@
       }
    }
 
-   public void route(final ServerMessage message, final Transaction tx) throws Exception
+    public void route(final ServerMessage message, final Transaction tx) throws Exception
    {
       boolean routed = false;
-      
+      Object o = message.getProperty("count_prop");
+      if (o != null)
+      {
+         System.out.println("routing message " + o);
+      }
       if (!exclusiveBindings.isEmpty())
       {
          for (Binding binding : exclusiveBindings)
@@ -262,7 +275,7 @@
             if (binding.getFilter() == null || binding.getFilter().match(message))
             {
                binding.getBindable().route(message, tx);
-               
+
                routed = true;
             }
          }
@@ -270,10 +283,96 @@
 
       if (!routed)
       {
+         Arbitrator groupingArbitrator = postOffice.getArbitrator();
+
          if (message.getProperty(MessageImpl.HDR_FROM_CLUSTER) != null)
          {
             routeFromCluster(message, tx);
          }
+         else if(groupingArbitrator != null && message.getProperty(MessageImpl.HDR_GROUP_ID)!= null)
+         {
+            SimpleString groupId = (SimpleString) message.getProperty(MessageImpl.HDR_GROUP_ID);
+            Response resp = groupingArbitrator.propose(new Proposal(groupId, null));
+            if(resp == null)
+            {
+               for (Map.Entry<SimpleString, List<Binding>> entry : routingNameBindingMap.entrySet())
+               {
+                  SimpleString routingName = entry.getKey();
+
+                  List<Binding> bindings = entry.getValue();
+                  Binding chosen = null;
+                  Binding lowestPriorityBinding = null;
+                  int lowestPriority = Integer.MAX_VALUE;
+                  for (Binding binding : bindings)
+                  {
+                     boolean bindingIsHighAcceptPriority = binding.isHighAcceptPriority(message);
+                     int distance = binding.getDistance();
+                     if((distance < lowestPriority))
+                     {
+                        lowestPriorityBinding = binding;
+                        lowestPriority = distance;
+                        if(bindingIsHighAcceptPriority)
+                        {
+                           chosen = binding;
+                        }
+                     }
+                  }
+                  if(chosen == null)
+                  {
+                     chosen = lowestPriorityBinding;
+                  }
+                  resp = groupingArbitrator.propose(new Proposal(groupId, chosen.getClusterName()));
+                  if(!resp.getChosen().equals(chosen.getClusterName()))
+                  {
+                     for (Binding binding : bindings)
+                     {
+                        if (binding.getClusterName().equals(resp.getChosen()))
+                        {
+                           chosen = binding;
+                           break;
+                        }
+                     }
+                  }
+
+                  if( chosen != null )
+                  {
+                     System.out.println("sending message" + message.getProperty("count_prop") + " to " + chosen.getClusterName());
+                     chosen.willRoute(message);
+                     chosen.getBindable().preroute(message, tx);
+                     chosen.getBindable().route(message, tx);
+                  }
+               }
+            }
+            else
+            {
+               for (Map.Entry<SimpleString, List<Binding>> entry : routingNameBindingMap.entrySet())
+               {
+                  SimpleString routingName = entry.getKey();
+
+                  List<Binding> bindings = entry.getValue();
+                  Binding chosen = null;
+                  for (Binding binding : bindings)
+                  {
+                     if(binding.getClusterName().equals(resp.getChosen()))
+                     {
+                        chosen = binding;
+                        break;
+                     }
+                  }
+                  if( chosen != null && (routeWhenNoConsumers || chosen.isHighAcceptPriority(message)))
+                  {
+                     System.out.println("found sending message" + message.getProperty("count_prop") + " to " + chosen.getClusterName());
+                     chosen.willRoute(message);
+                     chosen.getBindable().preroute(message, tx);
+                     chosen.getBindable().route(message, tx);
+                  }
+                  else
+                  {
+                     System.out.println("BindingsImpl.route");
+                  }
+               }
+            }
+         }
          else
          {
             Set<Bindable> chosen = new HashSet<Bindable>();
@@ -327,74 +426,74 @@
                      }
                   }
 
-                  Filter filter = binding.getFilter();
+            Filter filter = binding.getFilter();
 
-                  if (filter == null || filter.match(message))
-                  {
-                     // bindings.length == 1 ==> only a local queue so we don't check for matching consumers (it's an
-                     // unnecessary overhead)
-                     if (length == 1 || routeWhenNoConsumers || binding.isHighAcceptPriority(message))
-                     {
-                        theBinding = binding;
+            if (filter == null || filter.match(message))
+            {
+               // bindings.length == 1 ==> only a local queue so we don't check for matching consumers (it's an
+               // unnecessary overhead)
+               if (length == 1 || routeWhenNoConsumers || binding.isHighAcceptPriority(message))
+               {
+                  theBinding = binding;
 
-                        pos = incrementPos(pos, length);
+                  pos = incrementPos(pos, length);
 
-                        break;
-                     }
-                     else
-                     {
-                        if (lastLowPriorityBinding == -1)
-                        {
-                           lastLowPriorityBinding = pos;
-                        }
-                     }
+                  break;
+               }
+               else
+               {
+                  if (lastLowPriorityBinding == -1)
+                  {
+                     lastLowPriorityBinding = pos;
                   }
+               }
+            }
 
-                  pos = incrementPos(pos, length);
+            pos = incrementPos(pos, length);
 
-                  if (pos == startPos)
+            if (pos == startPos)
+            {
+               if (lastLowPriorityBinding != -1)
+               {
+                  try
                   {
-                     if (lastLowPriorityBinding != -1)
+                     theBinding = bindings.get(pos);
+                  }
+                  catch (IndexOutOfBoundsException e)
+                  {
+                     // This can occur if binding is removed while in route
+                     if (!bindings.isEmpty())
                      {
-                        try
-                        {
-                           theBinding = bindings.get(pos);
-                        }
-                        catch (IndexOutOfBoundsException e)
-                        {
-                           // This can occur if binding is removed while in route
-                           if (!bindings.isEmpty())
-                           {
-                              pos = 0;
+                        pos = 0;
 
-                              lastLowPriorityBinding = -1;
+                        lastLowPriorityBinding = -1;
 
-                              continue;
-                           }
-                           else
-                           {
-                              break;
-                           }
-                        }
-
-                        pos = lastLowPriorityBinding;
-
-                        pos = incrementPos(pos, length);
+                        continue;
                      }
-                     break;
+                     else
+                     {
+                        break;
+                     }
                   }
-               }
 
-               if (theBinding != null)
-               {
-                  theBinding.willRoute(message);
+                  pos = lastLowPriorityBinding;
 
-                  chosen.add(theBinding.getBindable());
+                  pos = incrementPos(pos, length);
                }
-
-               routingNamePositions.put(routingName, pos);
+               break;
             }
+         }
 
+         if (theBinding != null)
+         {
+            theBinding.willRoute(message);
+
+            chosen.add(theBinding.getBindable());
+         }
+
+         routingNamePositions.put(routingName, pos);
+         }
+
             // TODO refactor to do this is one iteration
 
             for (Bindable bindable : chosen)

Modified: branches/hornetq_grouping/src/main/org/hornetq/core/postoffice/impl/PostOfficeImpl.java
===================================================================
--- branches/hornetq_grouping/src/main/org/hornetq/core/postoffice/impl/PostOfficeImpl.java	2009-09-11 10:56:25 UTC (rev 7954)
+++ branches/hornetq_grouping/src/main/org/hornetq/core/postoffice/impl/PostOfficeImpl.java	2009-09-11 14:22:36 UTC (rev 7955)
@@ -45,10 +45,12 @@
 import org.hornetq.core.postoffice.DuplicateIDCache;
 import org.hornetq.core.postoffice.PostOffice;
 import org.hornetq.core.postoffice.QueueInfo;
+import org.hornetq.core.postoffice.BindingsFactory;
 import org.hornetq.core.server.HornetQServer;
 import org.hornetq.core.server.Queue;
 import org.hornetq.core.server.QueueFactory;
 import org.hornetq.core.server.ServerMessage;
+import org.hornetq.core.server.group.Arbitrator;
 import org.hornetq.core.server.impl.ServerMessageImpl;
 import org.hornetq.core.settings.HierarchicalRepository;
 import org.hornetq.core.settings.impl.AddressSettings;
@@ -69,7 +71,7 @@
  * @author <a href="jmesnil at redhat.com">Jeff Mesnil</a>
  * @author <a href="csuconic at redhat.com">Clebert Suconic</a>
  */
-public class PostOfficeImpl implements PostOffice, NotificationListener
+public class PostOfficeImpl implements PostOffice, NotificationListener, BindingsFactory
 {
    private static final Logger log = Logger.getLogger(PostOfficeImpl.class);
 
@@ -123,6 +125,8 @@
 
    private final HierarchicalRepository<AddressSettings> addressSettingsRepository;
 
+   private Arbitrator groupingArbitrator;
+
    public PostOfficeImpl(final HornetQServer server,
                          final StorageManager storageManager,
                          final PagingManager pagingManager,
@@ -154,11 +158,11 @@
 
       if (enableWildCardRouting)
       {
-         addressManager = new WildcardAddressManager();
+         addressManager = new WildcardAddressManager(this);
       }
       else
       {
-         addressManager = new SimpleAddressManager();
+         addressManager = new SimpleAddressManager(this);
       }
 
       this.backup = backup;
@@ -553,7 +557,7 @@
 
       if (bindings == null)
       {
-         bindings = new BindingsImpl();
+         bindings = createBindings();
       }
 
       return bindings;
@@ -738,6 +742,17 @@
       return notificationLock;
    }
 
+
+   public void addArbitrator(Arbitrator arbitrator)
+   {
+      groupingArbitrator = arbitrator;
+   }
+
+   public Arbitrator getArbitrator()
+   {
+      return groupingArbitrator;
+   }
+
    public void sendQueueInfoToQueue(final SimpleString queueName, final SimpleString address) throws Exception
    {
       // We send direct to the queue so we can send it to the same queue that is bound to the notifications adress -
@@ -1099,4 +1114,9 @@
          }
       }
    }
+
+   public Bindings createBindings()
+   {
+      return new BindingsImpl(this);
+   }
 }

Modified: branches/hornetq_grouping/src/main/org/hornetq/core/postoffice/impl/SimpleAddressManager.java
===================================================================
--- branches/hornetq_grouping/src/main/org/hornetq/core/postoffice/impl/SimpleAddressManager.java	2009-09-11 10:56:25 UTC (rev 7954)
+++ branches/hornetq_grouping/src/main/org/hornetq/core/postoffice/impl/SimpleAddressManager.java	2009-09-11 14:22:36 UTC (rev 7955)
@@ -21,6 +21,7 @@
 import org.hornetq.core.postoffice.AddressManager;
 import org.hornetq.core.postoffice.Binding;
 import org.hornetq.core.postoffice.Bindings;
+import org.hornetq.core.postoffice.BindingsFactory;
 import org.hornetq.utils.SimpleString;
 
 /**
@@ -38,6 +39,13 @@
 
    private final ConcurrentMap<SimpleString, Binding> nameMap = new ConcurrentHashMap<SimpleString, Binding>();
 
+   private final BindingsFactory bindingsFactory;
+
+   public SimpleAddressManager(BindingsFactory bindingsFactory)
+   {
+      this.bindingsFactory = bindingsFactory;
+   }
+
    public boolean addBinding(final Binding binding)
    {
       if (nameMap.putIfAbsent(binding.getUniqueName(), binding) != null)
@@ -81,7 +89,7 @@
    {
       Address add = new AddressImpl(address);
       
-      Bindings bindings = new BindingsImpl();
+      Bindings bindings = bindingsFactory.createBindings();
       
       for (Binding binding: nameMap.values())
       {
@@ -150,7 +158,7 @@
 
       if (bindings == null)
       {
-         bindings = new BindingsImpl();
+         bindings = bindingsFactory.createBindings();
 
          prevBindings = mappings.putIfAbsent(address, bindings);
 

Modified: branches/hornetq_grouping/src/main/org/hornetq/core/postoffice/impl/WildcardAddressManager.java
===================================================================
--- branches/hornetq_grouping/src/main/org/hornetq/core/postoffice/impl/WildcardAddressManager.java	2009-09-11 10:56:25 UTC (rev 7954)
+++ branches/hornetq_grouping/src/main/org/hornetq/core/postoffice/impl/WildcardAddressManager.java	2009-09-11 14:22:36 UTC (rev 7955)
@@ -21,6 +21,7 @@
 import org.hornetq.core.postoffice.Address;
 import org.hornetq.core.postoffice.Binding;
 import org.hornetq.core.postoffice.Bindings;
+import org.hornetq.core.postoffice.BindingsFactory;
 import org.hornetq.utils.SimpleString;
 
 /**
@@ -50,6 +51,11 @@
 
    private final Map<SimpleString, Address> wildCardAddresses = new HashMap<SimpleString, Address>();
 
+   public WildcardAddressManager(BindingsFactory bindingsFactory)
+   {
+      super(bindingsFactory);
+   }
+
    public Bindings getBindingsForRoutingAddress(final SimpleString address)
    {
       Bindings bindings = super.getBindingsForRoutingAddress(address);

Modified: branches/hornetq_grouping/src/main/org/hornetq/core/server/cluster/impl/BridgeImpl.java
===================================================================
--- branches/hornetq_grouping/src/main/org/hornetq/core/server/cluster/impl/BridgeImpl.java	2009-09-11 10:56:25 UTC (rev 7954)
+++ branches/hornetq_grouping/src/main/org/hornetq/core/server/cluster/impl/BridgeImpl.java	2009-09-11 14:22:36 UTC (rev 7955)
@@ -613,6 +613,10 @@
                                                    NotificationType.CONSUMER_CREATED +
                                                    "','" +
                                                    NotificationType.CONSUMER_CLOSED +
+                                                   "','" +
+                                                   NotificationType.PROPOSAL +
+                                                   "','" +
+                                                   NotificationType.PROPOSAL_RESPONSE +
                                                    "') AND " +
                                                    ManagementHelper.HDR_DISTANCE +
                                                    "<" +

Modified: branches/hornetq_grouping/src/main/org/hornetq/core/server/cluster/impl/ClusterConnectionImpl.java
===================================================================
--- branches/hornetq_grouping/src/main/org/hornetq/core/server/cluster/impl/ClusterConnectionImpl.java	2009-09-11 10:56:25 UTC (rev 7954)
+++ branches/hornetq_grouping/src/main/org/hornetq/core/server/cluster/impl/ClusterConnectionImpl.java	2009-09-11 14:22:36 UTC (rev 7955)
@@ -46,6 +46,8 @@
 import org.hornetq.core.remoting.impl.wireformat.replication.ReplicateRemoteConsumerRemovedMessage;
 import org.hornetq.core.server.HornetQServer;
 import org.hornetq.core.server.Queue;
+import org.hornetq.core.server.group.impl.Response;
+import org.hornetq.core.server.group.impl.Proposal;
 import org.hornetq.core.server.cluster.Bridge;
 import org.hornetq.core.server.cluster.ClusterConnection;
 import org.hornetq.core.server.cluster.MessageFlowRecord;
@@ -609,6 +611,12 @@
                case SECURITY_AUTHENTICATION_VIOLATION:
                case SECURITY_PERMISSION_VIOLATION:
                   break;
+               case PROPOSAL:
+                  doProposalReceived(message);
+                  break;
+               case PROPOSAL_RESPONSE:
+                  doProposalResponseReceived(message);
+                  break;
                default:
                {
                   throw new IllegalArgumentException("Invalid type " + ntype);
@@ -621,6 +629,65 @@
          }
       }
 
+      /**
+       * Handles a PROPOSAL notification: rebuilds the proposal from the message headers and
+       * hands it to the post office's arbitrator with the distance incremented by one. If the
+       * arbitrator returns a response, that response is sent out with a distance of zero.
+       *
+       * @param message the notification message carrying the proposal headers
+       * @throws IllegalStateException if the proposal type or distance header is missing
+       * @throws Exception anything thrown by the arbitrator
+       */
+      private synchronized void doProposalReceived(final ClientMessage message) throws Exception
+      {
+         SimpleString type = (SimpleString) message.getProperty(ManagementHelper.HDR_PROPOSAL_TYPE);
+         if (type == null)
+         {
+            throw new IllegalStateException("proposal type is null");
+         }
+         SimpleString val = (SimpleString) message.getProperty(ManagementHelper.HDR_PROPOSAL_VALUE);
+         Integer hops = (Integer) message.getProperty(ManagementHelper.HDR_DISTANCE);
+         if (hops == null)
+         {
+            // fail with a descriptive error instead of a NullPointerException when unboxing below
+            throw new IllegalStateException("proposal distance is null");
+         }
+         Response response = postOffice.getArbitrator().receive(new Proposal(type, val), hops + 1);
+         if (response != null)
+         {
+            postOffice.getArbitrator().send(response, 0);
+         }
+      }
+
+      /**
+       * Handles a PROPOSAL_RESPONSE notification: rebuilds the response from the message
+       * headers, passes it to the post office's arbitrator and then asks the arbitrator to
+       * send it on with the distance incremented by one.
+       *
+       * @param message the notification message carrying the response headers
+       * @throws IllegalStateException if the proposal type or distance header is missing
+       * @throws Exception anything thrown by the arbitrator
+       */
+      private synchronized void doProposalResponseReceived(final ClientMessage message) throws Exception
+      {
+         SimpleString type = (SimpleString) message.getProperty(ManagementHelper.HDR_PROPOSAL_TYPE);
+         if (type == null)
+         {
+            throw new IllegalStateException("proposal type is null");
+         }
+         SimpleString val = (SimpleString) message.getProperty(ManagementHelper.HDR_PROPOSAL_VALUE);
+         SimpleString alt = (SimpleString) message.getProperty(ManagementHelper.HDR_PROPOSAL_ALT_VALUE);
+         Integer hops = (Integer) message.getProperty(ManagementHelper.HDR_DISTANCE);
+         if (hops == null)
+         {
+            // checked before the arbitrator is touched so a bad message has no partial effect
+            throw new IllegalStateException("proposal distance is null");
+         }
+         Response response = new Response(type, val, alt);
+         postOffice.getArbitrator().proposed(response);
+         postOffice.getArbitrator().send(response, hops + 1);
+      }
+
       private synchronized void clearBindings() throws Exception
       {       
          for (RemoteQueueBinding binding : new HashSet<RemoteQueueBinding>(bindings.values()))

Added: branches/hornetq_grouping/src/main/org/hornetq/core/server/group/Arbitrator.java
===================================================================
--- branches/hornetq_grouping/src/main/org/hornetq/core/server/group/Arbitrator.java	                        (rev 0)
+++ branches/hornetq_grouping/src/main/org/hornetq/core/server/group/Arbitrator.java	2009-09-11 14:22:36 UTC (rev 7955)
@@ -0,0 +1,35 @@
+/*
+ * Copyright 2009 Red Hat, Inc.
+ *  Red Hat licenses this file to you under the Apache License, version
+ *  2.0 (the "License"); you may not use this file except in compliance
+ *  with the License.  You may obtain a copy of the License at
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+ *  implied.  See the License for the specific language governing
+ *  permissions and limitations under the License.
+ */
+package org.hornetq.core.server.group;
+
+import org.hornetq.utils.SimpleString;
+import org.hornetq.core.server.group.impl.Proposal;
+import org.hornetq.core.server.group.impl.Response;
+
+/** Arbitrates proposals: for a proposal type it answers with a Response naming the value to use.
+ * @author <a href="mailto:andy.taylor at jboss.org">Andy Taylor</a>
+ */
+public interface Arbitrator
+{
+   SimpleString getName();
+   /** Submits a proposal; LocalArbitrator and RemoteArbitrator can both return null. */
+   Response propose(Proposal proposal) throws Exception;
+   /** Called by ClusterConnectionImpl when a PROPOSAL_RESPONSE notification arrives. */
+   void proposed(Response response) throws Exception;
+   /** Sends a response out with the given distance; a no-op in RemoteArbitrator. */
+   void send(Response response, int distance) throws Exception;
+   /** Called by ClusterConnectionImpl when a PROPOSAL notification arrives; may return null. */
+   Response receive(Proposal proposal, int distance) throws Exception;
+   /** Proposes a new value for a type; RemoteArbitrator's version just returns null. */
+   Response rePropose(Proposal proposal) throws Exception;
+}

Added: branches/hornetq_grouping/src/main/org/hornetq/core/server/group/ProposalHandler.java
===================================================================
--- branches/hornetq_grouping/src/main/org/hornetq/core/server/group/ProposalHandler.java	                        (rev 0)
+++ branches/hornetq_grouping/src/main/org/hornetq/core/server/group/ProposalHandler.java	2009-09-11 14:22:36 UTC (rev 7955)
@@ -0,0 +1,26 @@
+/*
+ * Copyright 2009 Red Hat, Inc.
+ *  Red Hat licenses this file to you under the Apache License, version
+ *  2.0 (the "License"); you may not use this file except in compliance
+ *  with the License.  You may obtain a copy of the License at
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+ *  implied.  See the License for the specific language governing
+ *  permissions and limitations under the License.
+ */
+package org.hornetq.core.server.group;
+
+import org.hornetq.utils.TypedProperties;
+import org.hornetq.core.server.group.impl.Proposal;
+
+/** Handler interface for proposals (semantics inferred from the method names only - TODO confirm).
+ * @author <a href="mailto:andy.taylor at jboss.org">Andy Taylor</a>
+ */
+public interface ProposalHandler
+{
+   void handleSend(Proposal proposal, TypedProperties clientMessage);
+   // NOTE(review): no caller or implementation of either method is visible here - confirm intended use.
+   void handleReceive(Proposal proposal);
+}

Added: branches/hornetq_grouping/src/main/org/hornetq/core/server/group/impl/ArbitratorConfiguration.java
===================================================================
--- branches/hornetq_grouping/src/main/org/hornetq/core/server/group/impl/ArbitratorConfiguration.java	                        (rev 0)
+++ branches/hornetq_grouping/src/main/org/hornetq/core/server/group/impl/ArbitratorConfiguration.java	2009-09-11 14:22:36 UTC (rev 7955)
@@ -0,0 +1,80 @@
+/*
+ * Copyright 2009 Red Hat, Inc.
+ *  Red Hat licenses this file to you under the Apache License, version
+ *  2.0 (the "License"); you may not use this file except in compliance
+ *  with the License.  You may obtain a copy of the License at
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+ *  implied.  See the License for the specific language governing
+ *  permissions and limitations under the License.
+ */
+package org.hornetq.core.server.group.impl;
+
+import org.hornetq.utils.SimpleString;
+
+/**
+ * Immutable holder for an arbitrator's settings: its name, its {@link TYPE} and an address.
+ *
+ * @author <a href="mailto:andy.taylor at jboss.org">Andy Taylor</a>
+ */
+public class ArbitratorConfiguration
+{
+   private final SimpleString name;
+
+   private final TYPE type;
+
+   private final SimpleString address;
+
+   /**
+    * @param name the arbitrator name
+    * @param type whether the arbitrator is local or remote
+    * @param address the address configured for the arbitrator
+    */
+   public ArbitratorConfiguration(final SimpleString name, final TYPE type, final SimpleString address)
+   {
+      this.type = type;
+      this.name = name;
+      this.address = address;
+   }
+
+   /** @return the arbitrator name */
+   public SimpleString getName()
+   {
+      return name;
+   }
+
+   /** @return the arbitrator type */
+   public TYPE getType()
+   {
+      return type;
+   }
+
+   /** @return the configured address */
+   public SimpleString getAddress()
+   {
+      return address;
+   }
+
+   /** The kinds of arbitrator; each constant carries its own name as a string. */
+   public enum TYPE
+   {
+      LOCAL("LOCAL"),
+      REMOTE("REMOTE");
+
+      // final: the string form is fixed at construction and must never change
+      private final String type;
+
+      TYPE(final String type)
+      {
+         this.type = type;
+      }
+
+      /** @return the string form of this constant */
+      public String getType()
+      {
+         return type;
+      }
+   }
+}

Added: branches/hornetq_grouping/src/main/org/hornetq/core/server/group/impl/LocalArbitrator.java
===================================================================
--- branches/hornetq_grouping/src/main/org/hornetq/core/server/group/impl/LocalArbitrator.java	                        (rev 0)
+++ branches/hornetq_grouping/src/main/org/hornetq/core/server/group/impl/LocalArbitrator.java	2009-09-11 14:22:36 UTC (rev 7955)
@@ -0,0 +1,166 @@
+/*
+ * Copyright 2009 Red Hat, Inc.
+ *  Red Hat licenses this file to you under the Apache License, version
+ *  2.0 (the "License"); you may not use this file except in compliance
+ *  with the License.  You may obtain a copy of the License at
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+ *  implied.  See the License for the specific language governing
+ *  permissions and limitations under the License.
+ */
+package org.hornetq.core.server.group.impl;
+
+import org.hornetq.core.management.NotificationType;
+import org.hornetq.core.management.Notification;
+import org.hornetq.core.management.ManagementService;
+import org.hornetq.core.client.management.impl.ManagementHelper;
+import org.hornetq.core.postoffice.BindingType;
+import org.hornetq.core.logging.Logger;
+import org.hornetq.core.server.group.Arbitrator;
+import org.hornetq.utils.SimpleString;
+import org.hornetq.utils.TypedProperties;
+import org.hornetq.utils.ConcurrentHashSet;
+
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+
+/**
+ * An {@link Arbitrator} that decides proposals itself: it keeps a map from proposal type to
+ * the value chosen for that type and publishes responses as PROPOSAL_RESPONSE notifications.
+ *
+ * @author <a href="mailto:andy.taylor at jboss.org">Andy Taylor</a>
+ */
+public class LocalArbitrator implements Arbitrator
+{
+   private static final Logger log = Logger.getLogger(LocalArbitrator.class);
+
+   /** Delay, in milliseconds, before a re-proposed type may be re-proposed again. */
+   private static final long RE_PROPOSAL_WINDOW_MILLIS = 2000;
+
+   /** Proposal type -> value chosen for that type. */
+   private final ConcurrentHashMap<SimpleString, Object> map = new ConcurrentHashMap<SimpleString, Object>();
+
+   private final SimpleString name;
+
+   private final ManagementService managementService;
+
+   private final SimpleString address;
+
+   private final ScheduledExecutorService scheduledExecutor;
+
+   /** Proposal types that were re-proposed and whose delay has not elapsed yet. */
+   private final ConcurrentHashSet<SimpleString> reProposals = new ConcurrentHashSet<SimpleString>();
+
+   /**
+    * @param managementService service used to publish response notifications
+    * @param name the name of this arbitrator
+    * @param address the address put in the address header of every response notification
+    * @param scheduledExecutor executor used by {@link #rePropose(Proposal)}; it is
+    *        dereferenced there without a null check
+    */
+   public LocalArbitrator(final ManagementService managementService, final SimpleString name, final SimpleString address, final ScheduledExecutorService scheduledExecutor)
+   {
+      this.managementService = managementService;
+      this.name = name;
+      this.address = address;
+      this.scheduledExecutor = scheduledExecutor;
+   }
+
+   /** @return the name given at construction */
+   public SimpleString getName()
+   {
+      return name;
+   }
+
+   /**
+    * Decides a proposal. A proposal without a value is a lookup: the stored value for the type
+    * is returned, or null if there is none. Otherwise the proposed value is stored if the type
+    * has no value yet (accepted); if it already has one, the response carries that existing
+    * value as the alternative (denied).
+    *
+    * @param proposal the proposal type and, optionally, the proposed value
+    * @return the response, or null when a lookup finds nothing stored for the type
+    */
+   public Response propose(Proposal proposal) throws Exception
+   {
+      if (proposal.getProposal() == null)
+      {
+         Object original = map.get(proposal.getProposalType());
+         return original == null ? null : new Response(proposal.getProposalType(), original);
+      }
+      // use putIfAbsent's own return value so the alternative is exactly the entry that won
+      Object existing = map.putIfAbsent(proposal.getProposalType(), proposal.getProposal());
+      if (existing == null)
+      {
+         log.info("accepted proposal for " + proposal.getProposalType() + " with value " + proposal.getProposal());
+         return new Response(proposal.getProposalType(), proposal.getProposal());
+      }
+      log.info("denied proposal for " + proposal.getProposalType() + " with value " + proposal.getProposal());
+      return new Response(proposal.getProposalType(), proposal.getProposal(), existing);
+   }
+
+   /** No-op in this implementation. */
+   public void proposed(Response response) throws Exception
+   {
+   }
+
+   /**
+    * Publishes a response as a PROPOSAL_RESPONSE notification through the management service.
+    *
+    * @param response the response to publish
+    * @param distance value for the distance header of the notification
+    */
+   public void send(Response response, int distance) throws Exception
+   {
+      Object value = response.getAlternative() != null ? response.getAlternative() : response.getOriginal();
+      log.info("sending proposal response for " + response.getResponseType() + " with value " + value);
+      TypedProperties props = new TypedProperties();
+      props.putStringProperty(ManagementHelper.HDR_PROPOSAL_TYPE, response.getResponseType());
+      props.putStringProperty(ManagementHelper.HDR_PROPOSAL_VALUE, (SimpleString)response.getOriginal());
+      props.putStringProperty(ManagementHelper.HDR_PROPOSAL_ALT_VALUE, (SimpleString)response.getAlternative());
+      props.putIntProperty(ManagementHelper.HDR_BINDING_TYPE, BindingType.LOCAL_QUEUE_INDEX);
+      props.putStringProperty(ManagementHelper.HDR_ADDRESS, address);
+      props.putIntProperty(ManagementHelper.HDR_DISTANCE, distance);
+      Notification notification = new Notification(null, NotificationType.PROPOSAL_RESPONSE, props);
+      managementService.sendNotification(notification);
+   }
+
+   /** Handles a proposal received from elsewhere exactly like a local one; distance is unused. */
+   public Response receive(Proposal proposal, int distance) throws Exception
+   {
+      return propose(proposal);
+   }
+
+   /**
+    * Replaces the value stored for a proposal type, if any, and publishes the new choice. Until
+    * the scheduled task clears the marker for the type, further calls return the stored value.
+    *
+    * @param proposal the proposal type and the new value for it
+    * @return a response carrying the new value, or the stored one while the marker is set
+    */
+   public Response rePropose(final Proposal proposal) throws Exception
+   {
+      if (!reProposals.addIfAbsent(proposal.getProposalType()))
+      {
+         return new Response(proposal.getProposalType(), map.get(proposal.getProposalType()));
+      }
+      Response response = new Response(proposal.getProposalType(), proposal.getProposal());
+      // store the chosen value, not the Response wrapper: every reader of the map (propose and
+      // the branch above) expects the raw value, and send() casts it to SimpleString
+      map.replace(proposal.getProposalType(), response.getChosen());
+      send(response, 0);
+      // drop the marker after the delay so the type can be re-proposed again
+      scheduledExecutor.schedule(new Runnable()
+      {
+         public void run()
+         {
+            reProposals.remove(proposal.getProposalType());
+         }
+      }, RE_PROPOSAL_WINDOW_MILLIS, TimeUnit.MILLISECONDS);
+      return response;
+   }
+}
+

Added: branches/hornetq_grouping/src/main/org/hornetq/core/server/group/impl/Proposal.java
===================================================================
--- branches/hornetq_grouping/src/main/org/hornetq/core/server/group/impl/Proposal.java	                        (rev 0)
+++ branches/hornetq_grouping/src/main/org/hornetq/core/server/group/impl/Proposal.java	2009-09-11 14:22:36 UTC (rev 7955)
@@ -0,0 +1,52 @@
+/*
+ * Copyright 2009 Red Hat, Inc.
+ *  Red Hat licenses this file to you under the Apache License, version
+ *  2.0 (the "License"); you may not use this file except in compliance
+ *  with the License.  You may obtain a copy of the License at
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+ *  implied.  See the License for the specific language governing
+ *  permissions and limitations under the License.
+ */
+package org.hornetq.core.server.group.impl;
+
+import org.hornetq.utils.SimpleString;
+
+/**
+ * An immutable proposal: a proposal type together with the value proposed for it.
+ *
+ * @author <a href="mailto:andy.taylor at jboss.org">Andy Taylor</a>
+ */
+public class Proposal
+{
+   public static final String PROPOSAL_TYPE_HEADER = "_JBM_PROPOSAL_TYPE";
+   public static final String PROPOSAL_HEADER = "_JBM_PROPOSAL";
+
+   private final SimpleString proposalType;
+   private final Object proposal;
+
+   /**
+    * @param proposalType the type being proposed for
+    * @param proposal the proposed value; the arbitrators treat null as a lookup without a value
+    */
+   public Proposal(SimpleString proposalType, Object proposal)
+   {
+      this.proposalType = proposalType;
+      this.proposal = proposal;
+   }
+
+   /** @return the type being proposed for */
+   public SimpleString getProposalType()
+   {
+      return this.proposalType;
+   }
+
+   /** @return the proposed value, possibly null */
+   public Object getProposal()
+   {
+      return this.proposal;
+   }
+}
+

Added: branches/hornetq_grouping/src/main/org/hornetq/core/server/group/impl/RemoteArbitrator.java
===================================================================
--- branches/hornetq_grouping/src/main/org/hornetq/core/server/group/impl/RemoteArbitrator.java	                        (rev 0)
+++ branches/hornetq_grouping/src/main/org/hornetq/core/server/group/impl/RemoteArbitrator.java	2009-09-11 14:22:36 UTC (rev 7955)
@@ -0,0 +1,169 @@
+/*
+ * Copyright 2009 Red Hat, Inc.
+ *  Red Hat licenses this file to you under the Apache License, version
+ *  2.0 (the "License"); you may not use this file except in compliance
+ *  with the License.  You may obtain a copy of the License at
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+ *  implied.  See the License for the specific language governing
+ *  permissions and limitations under the License.
+ */
+package org.hornetq.core.server.group.impl;
+
+import org.hornetq.core.management.NotificationType;
+import org.hornetq.core.management.Notification;
+import org.hornetq.core.management.ManagementService;
+import org.hornetq.core.client.management.impl.ManagementHelper;
+import org.hornetq.core.postoffice.BindingType;
+import org.hornetq.core.logging.Logger;
+import org.hornetq.core.server.group.Arbitrator;
+import org.hornetq.utils.SimpleString;
+import org.hornetq.utils.TypedProperties;
+
+import java.util.Map;
+import java.util.HashMap;
+import java.util.concurrent.locks.Lock;
+import java.util.concurrent.locks.ReentrantLock;
+import java.util.concurrent.locks.Condition;
+import java.util.concurrent.TimeUnit;
+
+/**
+ * An {@link Arbitrator} that decides nothing itself: proposals are published as PROPOSAL
+ * notifications and the calling thread waits for a response to be delivered through
+ * {@link #proposed(Response)}. Responses are cached per proposal type.
+ *
+ * @author <a href="mailto:andy.taylor at jboss.org">Andy Taylor</a>
+ */
+public class RemoteArbitrator implements Arbitrator
+{
+   private static final Logger log = Logger.getLogger(RemoteArbitrator.class);
+
+   private final SimpleString name;
+
+   private final ManagementService managementService;
+
+   private final SimpleString address;
+
+   /** Proposal type -> last response received for it; only accessed while holding the lock. */
+   private final Map<SimpleString, Response> responses = new HashMap<SimpleString, Response>();
+
+   private final Lock lock = new ReentrantLock();
+
+   /** Signalled whenever a response is stored in the responses map. */
+   private final Condition sendCondition = lock.newCondition();
+
+   /** Maximum time, in milliseconds, that {@link #propose(Proposal)} waits for a response. */
+   private final int waitTime = 1000;
+
+   /**
+    * @param managementService service used to publish proposal notifications
+    * @param name the name of this arbitrator
+    * @param address the address put in the address header of every proposal notification
+    */
+   public RemoteArbitrator(final ManagementService managementService, final SimpleString name, final SimpleString address)
+   {
+      this.name = name;
+      this.address = address;
+      this.managementService = managementService;
+   }
+
+   /** @return the name given at construction */
+   public SimpleString getName()
+   {
+      return name;
+   }
+
+   /**
+    * Returns the cached response for the proposal's type if there is one. Otherwise, if the
+    * proposal carries a value, publishes it as a PROPOSAL notification with distance 0 and
+    * waits up to {@code waitTime} milliseconds for a response for that type to arrive.
+    *
+    * @param proposal the proposal type and, optionally, the proposed value
+    * @return the response for the type, or null if none is cached and either the proposal
+    *         has no value or no response arrived in time
+    */
+   public Response propose(final Proposal proposal) throws Exception
+   {
+      // acquire outside the try block so the finally never unlocks a lock that was not taken
+      lock.lock();
+      try
+      {
+         Response response = responses.get(proposal.getProposalType());
+         if (response != null || proposal.getProposal() == null)
+         {
+            return response;
+         }
+         log.info("sending proposal for " + proposal.getProposalType() + " with value " + proposal.getProposal());
+         sendProposal(proposal, 0);
+         // wait in a loop: a wake-up may be spurious or caused by a response for another type
+         long remaining = TimeUnit.MILLISECONDS.toNanos(waitTime);
+         while (response == null && remaining > 0)
+         {
+            remaining = sendCondition.awaitNanos(remaining);
+            response = responses.get(proposal.getProposalType());
+         }
+         return response;
+      }
+      finally
+      {
+         lock.unlock();
+      }
+   }
+
+   /**
+    * Caches a response under its type and wakes up threads waiting in {@link #propose(Proposal)}.
+    */
+   public void proposed(Response response) throws Exception
+   {
+      Object value = response.getAlternative() != null ? response.getAlternative() : response.getOriginal();
+      log.info("received proposal response for " + response.getResponseType() + " with value " + value);
+      lock.lock();
+      try
+      {
+         responses.put(response.getResponseType(), response);
+         // signalAll: waiters may be waiting for different types and each re-checks its own
+         sendCondition.signalAll();
+      }
+      finally
+      {
+         lock.unlock();
+      }
+   }
+
+   /**
+    * Forwards a proposal received from elsewhere as a PROPOSAL notification with the given distance.
+    *
+    * @return always null
+    */
+   public Response receive(Proposal proposal, int distance) throws Exception
+   {
+      sendProposal(proposal, distance);
+      return null;
+   }
+
+   /** No-op in this implementation. */
+   public void send(Response response, int distance) throws Exception
+   {
+   }
+
+   /** Does nothing in this implementation: always returns null. */
+   public Response rePropose(Proposal proposal)
+   {
+      return null;
+   }
+
+   /** Publishes the proposal as a PROPOSAL notification carrying the given distance. */
+   private void sendProposal(final Proposal proposal, final int distance) throws Exception
+   {
+      TypedProperties props = new TypedProperties();
+      props.putStringProperty(ManagementHelper.HDR_PROPOSAL_TYPE, proposal.getProposalType());
+      props.putStringProperty(ManagementHelper.HDR_PROPOSAL_VALUE, (SimpleString)proposal.getProposal());
+      props.putIntProperty(ManagementHelper.HDR_BINDING_TYPE, BindingType.LOCAL_QUEUE_INDEX);
+      props.putStringProperty(ManagementHelper.HDR_ADDRESS, address);
+      props.putIntProperty(ManagementHelper.HDR_DISTANCE, distance);
+      managementService.sendNotification(new Notification(null, NotificationType.PROPOSAL, props));
+   }
+}
+

Added: branches/hornetq_grouping/src/main/org/hornetq/core/server/group/impl/Response.java
===================================================================
--- branches/hornetq_grouping/src/main/org/hornetq/core/server/group/impl/Response.java	                        (rev 0)
+++ branches/hornetq_grouping/src/main/org/hornetq/core/server/group/impl/Response.java	2009-09-11 14:22:36 UTC (rev 7955)
@@ -0,0 +1,87 @@
+/*
+ * Copyright 2009 Red Hat, Inc.
+ *  Red Hat licenses this file to you under the Apache License, version
+ *  2.0 (the "License"); you may not use this file except in compliance
+ *  with the License.  You may obtain a copy of the License at
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+ *  implied.  See the License for the specific language governing
+ *  permissions and limitations under the License.
+ */
+package org.hornetq.core.server.group.impl;
+
+import org.hornetq.utils.SimpleString;
+
+/**
+ * The immutable outcome of a proposal: the originally proposed value plus, when the proposal
+ * was not accepted, the alternative value to use instead.
+ *
+ * @author <a href="mailto:andy.taylor at jboss.org">Andy Taylor</a>
+ */
+public class Response
+{
+   private final boolean accepted;
+
+   private final Object original;
+
+   private final Object alternative;
+
+   private final SimpleString responseType;
+
+   /** Creates an accepted response, i.e. one without an alternative. */
+   public Response(SimpleString responseType, Object original)
+   {
+      this(responseType, original, null);
+   }
+
+   /**
+    * @param responseType the proposal type this response is for
+    * @param original the value that was proposed
+    * @param alternative the value to use instead, or null if the proposal was accepted
+    */
+   public Response(SimpleString responseType, Object original, Object alternative)
+   {
+      this.responseType = responseType;
+      this.accepted = alternative == null;
+      this.original = original;
+      this.alternative = alternative;
+   }
+
+   /** @return true if there is no alternative */
+   public boolean isAccepted()
+   {
+      return accepted;
+   }
+
+   /** @return the value that was proposed */
+   public Object getOriginal()
+   {
+      return original;
+   }
+
+   /** @return the alternative value, or null if the proposal was accepted */
+   public Object getAlternative()
+   {
+      return alternative;
+   }
+
+   /** @return the alternative if there is one, otherwise the original */
+   public Object getChosen()
+   {
+      return alternative != null ? alternative : original;
+   }
+
+   @Override
+   public String toString()
+   {
+      return "accepted = " + accepted + " original = " + original + " alternative = " + alternative;
+   }
+
+   /** @return the proposal type this response is for */
+   public SimpleString getResponseType()
+   {
+      return responseType;
+   }
+}

Modified: branches/hornetq_grouping/tests/hornetq-tests.iml
===================================================================
--- branches/hornetq_grouping/tests/hornetq-tests.iml	2009-09-11 10:56:25 UTC (rev 7954)
+++ branches/hornetq_grouping/tests/hornetq-tests.iml	2009-09-11 14:22:36 UTC (rev 7955)
@@ -1,6 +1,6 @@
 <?xml version="1.0" encoding="UTF-8"?>
 <module relativePaths="true" type="JAVA_MODULE" version="4">
-  <component name="NewModuleRootManager" inherit-compiler-output="false">
+  <component name="NewModuleRootManager" inherit-compiler-output="true">
     <output url="file://$MODULE_DIR$/output/classes" />
     <output-test url="file://$MODULE_DIR$/output" />
     <exclude-output />

Modified: branches/hornetq_grouping/tests/jms-tests/hornetq-jms-tests.iml
===================================================================
--- branches/hornetq_grouping/tests/jms-tests/hornetq-jms-tests.iml	2009-09-11 10:56:25 UTC (rev 7954)
+++ branches/hornetq_grouping/tests/jms-tests/hornetq-jms-tests.iml	2009-09-11 14:22:36 UTC (rev 7955)
@@ -1,6 +1,6 @@
 <?xml version="1.0" encoding="UTF-8"?>
 <module relativePaths="true" type="JAVA_MODULE" version="4">
-  <component name="NewModuleRootManager" inherit-compiler-output="false">
+  <component name="NewModuleRootManager" inherit-compiler-output="true">
     <output url="file://$MODULE_DIR$/output/classes" />
     <output-test url="file://$MODULE_DIR$/output" />
     <exclude-output />

Modified: branches/hornetq_grouping/tests/joram-tests/hornetq-joram-tests.iml
===================================================================
--- branches/hornetq_grouping/tests/joram-tests/hornetq-joram-tests.iml	2009-09-11 10:56:25 UTC (rev 7954)
+++ branches/hornetq_grouping/tests/joram-tests/hornetq-joram-tests.iml	2009-09-11 14:22:36 UTC (rev 7955)
@@ -1,6 +1,6 @@
 <?xml version="1.0" encoding="UTF-8"?>
 <module relativePaths="true" type="JAVA_MODULE" version="4">
-  <component name="NewModuleRootManager" inherit-compiler-output="false">
+  <component name="NewModuleRootManager" inherit-compiler-output="true">
     <output url="file://$MODULE_DIR$/output/classes" />
     <output-test url="file://$MODULE_DIR$/output" />
     <exclude-output />
@@ -14,6 +14,7 @@
     <orderEntry type="module" module-name="hornetq-tests" />
     <orderEntry type="library" name="messaging_jars" level="project" />
     <orderEntry type="library" name="messaging-joram-tests" level="project" />
+    <orderEntry type="module" module-name="hornetq-jms-tests" />
   </component>
 </module>
 

Modified: branches/hornetq_grouping/tests/src/org/hornetq/tests/integration/cluster/distribution/ClusterTestBase.java
===================================================================
--- branches/hornetq_grouping/tests/src/org/hornetq/tests/integration/cluster/distribution/ClusterTestBase.java	2009-09-11 10:56:25 UTC (rev 7954)
+++ branches/hornetq_grouping/tests/src/org/hornetq/tests/integration/cluster/distribution/ClusterTestBase.java	2009-09-11 14:22:36 UTC (rev 7955)
@@ -45,6 +45,10 @@
 import org.hornetq.core.server.HornetQ;
 import org.hornetq.core.server.HornetQServer;
 import org.hornetq.core.server.JournalType;
+import org.hornetq.core.server.group.impl.ArbitratorConfiguration;
+import org.hornetq.core.server.group.impl.LocalArbitrator;
+import org.hornetq.core.server.group.impl.RemoteArbitrator;
+import org.hornetq.core.server.group.Arbitrator;
 import org.hornetq.core.server.cluster.ClusterConnection;
 import org.hornetq.core.server.cluster.RemoteQueueBinding;
 import org.hornetq.integration.transports.netty.TransportConstants;
@@ -421,6 +425,69 @@
       session.close();
    }
 
+   /** Sends numMessages messages, numbered from 0, each carrying the given string property. */
+   protected void sendWithProperty(int node, String address, int numMessages, boolean durable, SimpleString key, SimpleString val) throws Exception
+   {
+      sendInRange(node, address, 0, numMessages, durable, key, val);
+   }
+
+   /**
+    * Sends messages numbered msgStart (inclusive) to msgEnd (exclusive) to the address through
+    * the session factory of the given node. Every message carries the key/val string property
+    * and its number in COUNT_PROP.
+    *
+    * @throws IllegalArgumentException if there is no session factory for the node
+    */
+   protected void sendInRange(int node, String address, int msgStart, int msgEnd, boolean durable, SimpleString key, SimpleString val) throws Exception
+   {
+      ClientSessionFactory sf = this.sfs[node];
+
+      if (sf == null)
+      {
+         throw new IllegalArgumentException("No sf at " + node);
+      }
+
+      ClientSession session = sf.createSession(false, true, true);
+
+      try
+      {
+         ClientProducer producer = session.createProducer(address);
+
+         for (int i = msgStart; i < msgEnd; i++)
+         {
+            ClientMessage message = session.createClientMessage(durable);
+
+            message.putStringProperty(key, val);
+            message.putIntProperty(COUNT_PROP, i);
+            producer.send(message);
+         }
+      }
+      finally
+      {
+         // close the session even when a send fails so the test does not leak it
+         session.close();
+      }
+   }
+
+   /**
+    * Creates a local or remote arbitrator named "grouparbitrator" for address "queues" and
+    * adds it to the post office of the given node.
+    */
+   protected void setUpGroupArbitrator(ArbitratorConfiguration.TYPE type,  int node)
+   {
+      Arbitrator arbitrator;
+      if (type == ArbitratorConfiguration.TYPE.LOCAL)
+      {
+         // NOTE(review): no scheduled executor is supplied, so LocalArbitrator.rePropose cannot be used
+         arbitrator = new LocalArbitrator(servers[node].getManagementService(), new SimpleString("grouparbitrator"), new SimpleString("queues"), null);
+      }
+      else
+      {
+         arbitrator = new RemoteArbitrator(servers[node].getManagementService(), new SimpleString("grouparbitrator"), new SimpleString("queues"));
+      }
+      this.servers[node].getPostOffice().addArbitrator(arbitrator);
+   }
+
    protected void send(int node, String address, int numMessages, boolean durable, String filterVal) throws Exception
    {
       sendInRange(node, address, 0, numMessages, durable, filterVal);

Added: branches/hornetq_grouping/tests/src/org/hornetq/tests/integration/cluster/distribution/ClusteredGroupingTest.java
===================================================================
--- branches/hornetq_grouping/tests/src/org/hornetq/tests/integration/cluster/distribution/ClusteredGroupingTest.java	                        (rev 0)
+++ branches/hornetq_grouping/tests/src/org/hornetq/tests/integration/cluster/distribution/ClusteredGroupingTest.java	2009-09-11 14:22:36 UTC (rev 7955)
@@ -0,0 +1,529 @@
+/*
+ * Copyright 2009 Red Hat, Inc.
+ *  Red Hat licenses this file to you under the Apache License, version
+ *  2.0 (the "License"); you may not use this file except in compliance
+ *  with the License.  You may obtain a copy of the License at
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+ *  implied.  See the License for the specific language governing
+ *  permissions and limitations under the License.
+ */
+package org.hornetq.tests.integration.cluster.distribution;
+
+import org.hornetq.core.message.impl.MessageImpl;
+import org.hornetq.core.server.group.impl.ArbitratorConfiguration;
+import org.hornetq.utils.SimpleString;
+
+/**
+ * Integration tests for message grouping in a three node cluster.
+ * <p>
+ * Each test starts nodes 0, 1 and 2 with one cluster connection per node, registers a
+ * LOCAL group arbitrator on node 0 and REMOTE arbitrators on nodes 1 and 2, and creates
+ * "queue0" on address "queues.testaddress" on every node. Messages are then sent with the
+ * group id header set to "id1" and the test checks which consumer receives them.
+ *
+ * @author <a href="mailto:andy.taylor at jboss.org">Andy Taylor</a>
+ */
+public class ClusteredGroupingTest extends ClusterTestBase
+{
+   /** Address every test queue is bound to. */
+   private static final String TEST_ADDRESS = "queues.testaddress";
+
+   /** Name of the queue created on every node. */
+   private static final String TEST_QUEUE = "queue0";
+
+   /** Separator printed when a test body completes without failure. */
+   private static final String BANNER = "*****************************************************************************";
+
+   /** Sends ten grouped messages from node 0 and expects consumer 0 to receive all of them. */
+   public void testGroupingSimple() throws Exception
+   {
+      startThreeNodeCluster();
+
+      try
+      {
+         prepareGroupingOnAllNodes();
+
+         addQueueConsumers(0, 1, 2);
+
+         awaitLocalBindings(1, 1, 1);
+
+         // The wait for remote bindings (2 bindings, 2 consumers per node) was commented out
+         // in the original test and is left disabled here - TODO confirm whether it is needed.
+
+         sendWithProperty(0, TEST_ADDRESS, 10, false, MessageImpl.HDR_GROUP_ID, new SimpleString("id1"));
+
+         verifyReceiveAll(10, 0);
+
+         System.out.println(BANNER);
+      }
+      finally
+      {
+         shutDownCluster();
+      }
+   }
+
+   /** Sends the same group from nodes 0 and 1 and expects consumer 0 to receive both batches. */
+   public void testGroupingSendTo2queues() throws Exception
+   {
+      startThreeNodeCluster();
+
+      try
+      {
+         prepareGroupingOnAllNodes();
+
+         addQueueConsumers(0, 1, 2);
+
+         awaitLocalBindings(1, 1, 1);
+
+         // The wait for remote bindings (2 bindings, 2 consumers per node) was commented out
+         // in the original test and is left disabled here - TODO confirm whether it is needed.
+
+         sendGroupedInRange(0, 0, 10);
+         verifyReceiveAllInRange(0, 10, 0);
+
+         sendGroupedInRange(1, 10, 20);
+         verifyReceiveAllInRange(10, 20, 0);
+
+         System.out.println(BANNER);
+      }
+      finally
+      {
+         shutDownCluster();
+      }
+   }
+
+   /** Sends the same group from nodes 0, 1 and 2 and expects consumer 0 to receive every batch. */
+   public void testGroupingSendTo3queues() throws Exception
+   {
+      startThreeNodeCluster();
+
+      try
+      {
+         prepareGroupingOnAllNodes();
+
+         addQueueConsumers(0, 1, 2);
+
+         awaitLocalBindings(1, 1, 1);
+         awaitRemoteBindings(2, 2, 2);
+
+         sendGroupedInRange(0, 0, 10);
+         verifyReceiveAllInRange(0, 10, 0);
+
+         sendGroupedInRange(1, 10, 20);
+         verifyReceiveAllInRange(10, 20, 0);
+
+         // Third batch uses its own range (the original repeated 10..20 here), matching
+         // the 0..10 / 10..20 / 20..30 pattern of the other three-sender tests.
+         sendGroupedInRange(2, 20, 30);
+         verifyReceiveAllInRange(20, 30, 0);
+
+         System.out.println(BANNER);
+      }
+      finally
+      {
+         shutDownCluster();
+      }
+   }
+
+   /** Sends the first batch from node 1 and expects consumer 1 to receive every later batch too. */
+   public void testGroupingSendTo3queuesRemoteArbitrator() throws Exception
+   {
+      startThreeNodeCluster();
+
+      try
+      {
+         prepareGroupingOnAllNodes();
+
+         addQueueConsumers(0, 1, 2);
+
+         awaitLocalBindings(1, 1, 1);
+         awaitRemoteBindings(2, 2, 2);
+
+         sendGroupedInRange(1, 0, 10);
+         verifyReceiveAllInRange(0, 10, 1);
+
+         sendGroupedInRange(2, 10, 20);
+         verifyReceiveAllInRange(10, 20, 1);
+
+         sendGroupedInRange(0, 20, 30);
+         verifyReceiveAllInRange(20, 30, 1);
+
+         System.out.println(BANNER);
+      }
+      finally
+      {
+         shutDownCluster();
+      }
+   }
+
+   /** Leaves node 1 without a consumer, sends first from node 1 and expects consumer 0 to receive everything. */
+   public void testGroupingSendTo3queuesNoConsumerOnLocalQueue() throws Exception
+   {
+      startThreeNodeCluster();
+
+      try
+      {
+         prepareGroupingOnAllNodes();
+
+         // No consumer on node 1: that is the scenario under test.
+         addQueueConsumers(0, 2);
+
+         awaitLocalBindings(1, 0, 1);
+         awaitRemoteBindings(1, 2, 1);
+
+         sendGroupedInRange(1, 0, 10);
+         verifyReceiveAllInRange(0, 10, 0);
+
+         sendGroupedInRange(2, 10, 20);
+         verifyReceiveAllInRange(10, 20, 0);
+
+         sendGroupedInRange(0, 20, 30);
+         verifyReceiveAllInRange(20, 30, 0);
+
+         System.out.println(BANNER);
+      }
+      finally
+      {
+         shutDownCluster();
+      }
+   }
+
+   /** Sends all batches before any consumer exists, then expects consumer 1 to receive all thirty messages. */
+   public void testGroupingSendTo3queuesNoConsumersDeliveredToLocalQueue() throws Exception
+   {
+      startThreeNodeCluster();
+
+      try
+      {
+         prepareGroupingOnAllNodes();
+
+         awaitLocalBindings(0, 0, 0);
+         awaitRemoteBindings(0, 0, 0);
+
+         sendGroupedInRange(1, 0, 10);
+
+         sendGroupedInRange(2, 10, 20);
+
+         sendGroupedInRange(0, 20, 30);
+
+         // Consumers are attached only after everything has been sent.
+         addQueueConsumers(0, 1, 2);
+
+         verifyReceiveAllInRange(0, 30, 1);
+
+         System.out.println(BANNER);
+      }
+      finally
+      {
+         shutDownCluster();
+      }
+   }
+
+   /** Removes consumer 0 and deletes queue0 on node 0 after the first batch, then keeps sending the same group. */
+   public void testGroupingSendTo3queuesQueueRemoved() throws Exception
+   {
+      startThreeNodeCluster();
+
+      try
+      {
+         prepareGroupingOnAllNodes();
+
+         addQueueConsumers(0, 1, 2);
+
+         awaitLocalBindings(1, 1, 1);
+         awaitRemoteBindings(2, 2, 2);
+
+         sendGroupedInRange(0, 0, 10);
+         verifyReceiveAllInRange(0, 10, 0);
+
+         removeConsumer(0);
+         deleteQueue(0, TEST_QUEUE);
+
+         // NOTE(review): the following verifications still name consumer 0, which was removed
+         // just above together with its queue. Kept as in the original - confirm which
+         // consumer is expected to receive the group after the queue is gone.
+         sendGroupedInRange(1, 10, 20);
+         verifyReceiveAllInRange(10, 20, 0);
+
+         // Third batch uses its own range (the original repeated 10..20 here).
+         sendGroupedInRange(2, 20, 30);
+         verifyReceiveAllInRange(20, 30, 0);
+
+         System.out.println(BANNER);
+      }
+      finally
+      {
+         shutDownCluster();
+      }
+   }
+
+   // TODO: this test was committed commented out; it is kept disabled and unchanged in intent.
+   /*public void testGroupingSendTo3queuesPinnedNodeGoesDown() throws Exception
+   {
+      startThreeNodeCluster();
+
+      try
+      {
+         prepareGroupingOnAllNodes();
+
+         addQueueConsumers(0, 1, 2);
+
+         awaitLocalBindings(1, 1, 1);
+         awaitRemoteBindings(2, 2, 2);
+
+         sendGroupedInRange(1, 0, 10);
+         verifyReceiveAllInRange(0, 10, 1);
+
+         stopClusterConnections(1);
+
+         stopServers(1);
+
+         Thread.sleep(5000);
+
+         sendGroupedInRange(2, 10, 20);
+         verifyReceiveAllInRange(10, 20, 1);
+
+         sendGroupedInRange(0, 20, 30);
+         verifyReceiveAllInRange(20, 30, 1);
+
+         System.out.println(BANNER);
+      }
+      finally
+      {
+         //closeAllConsumers();
+
+         closeAllSessionFactories();
+
+         stopServers(0, 1, 2);
+      }
+   }*/
+
+   /** @return {@code true}; passed to the server, cluster connection and session factory setup calls */
+   public boolean isNetty()
+   {
+      return true;
+   }
+
+   /** @return {@code false}; passed to the server setup calls */
+   public boolean isFileStorage()
+   {
+      return false;
+   }
+
+   /** Configures servers 0-2, one cluster connection per node pointing at the other two, and starts them. */
+   private void startThreeNodeCluster() throws Exception
+   {
+      for (int node = 0; node < 3; node++)
+      {
+         setupServer(node, isFileStorage(), isNetty());
+      }
+
+      setupClusterConnection("cluster0", "queues", false, 1, isNetty(), 0, 1, 2);
+
+      setupClusterConnection("cluster1", "queues", false, 1, isNetty(), 1, 0, 2);
+
+      setupClusterConnection("cluster2", "queues", false, 1, isNetty(), 2, 0, 1);
+
+      startServers(0, 1, 2);
+   }
+
+   /** Registers the arbitrators (LOCAL on node 0, REMOTE on 1 and 2), then session factories and queue0 on every node. */
+   private void prepareGroupingOnAllNodes() throws Exception
+   {
+      setUpGroupArbitrator(ArbitratorConfiguration.TYPE.LOCAL, 0);
+      setUpGroupArbitrator(ArbitratorConfiguration.TYPE.REMOTE, 1);
+      setUpGroupArbitrator(ArbitratorConfiguration.TYPE.REMOTE, 2);
+
+      for (int node = 0; node < 3; node++)
+      {
+         setupSessionFactory(node, isNetty());
+      }
+
+      for (int node = 0; node < 3; node++)
+      {
+         createQueue(node, TEST_ADDRESS, TEST_QUEUE, null, false);
+      }
+   }
+
+   /** Adds, for each given node, a consumer on queue0 whose consumer id equals the node index. */
+   private void addQueueConsumers(int... nodes) throws Exception
+   {
+      for (int node : nodes)
+      {
+         addConsumer(node, node, TEST_QUEUE, null);
+      }
+   }
+
+   /** Waits on nodes 0, 1, 2 for one local binding with the given consumer count per node. */
+   private void awaitLocalBindings(int consumersOn0, int consumersOn1, int consumersOn2) throws Exception
+   {
+      waitForBindings(0, TEST_ADDRESS, 1, consumersOn0, true);
+      waitForBindings(1, TEST_ADDRESS, 1, consumersOn1, true);
+      waitForBindings(2, TEST_ADDRESS, 1, consumersOn2, true);
+   }
+
+   /** Waits on nodes 0, 1, 2 for two remote bindings with the given consumer count per node. */
+   private void awaitRemoteBindings(int consumersOn0, int consumersOn1, int consumersOn2) throws Exception
+   {
+      waitForBindings(0, TEST_ADDRESS, 2, consumersOn0, false);
+      waitForBindings(1, TEST_ADDRESS, 2, consumersOn1, false);
+      waitForBindings(2, TEST_ADDRESS, 2, consumersOn2, false);
+   }
+
+   /** Sends non-durable messages [msgStart, msgEnd) from the given node with group id "id1". */
+   private void sendGroupedInRange(int node, int msgStart, int msgEnd) throws Exception
+   {
+      sendInRange(node, TEST_ADDRESS, msgStart, msgEnd, false, MessageImpl.HDR_GROUP_ID, new SimpleString("id1"));
+   }
+
+   /** Closes all consumers and session factories, then stops servers 0-2. */
+   private void shutDownCluster() throws Exception
+   {
+      closeAllConsumers();
+
+      closeAllSessionFactories();
+
+      stopServers(0, 1, 2);
+   }
+}
+

Modified: branches/hornetq_grouping/tests/src/org/hornetq/tests/unit/core/postoffice/impl/BindingImplTest.java
===================================================================
--- branches/hornetq_grouping/tests/src/org/hornetq/tests/unit/core/postoffice/impl/BindingImplTest.java	2009-09-11 10:56:25 UTC (rev 7954)
+++ branches/hornetq_grouping/tests/src/org/hornetq/tests/unit/core/postoffice/impl/BindingImplTest.java	2009-09-11 14:22:36 UTC (rev 7955)
@@ -39,6 +39,7 @@
 import org.hornetq.core.transaction.Transaction;
 import org.hornetq.core.transaction.TransactionOperation;
 import org.hornetq.tests.util.UnitTestCase;
+import org.hornetq.tests.unit.core.server.impl.fakes.FakePostOffice;
 import org.hornetq.utils.SimpleString;
 import org.hornetq.utils.TypedProperties;
 
@@ -86,7 +87,7 @@
    {
       final FakeBinding fake = new FakeBinding(new SimpleString("a"));
 
-      final BindingsImpl bind = new BindingsImpl();
+      final BindingsImpl bind = new BindingsImpl(new FakePostOffice());
       bind.addBinding(fake);
       bind.addBinding(new FakeBinding(new SimpleString("a")));
       bind.addBinding(new FakeBinding(new SimpleString("a")));

Modified: branches/hornetq_grouping/tests/src/org/hornetq/tests/unit/core/server/impl/fakes/FakePostOffice.java
===================================================================
--- branches/hornetq_grouping/tests/src/org/hornetq/tests/unit/core/server/impl/fakes/FakePostOffice.java	2009-09-11 10:56:25 UTC (rev 7954)
+++ branches/hornetq_grouping/tests/src/org/hornetq/tests/unit/core/server/impl/fakes/FakePostOffice.java	2009-09-11 14:22:36 UTC (rev 7955)
@@ -23,6 +23,7 @@
 import org.hornetq.core.postoffice.PostOffice;
 import org.hornetq.core.server.Queue;
 import org.hornetq.core.server.ServerMessage;
+import org.hornetq.core.server.group.Arbitrator;
 import org.hornetq.core.transaction.Transaction;
 import org.hornetq.utils.SimpleString;
 
@@ -150,4 +151,12 @@
 
    }
 
+   // Test stub: accepts the arbitrator and discards it; nothing is stored.
+   public void addArbitrator(Arbitrator arbitrator)
+   {
+      // intentionally empty
+   }
+
+   // Test stub: always returns null, regardless of any earlier addArbitrator call.
+   public Arbitrator getArbitrator()
+   {
+      return null;
+   }
 }
\ No newline at end of file



More information about the hornetq-commits mailing list