[hornetq-commits] JBoss hornetq SVN: r7987 - in branches/hornetq_grouping: examples/jms/clustered-grouping and 18 other directories.

do-not-reply at jboss.org do-not-reply at jboss.org
Thu Sep 24 05:58:08 EDT 2009


Author: ataylor
Date: 2009-09-24 05:58:07 -0400 (Thu, 24 Sep 2009)
New Revision: 7987

Added:
   branches/hornetq_grouping/examples/jms/clustered-grouping/
   branches/hornetq_grouping/examples/jms/clustered-grouping/build.xml
   branches/hornetq_grouping/examples/jms/clustered-grouping/readme.html
   branches/hornetq_grouping/examples/jms/clustered-grouping/server0/
   branches/hornetq_grouping/examples/jms/clustered-grouping/server0/client-jndi.properties
   branches/hornetq_grouping/examples/jms/clustered-grouping/server0/hornetq-beans.xml
   branches/hornetq_grouping/examples/jms/clustered-grouping/server0/hornetq-configuration.xml
   branches/hornetq_grouping/examples/jms/clustered-grouping/server0/hornetq-jms.xml
   branches/hornetq_grouping/examples/jms/clustered-grouping/server0/hornetq-users.xml
   branches/hornetq_grouping/examples/jms/clustered-grouping/server1/
   branches/hornetq_grouping/examples/jms/clustered-grouping/server1/client-jndi.properties
   branches/hornetq_grouping/examples/jms/clustered-grouping/server1/hornetq-beans.xml
   branches/hornetq_grouping/examples/jms/clustered-grouping/server1/hornetq-configuration.xml
   branches/hornetq_grouping/examples/jms/clustered-grouping/server1/hornetq-jms.xml
   branches/hornetq_grouping/examples/jms/clustered-grouping/server1/hornetq-users.xml
   branches/hornetq_grouping/examples/jms/clustered-grouping/server2/
   branches/hornetq_grouping/examples/jms/clustered-grouping/server2/client-jndi.properties
   branches/hornetq_grouping/examples/jms/clustered-grouping/server2/hornetq-beans.xml
   branches/hornetq_grouping/examples/jms/clustered-grouping/server2/hornetq-configuration.xml
   branches/hornetq_grouping/examples/jms/clustered-grouping/server2/hornetq-jms.xml
   branches/hornetq_grouping/examples/jms/clustered-grouping/server2/hornetq-users.xml
   branches/hornetq_grouping/examples/jms/clustered-grouping/src/
   branches/hornetq_grouping/examples/jms/clustered-grouping/src/org/
   branches/hornetq_grouping/examples/jms/clustered-grouping/src/org/hornetq/
   branches/hornetq_grouping/examples/jms/clustered-grouping/src/org/hornetq/jms/
   branches/hornetq_grouping/examples/jms/clustered-grouping/src/org/hornetq/jms/example/
   branches/hornetq_grouping/examples/jms/clustered-grouping/src/org/hornetq/jms/example/ClusteredGroupingExample.java
   branches/hornetq_grouping/src/main/org/hornetq/core/server/group/GroupingHandler.java
   branches/hornetq_grouping/src/main/org/hornetq/core/server/group/impl/GroupingHandlerConfiguration.java
   branches/hornetq_grouping/src/main/org/hornetq/core/server/group/impl/LocalGroupingHandler.java
   branches/hornetq_grouping/src/main/org/hornetq/core/server/group/impl/RemoteGroupingHandler.java
Removed:
   branches/hornetq_grouping/src/main/org/hornetq/core/server/group/Arbitrator.java
   branches/hornetq_grouping/src/main/org/hornetq/core/server/group/impl/ArbitratorConfiguration.java
   branches/hornetq_grouping/src/main/org/hornetq/core/server/group/impl/LocalArbitrator.java
   branches/hornetq_grouping/src/main/org/hornetq/core/server/group/impl/RemoteArbitrator.java
Modified:
   branches/hornetq_grouping/src/config/common/schema/hornetq-configuration.xsd
   branches/hornetq_grouping/src/main/org/hornetq/core/config/Configuration.java
   branches/hornetq_grouping/src/main/org/hornetq/core/config/impl/ConfigurationImpl.java
   branches/hornetq_grouping/src/main/org/hornetq/core/config/impl/FileConfiguration.java
   branches/hornetq_grouping/src/main/org/hornetq/core/postoffice/PostOffice.java
   branches/hornetq_grouping/src/main/org/hornetq/core/postoffice/impl/BindingsImpl.java
   branches/hornetq_grouping/src/main/org/hornetq/core/postoffice/impl/PostOfficeImpl.java
   branches/hornetq_grouping/src/main/org/hornetq/core/server/cluster/impl/ClusterConnectionImpl.java
   branches/hornetq_grouping/src/main/org/hornetq/core/server/cluster/impl/ClusterManagerImpl.java
   branches/hornetq_grouping/tests/src/org/hornetq/tests/integration/cluster/distribution/ClusterTestBase.java
   branches/hornetq_grouping/tests/src/org/hornetq/tests/integration/cluster/distribution/ClusteredGroupingTest.java
   branches/hornetq_grouping/tests/src/org/hornetq/tests/unit/core/server/impl/fakes/FakePostOffice.java
Log:
example and fixes

Copied: branches/hornetq_grouping/examples/jms/clustered-grouping/build.xml (from rev 7977, branches/hornetq_grouping/examples/jms/clustered-queue/build.xml)
===================================================================
--- branches/hornetq_grouping/examples/jms/clustered-grouping/build.xml	                        (rev 0)
+++ branches/hornetq_grouping/examples/jms/clustered-grouping/build.xml	2009-09-24 09:58:07 UTC (rev 7987)
@@ -0,0 +1,35 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!DOCTYPE project [
+      <!ENTITY libraries SYSTEM "../../../thirdparty/libraries.ent">
+      ]>
+<!--
+  ~ Copyright 2009 Red Hat, Inc.
+  ~ Red Hat licenses this file to you under the Apache License, version
+  ~ 2.0 (the "License"); you may not use this file except in compliance
+  ~ with the License.  You may obtain a copy of the License at
+  ~    http://www.apache.org/licenses/LICENSE-2.0
+  ~ Unless required by applicable law or agreed to in writing, software
+  ~ distributed under the License is distributed on an "AS IS" BASIS,
+  ~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+  ~ implied.  See the License for the specific language governing
+  ~ permissions and limitations under the License.
+  -->
+<project default="run" name="HornetQ JMS Clustered Grouping Example">
+
+   <import file="../../common/build.xml"/>
+
+   <target name="run">
+      <antcall target="runExample">
+         <param name="example.classname" value="org.hornetq.jms.example.ClusteredGroupingExample"/>
+         <param name="hornetq.example.beans.file" value="server0 server1 server2"/>
+      </antcall>
+   </target>
+
+   <target name="runRemote">
+      <antcall target="runExample">
+         <param name="example.classname" value="org.hornetq.jms.example.ClusteredGroupingExample"/>
+         <param name="hornetq.example.runServer" value="false"/>
+      </antcall>
+   </target>
+
+</project>

Copied: branches/hornetq_grouping/examples/jms/clustered-grouping/readme.html (from rev 7977, branches/hornetq_grouping/examples/jms/clustered-queue/readme.html)
===================================================================
--- branches/hornetq_grouping/examples/jms/clustered-grouping/readme.html	                        (rev 0)
+++ branches/hornetq_grouping/examples/jms/clustered-grouping/readme.html	2009-09-24 09:58:07 UTC (rev 7987)
@@ -0,0 +1,174 @@
+<html>
+  <head>
+    <title>HornetQ JMS Load Balanced Queue Example</title>
+    <link rel="stylesheet" type="text/css" href="../../common/common.css">
+  </head>
+  <body>
+     <h1>HornetQ JMS Load Balanced Clustered Queue Example</h1>
+     <br>
+     <p>This example demonstrates a JMS queue deployed on two different nodes. The two nodes are configured to form a cluster.</p>
+     <p>We then create a consumer on the queue on each node, and we create a producer on only one of the nodes.</p>
+     <p>We then send some messages via the producer, and we verify that <b>both</b> consumers receive the sent messages
+     in a round-robin fashion.</p>
+     <p>In other words, HornetQ <b>load balances</b> the sent messages across all consumers on the cluster</p>
+     <p>This example uses JNDI to lookup the JMS Queue and ConnectionFactory objects. If you prefer not to use
+     JNDI, these could be instantiated directly.</p>     
+     <p>Here's the relevant snippet from the server configuration, which tells the server to form a cluster between the two nodes
+     and to load balance the messages between the nodes.</p>     
+     <pre>
+     <code>&lt;cluster-connection name="my-cluster"&gt;
+        &lt;address&gt;jms&lt;/address&gt;
+        &lt;retry-interval&gt;500&lt;/retry-interval&gt;
+        &lt;use-duplicate-detection&gt;true&lt;/use-duplicate-detection&gt;
+        &lt;forward-when-no-consumers&gt;true&lt;/forward-when-no-consumers&gt;
+        &lt;max-hops&gt;1&lt;/max-hops&gt;
+        &lt;discovery-group-ref discovery-group-name="my-discovery-group"/&gt;
+     &lt;/cluster-connection&gt;
+     </code>
+     </pre>    
+     <p>For more information on HornetQ load balancing, and clustering in general, please see the clustering
+     section of the user manual.</p>      
+     <h2>Example step-by-step</h2>
+     <p><i>To run the example, simply type <code>ant</code> from this directory</i></p>
+     <br>
+     <ol>
+        <li> Get an initial context for looking up JNDI from server 0.</li>
+        <pre>
+           <code>
+   ic0 = getContext(0);
+   </code>
+        </pre>
+
+        <li>Look-up the JMS Queue object from JNDI</li>
+        <pre>
+           <code>Queue queue = (Queue)ic0.lookup("/queue/exampleQueue");</code>
+        </pre>
+
+        <li>Look-up a JMS Connection Factory object from JNDI on server 0</li>
+        <pre>
+           <code>ConnectionFactory cf0 = (ConnectionFactory)ic0.lookup("/ConnectionFactory");</code>
+        </pre>
+
+        <li>Get an initial context for looking up JNDI from server 1.</li>
+        <pre>
+           <code>ic1 = getContext(1);</code>
+        </pre>
+
+        <li>Look-up a JMS Connection Factory object from JNDI on server 1</li>
+        <pre>
+           <code>ConnectionFactory cf1 = (ConnectionFactory)ic1.lookup("/ConnectionFactory");
+           </code>
+        </pre>
+
+        <li>We create a JMS Connection connection0 which is a connection to server 0</li>
+        <pre>
+          <code>
+   connection0 = cf0.createConnection();
+          </code>
+        </pre>
+        
+        <li>We create a JMS Connection connection1 which is a connection to server 1</li>
+        <pre>
+          <code>
+   connection1 = cf1.createConnection();
+          </code>
+        </pre>
+
+        <li>We create a JMS Session on server 0</li>
+        <pre>
+           <code>
+   Session session0 = connection0.createSession(false, Session.AUTO_ACKNOWLEDGE);
+           </code>
+        </pre>
+        
+        <li>We create a JMS Session on server 1</li>
+        <pre>
+           <code>
+   Session session1 = connection1.createSession(false, Session.AUTO_ACKNOWLEDGE);
+            </code>
+        </pre>
+
+        <li>We start the connections to ensure delivery occurs on them</li>
+        <pre>
+           <code>
+   connection0.start();
+
+   connection1.start();
+           </code>
+        </pre>
+
+        <li>We create JMS MessageConsumer objects on server 0 and server 1</li>
+        <pre>
+           <code>
+   MessageConsumer consumer0 = session0.createConsumer(queue);
+
+   MessageConsumer consumer1 = session1.createConsumer(queue);
+           </code>
+        </pre>
+
+        <li>We create a JMS MessageProducer object on server 0.</li>
+        <pre>
+           <code>
+   MessageProducer producer = session0.createProducer(queue);</code>
+        </pre>
+
+        <li>We send some messages to server 0.</li>
+        <pre>
+           <code>
+	final int numMessages = 10;
+
+	for (int i = 0; i &lt; numMessages; i++)
+	{
+	   TextMessage message = session0.createTextMessage("This is text message " + i);
+	      
+	   producer.send(message);
+	
+	   System.out.println("Sent message: " + message.getText());
+	}
+           </code>
+        </pre>
+        
+        <li>We now consume those messages on *both* server 0 and server 1.
+         We note the messages have been distributed between servers in a round robin fashion.
+         HornetQ has <b>load balanced</b> the messages between the available consumers on the different nodes.
+         HornetQ can be configured to always load balance messages to all nodes, or to only balance messages
+         to nodes which have consumers with no or matching selectors. See the user manual for more details.
+         JMS Queues implement point-to-point messaging where each message is only ever consumed by a
+         maximum of one consumer.</li>
+        <pre>
+           <code>
+	for (int i = 0; i &lt; numMessages; i += 2)
+	{
+	   TextMessage message0 = (TextMessage)consumer0.receive(5000);
+	
+	   System.out.println("Got message: " + message0.getText() + " from node 0");
+	
+	   TextMessage message1 = (TextMessage)consumer1.receive(5000);
+	
+	   System.out.println("Got message: " + message1.getText() + " from node 1");
+	}
+           </code>
+        </pre> 
+
+        <li>And finally (no pun intended), <b>always</b> remember to close your JMS resources after use, in a <code>finally</code> block. Closing a JMS connection will automatically close all of its sessions, consumers, producers and browsers.</li>
+
+        <pre>
+           <code>
+	finally
+	{
+	   if (connection0 != null)
+	   {
+	      connection0.close();
+	   }
+	      
+	   if (connection1 != null)
+	   {
+	      connection1.close();
+	   }
+	}
+           </code>
+        </pre>
+
+     </ol>
+  </body>
+</html>
\ No newline at end of file

Added: branches/hornetq_grouping/examples/jms/clustered-grouping/server0/client-jndi.properties
===================================================================
--- branches/hornetq_grouping/examples/jms/clustered-grouping/server0/client-jndi.properties	                        (rev 0)
+++ branches/hornetq_grouping/examples/jms/clustered-grouping/server0/client-jndi.properties	2009-09-24 09:58:07 UTC (rev 7987)
@@ -0,0 +1,3 @@
+java.naming.factory.initial=org.jnp.interfaces.NamingContextFactory
+java.naming.provider.url=jnp://localhost:1099
+java.naming.factory.url.pkgs=org.jboss.naming:org.jnp.interfaces

Added: branches/hornetq_grouping/examples/jms/clustered-grouping/server0/hornetq-beans.xml
===================================================================
--- branches/hornetq_grouping/examples/jms/clustered-grouping/server0/hornetq-beans.xml	                        (rev 0)
+++ branches/hornetq_grouping/examples/jms/clustered-grouping/server0/hornetq-beans.xml	2009-09-24 09:58:07 UTC (rev 7987)
@@ -0,0 +1,60 @@
+<?xml version="1.0" encoding="UTF-8"?>
+
+<deployment xmlns="urn:jboss:bean-deployer:2.0">
+
+   <bean name="Naming" class="org.jnp.server.NamingBeanImpl"/>
+
+   <!-- JNDI server. Disable this if you don't want JNDI -->
+   <bean name="JNDIServer" class="org.jnp.server.Main">
+      <property name="namingInfo">
+         <inject bean="Naming"/>
+      </property>
+      <property name="port">1099</property>
+      <property name="bindAddress">localhost</property>
+      <property name="rmiPort">1098</property>
+      <property name="rmiBindAddress">localhost</property>
+   </bean>
+   
+   <!-- MBean server -->
+   <bean name="MBeanServer" class="javax.management.MBeanServer">
+      <constructor factoryClass="java.lang.management.ManagementFactory"
+                   factoryMethod="getPlatformMBeanServer"/>
+   </bean> 
+
+   <!-- The core configuration -->
+   <bean name="Configuration" class="org.hornetq.core.config.impl.FileConfiguration"/>
+
+	<!-- The security manager -->
+   <bean name="HornetQSecurityManager" class="org.hornetq.core.security.impl.HornetQSecurityManagerImpl">
+      <start ignored="true"/>
+      <stop ignored="true"/>
+   </bean>
+
+	<!-- The core server -->
+   <bean name="HornetQServer" class="org.hornetq.core.server.impl.HornetQServerImpl">
+      <constructor>
+         <parameter>
+            <inject bean="Configuration"/>
+         </parameter>
+         <parameter>
+            <inject bean="MBeanServer"/>
+         </parameter>
+         <parameter>
+            <inject bean="HornetQSecurityManager"/>
+         </parameter>        
+      </constructor>
+      <start ignored="true"/>
+      <stop ignored="true"/>
+   </bean>
+   
+   <!-- The JMS server -->
+   <bean name="JMSServerManager" class="org.hornetq.jms.server.impl.JMSServerManagerImpl">
+      <constructor>         
+         <parameter>
+            <inject bean="HornetQServer"/>
+         </parameter>
+      </constructor>
+   </bean>
+
+
+</deployment>

Added: branches/hornetq_grouping/examples/jms/clustered-grouping/server0/hornetq-configuration.xml
===================================================================
--- branches/hornetq_grouping/examples/jms/clustered-grouping/server0/hornetq-configuration.xml	                        (rev 0)
+++ branches/hornetq_grouping/examples/jms/clustered-grouping/server0/hornetq-configuration.xml	2009-09-24 09:58:07 UTC (rev 7987)
@@ -0,0 +1,71 @@
+<configuration xmlns="urn:hornetq"
+            xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+            xsi:schemaLocation="urn:hornetq /schema/hornetq-configuration.xsd">
+   <clustered>true</clustered>
+
+   <!-- Connectors -->
+
+   <connectors>
+      <connector name="netty-connector">
+         <factory-class>org.hornetq.integration.transports.netty.NettyConnectorFactory</factory-class>
+         <param key="hornetq.remoting.netty.port" value="5445" type="Integer"/>
+      </connector>
+   </connectors>
+   
+   <!-- Acceptors -->
+   <acceptors>
+      <acceptor name="netty-acceptor">
+         <factory-class>org.hornetq.integration.transports.netty.NettyAcceptorFactory</factory-class>
+         <param key="hornetq.remoting.netty.port" value="5445" type="Integer"/>
+      </acceptor>
+   </acceptors>
+   
+   <!-- Clustering configuration -->
+   <broadcast-groups>
+      <broadcast-group name="my-broadcast-group">
+         <group-address>231.7.7.7</group-address>
+         <group-port>9876</group-port>
+         <broadcast-period>100</broadcast-period>
+        <connector-ref connector-name="netty-connector"/>
+     </broadcast-group>
+   </broadcast-groups>
+   
+   <discovery-groups>
+      <discovery-group name="my-discovery-group">
+         <group-address>231.7.7.7</group-address>
+         <group-port>9876</group-port>
+         <refresh-timeout>10000</refresh-timeout>
+      </discovery-group>
+   </discovery-groups>
+   
+   <cluster-connections>
+      <cluster-connection name="my-cluster">
+         <address>jms</address>
+         <retry-interval>500</retry-interval>
+         <use-duplicate-detection>true</use-duplicate-detection>
+         <forward-when-no-consumers>true</forward-when-no-consumers>
+         <max-hops>1</max-hops>
+         <discovery-group-ref discovery-group-name="my-discovery-group"/>
+      </cluster-connection>
+   </cluster-connections>
+
+   <grouping-handler name="my-grouping-handler">
+      <type>LOCAL</type>
+      <address>jms</address>
+   </grouping-handler>
+   
+   <!-- Other config -->
+
+   <security-settings>
+      <!--security for example queue-->
+      <security-setting match="jms.queue.exampleQueue">
+         <permission type="createDurableQueue" roles="guest"/>
+         <permission type="deleteDurableQueue" roles="guest"/>
+         <permission type="createTempQueue" roles="guest"/>
+         <permission type="deleteTempQueue" roles="guest"/>
+         <permission type="consume" roles="guest"/>
+         <permission type="send" roles="guest"/>
+      </security-setting>
+   </security-settings>
+
+</configuration>
\ No newline at end of file

Added: branches/hornetq_grouping/examples/jms/clustered-grouping/server0/hornetq-jms.xml
===================================================================
--- branches/hornetq_grouping/examples/jms/clustered-grouping/server0/hornetq-jms.xml	                        (rev 0)
+++ branches/hornetq_grouping/examples/jms/clustered-grouping/server0/hornetq-jms.xml	2009-09-24 09:58:07 UTC (rev 7987)
@@ -0,0 +1,17 @@
+<configuration xmlns="urn:hornetq"
+            xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+            xsi:schemaLocation="urn:hornetq /schema/hornetq-jms.xsd">
+   <!--the connection factory used by the example-->
+   <connection-factory name="ConnectionFactory">
+      <connector-ref connector-name="netty-connector"/>
+      <entries>
+         <entry name="ConnectionFactory"/>
+      </entries>
+   </connection-factory>
+
+   <!--the queue used by the example-->
+   <queue name="exampleQueue">
+      <entry name="/queue/exampleQueue"/>
+   </queue>
+
+</configuration>
\ No newline at end of file

Added: branches/hornetq_grouping/examples/jms/clustered-grouping/server0/hornetq-users.xml
===================================================================
--- branches/hornetq_grouping/examples/jms/clustered-grouping/server0/hornetq-users.xml	                        (rev 0)
+++ branches/hornetq_grouping/examples/jms/clustered-grouping/server0/hornetq-users.xml	2009-09-24 09:58:07 UTC (rev 7987)
@@ -0,0 +1,7 @@
+<configuration xmlns="urn:hornetq" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+            xsi:schemaLocation="urn:hornetq /schema/hornetq-users.xsd">
+   <!-- the default user.  this is used where username is null-->
+   <defaultuser name="guest" password="guest">
+      <role name="guest"/>
+   </defaultuser>
+</configuration>
\ No newline at end of file

Added: branches/hornetq_grouping/examples/jms/clustered-grouping/server1/client-jndi.properties
===================================================================
--- branches/hornetq_grouping/examples/jms/clustered-grouping/server1/client-jndi.properties	                        (rev 0)
+++ branches/hornetq_grouping/examples/jms/clustered-grouping/server1/client-jndi.properties	2009-09-24 09:58:07 UTC (rev 7987)
@@ -0,0 +1,3 @@
+java.naming.factory.initial=org.jnp.interfaces.NamingContextFactory
+java.naming.provider.url=jnp://localhost:2099
+java.naming.factory.url.pkgs=org.jboss.naming:org.jnp.interfaces

Added: branches/hornetq_grouping/examples/jms/clustered-grouping/server1/hornetq-beans.xml
===================================================================
--- branches/hornetq_grouping/examples/jms/clustered-grouping/server1/hornetq-beans.xml	                        (rev 0)
+++ branches/hornetq_grouping/examples/jms/clustered-grouping/server1/hornetq-beans.xml	2009-09-24 09:58:07 UTC (rev 7987)
@@ -0,0 +1,59 @@
+<?xml version="1.0" encoding="UTF-8"?>
+
+<deployment xmlns="urn:jboss:bean-deployer:2.0">
+
+   <bean name="Naming" class="org.jnp.server.NamingBeanImpl"/>
+
+   <!-- JNDI server. Disable this if you don't want JNDI -->
+   <bean name="JNDIServer" class="org.jnp.server.Main">
+      <property name="namingInfo">
+         <inject bean="Naming"/>
+      </property>
+      <property name="port">2099</property>
+      <property name="bindAddress">localhost</property>
+      <property name="rmiPort">2098</property>
+      <property name="rmiBindAddress">localhost</property>
+   </bean>
+   
+   <!-- MBean server -->
+   <bean name="MBeanServer" class="javax.management.MBeanServer">
+      <constructor factoryClass="java.lang.management.ManagementFactory"
+                   factoryMethod="getPlatformMBeanServer"/>
+   </bean> 
+
+   <!-- The core configuration -->
+   <bean name="Configuration" class="org.hornetq.core.config.impl.FileConfiguration"/>
+
+	<!-- The security manager -->
+   <bean name="HornetQSecurityManager" class="org.hornetq.core.security.impl.HornetQSecurityManagerImpl">
+      <start ignored="true"/>
+      <stop ignored="true"/>
+   </bean>
+
+	<!-- The core server -->
+   <bean name="HornetQServer" class="org.hornetq.core.server.impl.HornetQServerImpl">
+      <constructor>
+         <parameter>
+            <inject bean="Configuration"/>
+         </parameter>
+         <parameter>
+            <inject bean="MBeanServer"/>
+         </parameter>
+         <parameter>
+            <inject bean="HornetQSecurityManager"/>
+         </parameter>        
+      </constructor>
+      <start ignored="true"/>
+      <stop ignored="true"/>
+   </bean>
+   
+   <!-- The JMS server -->
+   <bean name="JMSServerManager" class="org.hornetq.jms.server.impl.JMSServerManagerImpl">
+      <constructor>         
+         <parameter>
+            <inject bean="HornetQServer"/>
+         </parameter>
+      </constructor>
+   </bean>
+
+</deployment>

Added: branches/hornetq_grouping/examples/jms/clustered-grouping/server1/hornetq-configuration.xml
===================================================================
--- branches/hornetq_grouping/examples/jms/clustered-grouping/server1/hornetq-configuration.xml	                        (rev 0)
+++ branches/hornetq_grouping/examples/jms/clustered-grouping/server1/hornetq-configuration.xml	2009-09-24 09:58:07 UTC (rev 7987)
@@ -0,0 +1,70 @@
+<configuration xmlns="urn:hornetq"
+            xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+            xsi:schemaLocation="urn:hornetq /schema/hornetq-configuration.xsd">
+   <clustered>true</clustered>
+
+   <!-- Connectors -->
+   <connectors>
+      <connector name="netty-connector">
+         <factory-class>org.hornetq.integration.transports.netty.NettyConnectorFactory</factory-class>
+         <param key="hornetq.remoting.netty.port" value="5446" type="Integer"/>
+      </connector>
+   </connectors>
+   
+   <!-- Acceptors -->
+   <acceptors>
+      <acceptor name="netty-acceptor">
+         <factory-class>org.hornetq.integration.transports.netty.NettyAcceptorFactory</factory-class>
+         <param key="hornetq.remoting.netty.port" value="5446" type="Integer"/>
+      </acceptor>
+   </acceptors>
+   
+   <!-- Clustering configuration -->
+   <broadcast-groups>
+      <broadcast-group name="my-broadcast-group">
+         <group-address>231.7.7.7</group-address>
+         <group-port>9876</group-port>
+         <broadcast-period>100</broadcast-period>
+         <connector-ref connector-name="netty-connector"/>
+      </broadcast-group>
+   </broadcast-groups>
+   
+   <discovery-groups>
+      <discovery-group name="my-discovery-group">
+         <group-address>231.7.7.7</group-address>
+         <group-port>9876</group-port>
+         <refresh-timeout>10000</refresh-timeout>
+      </discovery-group>
+   </discovery-groups>
+   
+   <cluster-connections>
+      <cluster-connection name="my-cluster">
+         <address>jms</address>
+         <retry-interval>500</retry-interval>
+         <use-duplicate-detection>true</use-duplicate-detection>
+         <forward-when-no-consumers>true</forward-when-no-consumers>
+         <max-hops>1</max-hops>
+         <discovery-group-ref discovery-group-name="my-discovery-group"/>
+      </cluster-connection>
+   </cluster-connections>
+
+   <grouping-handler name="my-grouping-handler">
+      <type>REMOTE</type>
+      <address>jms</address>
+   </grouping-handler>
+
+   <!-- Other config -->
+
+   <security-settings>
+      <!--security for example queue-->
+      <security-setting match="jms.queue.exampleQueue">
+         <permission type="createDurableQueue" roles="guest"/>
+         <permission type="deleteDurableQueue" roles="guest"/>
+         <permission type="createTempQueue" roles="guest"/>
+         <permission type="deleteTempQueue" roles="guest"/>
+         <permission type="consume" roles="guest"/>
+         <permission type="send" roles="guest"/>
+      </security-setting>
+   </security-settings>
+
+</configuration>

Added: branches/hornetq_grouping/examples/jms/clustered-grouping/server1/hornetq-jms.xml
===================================================================
--- branches/hornetq_grouping/examples/jms/clustered-grouping/server1/hornetq-jms.xml	                        (rev 0)
+++ branches/hornetq_grouping/examples/jms/clustered-grouping/server1/hornetq-jms.xml	2009-09-24 09:58:07 UTC (rev 7987)
@@ -0,0 +1,17 @@
+<configuration xmlns="urn:hornetq"
+            xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+            xsi:schemaLocation="urn:hornetq /schema/hornetq-jms.xsd">
+   <!--the connection factory used by the example-->
+   <connection-factory name="ConnectionFactory">
+      <connector-ref connector-name="netty-connector"/>
+      <entries>
+         <entry name="ConnectionFactory"/>
+      </entries>
+   </connection-factory>
+
+   <!--the queue used by the example-->
+   <queue name="exampleQueue">
+      <entry name="/queue/exampleQueue"/>
+   </queue>
+
+</configuration>
\ No newline at end of file

Added: branches/hornetq_grouping/examples/jms/clustered-grouping/server1/hornetq-users.xml
===================================================================
--- branches/hornetq_grouping/examples/jms/clustered-grouping/server1/hornetq-users.xml	                        (rev 0)
+++ branches/hornetq_grouping/examples/jms/clustered-grouping/server1/hornetq-users.xml	2009-09-24 09:58:07 UTC (rev 7987)
@@ -0,0 +1,7 @@
+<configuration xmlns="urn:hornetq" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+            xsi:schemaLocation="urn:hornetq /schema/hornetq-users.xsd">
+   <!-- the default user.  this is used where username is null-->
+   <defaultuser name="guest" password="guest">
+      <role name="guest"/>
+   </defaultuser>
+</configuration>
\ No newline at end of file

Added: branches/hornetq_grouping/examples/jms/clustered-grouping/server2/client-jndi.properties
===================================================================
--- branches/hornetq_grouping/examples/jms/clustered-grouping/server2/client-jndi.properties	                        (rev 0)
+++ branches/hornetq_grouping/examples/jms/clustered-grouping/server2/client-jndi.properties	2009-09-24 09:58:07 UTC (rev 7987)
@@ -0,0 +1,3 @@
+java.naming.factory.initial=org.jnp.interfaces.NamingContextFactory
+java.naming.provider.url=jnp://localhost:3099
+java.naming.factory.url.pkgs=org.jboss.naming:org.jnp.interfaces

Added: branches/hornetq_grouping/examples/jms/clustered-grouping/server2/hornetq-beans.xml
===================================================================
--- branches/hornetq_grouping/examples/jms/clustered-grouping/server2/hornetq-beans.xml	                        (rev 0)
+++ branches/hornetq_grouping/examples/jms/clustered-grouping/server2/hornetq-beans.xml	2009-09-24 09:58:07 UTC (rev 7987)
@@ -0,0 +1,59 @@
+<?xml version="1.0" encoding="UTF-8"?>
+
+<deployment xmlns="urn:jboss:bean-deployer:2.0">
+
+   <bean name="Naming" class="org.jnp.server.NamingBeanImpl"/>
+
+   <!-- JNDI server. Disable this if you don't want JNDI -->
+   <bean name="JNDIServer" class="org.jnp.server.Main">
+      <property name="namingInfo">
+         <inject bean="Naming"/>
+      </property>
+      <property name="port">3099</property>
+      <property name="bindAddress">localhost</property>
+      <property name="rmiPort">3098</property>
+      <property name="rmiBindAddress">localhost</property>
+   </bean>
+   
+   <!-- MBean server -->
+   <bean name="MBeanServer" class="javax.management.MBeanServer">
+      <constructor factoryClass="java.lang.management.ManagementFactory"
+                   factoryMethod="getPlatformMBeanServer"/>
+   </bean> 
+
+   <!-- The core configuration -->
+   <bean name="Configuration" class="org.hornetq.core.config.impl.FileConfiguration"/>
+
+	<!-- The security manager -->
+   <bean name="HornetQSecurityManager" class="org.hornetq.core.security.impl.HornetQSecurityManagerImpl">
+      <start ignored="true"/>
+      <stop ignored="true"/>
+   </bean>
+
+	<!-- The core server -->
+   <bean name="HornetQServer" class="org.hornetq.core.server.impl.HornetQServerImpl">
+      <constructor>
+         <parameter>
+            <inject bean="Configuration"/>
+         </parameter>
+         <parameter>
+            <inject bean="MBeanServer"/>
+         </parameter>
+         <parameter>
+            <inject bean="HornetQSecurityManager"/>
+         </parameter>        
+      </constructor>
+      <start ignored="true"/>
+      <stop ignored="true"/>
+   </bean>
+   
+   <!-- The JMS server -->
+   <bean name="JMSServerManager" class="org.hornetq.jms.server.impl.JMSServerManagerImpl">
+      <constructor>         
+         <parameter>
+            <inject bean="HornetQServer"/>
+         </parameter>
+      </constructor>
+   </bean>
+
+</deployment>

Added: branches/hornetq_grouping/examples/jms/clustered-grouping/server2/hornetq-configuration.xml
===================================================================
--- branches/hornetq_grouping/examples/jms/clustered-grouping/server2/hornetq-configuration.xml	                        (rev 0)
+++ branches/hornetq_grouping/examples/jms/clustered-grouping/server2/hornetq-configuration.xml	2009-09-24 09:58:07 UTC (rev 7987)
@@ -0,0 +1,70 @@
+<configuration xmlns="urn:hornetq"
+            xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+            xsi:schemaLocation="urn:hornetq /schema/hornetq-configuration.xsd">
+   <clustered>true</clustered>
+
+   <!-- Connectors -->
+   <connectors>
+      <connector name="netty-connector">
+         <factory-class>org.hornetq.integration.transports.netty.NettyConnectorFactory</factory-class>
+         <param key="hornetq.remoting.netty.port" value="5447" type="Integer"/>
+      </connector>
+   </connectors>
+   
+   <!-- Acceptors -->
+   <acceptors>
+      <acceptor name="netty-acceptor">
+         <factory-class>org.hornetq.integration.transports.netty.NettyAcceptorFactory</factory-class>
+         <param key="hornetq.remoting.netty.port" value="5447" type="Integer"/>
+      </acceptor>
+   </acceptors>
+   
+   <!-- Clustering configuration -->
+   <broadcast-groups>
+      <broadcast-group name="my-broadcast-group">
+         <group-address>231.7.7.7</group-address>
+         <group-port>9876</group-port>
+         <broadcast-period>100</broadcast-period>
+         <connector-ref connector-name="netty-connector"/>
+      </broadcast-group>
+   </broadcast-groups>
+   
+   <discovery-groups>
+      <discovery-group name="my-discovery-group">
+         <group-address>231.7.7.7</group-address>
+         <group-port>9876</group-port>
+         <refresh-timeout>10000</refresh-timeout>
+      </discovery-group>
+   </discovery-groups>
+   
+   <cluster-connections>
+      <cluster-connection name="my-cluster">
+         <address>jms</address>
+         <retry-interval>500</retry-interval>
+         <use-duplicate-detection>true</use-duplicate-detection>
+         <forward-when-no-consumers>true</forward-when-no-consumers>
+         <max-hops>1</max-hops>
+         <discovery-group-ref discovery-group-name="my-discovery-group"/>
+      </cluster-connection>
+   </cluster-connections>
+
+   <grouping-handler name="my-grouping-handler">
+      <type>REMOTE</type>
+      <address>jms</address>
+   </grouping-handler>
+
+   <!-- Other config -->
+
+   <security-settings>
+      <!--security for example queue-->
+      <security-setting match="jms.queue.exampleQueue">
+         <permission type="createDurableQueue" roles="guest"/>
+         <permission type="deleteDurableQueue" roles="guest"/>
+         <permission type="createTempQueue" roles="guest"/>
+         <permission type="deleteTempQueue" roles="guest"/>
+         <permission type="consume" roles="guest"/>
+         <permission type="send" roles="guest"/>
+      </security-setting>
+   </security-settings>
+
+</configuration>

Added: branches/hornetq_grouping/examples/jms/clustered-grouping/server2/hornetq-jms.xml
===================================================================
--- branches/hornetq_grouping/examples/jms/clustered-grouping/server2/hornetq-jms.xml	                        (rev 0)
+++ branches/hornetq_grouping/examples/jms/clustered-grouping/server2/hornetq-jms.xml	2009-09-24 09:58:07 UTC (rev 7987)
@@ -0,0 +1,17 @@
+<configuration xmlns="urn:hornetq"
+            xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+            xsi:schemaLocation="urn:hornetq /schema/hornetq-jms.xsd">
+   <!--the connection factory used by the example-->
+   <connection-factory name="ConnectionFactory">
+      <connector-ref connector-name="netty-connector"/>
+      <entries>
+         <entry name="ConnectionFactory"/>
+      </entries>
+   </connection-factory>
+
+   <!--the queue used by the example-->
+   <queue name="exampleQueue">
+      <entry name="/queue/exampleQueue"/>
+   </queue>
+
+</configuration>
\ No newline at end of file

Added: branches/hornetq_grouping/examples/jms/clustered-grouping/server2/hornetq-users.xml
===================================================================
--- branches/hornetq_grouping/examples/jms/clustered-grouping/server2/hornetq-users.xml	                        (rev 0)
+++ branches/hornetq_grouping/examples/jms/clustered-grouping/server2/hornetq-users.xml	2009-09-24 09:58:07 UTC (rev 7987)
@@ -0,0 +1,7 @@
+<configuration xmlns="urn:hornetq" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+            xsi:schemaLocation="urn:hornetq /schema/hornetq-users.xsd">
+   <!-- the default user.  this is used where username is null-->
+   <defaultuser name="guest" password="guest">
+      <role name="guest"/>
+   </defaultuser>
+</configuration>
\ No newline at end of file

Copied: branches/hornetq_grouping/examples/jms/clustered-grouping/src/org/hornetq/jms/example/ClusteredGroupingExample.java (from rev 7977, branches/hornetq_grouping/examples/jms/clustered-queue/src/org/hornetq/jms/example/ClusteredQueueExample.java)
===================================================================
--- branches/hornetq_grouping/examples/jms/clustered-grouping/src/org/hornetq/jms/example/ClusteredGroupingExample.java	                        (rev 0)
+++ branches/hornetq_grouping/examples/jms/clustered-grouping/src/org/hornetq/jms/example/ClusteredGroupingExample.java	2009-09-24 09:58:07 UTC (rev 7987)
@@ -0,0 +1,173 @@
+/*
+ * Copyright 2009 Red Hat, Inc.
+ * Red Hat licenses this file to you under the Apache License, version
+ * 2.0 (the "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+ * implied.  See the License for the specific language governing
+ * permissions and limitations under the License.
+ */
+package org.hornetq.jms.example;
+
+import javax.jms.Connection;
+import javax.jms.ConnectionFactory;
+import javax.jms.MessageConsumer;
+import javax.jms.MessageProducer;
+import javax.jms.Queue;
+import javax.jms.Session;
+import javax.jms.TextMessage;
+import javax.naming.InitialContext;
+
+import org.hornetq.common.example.HornetQExample;
+import org.hornetq.jms.client.HornetQMessage;
+
+/**
+ * A simple example that demonstrates message grouping in a cluster: messages that carry the same JMSXGroupID are
+ * sent through different nodes of the cluster and are consumed from a single node.
+ *
+ * @author <a href="tim.fox at jboss.com">Tim Fox</a>
+ */
+public class ClusteredGroupingExample extends HornetQExample
+{
+   public static void main(String[] args)
+   {
+      new ClusteredGroupingExample().run(args);
+   }
+
+   public boolean runExample() throws Exception
+   {
+      Thread.sleep(5000);
+      Connection connection0 = null;
+
+      Connection connection1 = null;
+
+      Connection connection2 = null;
+
+      InitialContext ic0 = null;
+
+      InitialContext ic1 = null;
+
+      InitialContext ic2 = null;
+
+      try
+      {
+         // Step 1. Get an initial context for looking up JNDI from server 0
+         ic0 = getContext(0);
+
+         // Step 2. Look-up the JMS Queue object from JNDI
+         Queue queue = (Queue)ic0.lookup("/queue/exampleQueue");
+
+         // Step 3. Look-up a JMS Connection Factory object from JNDI on server 0
+         ConnectionFactory cf0 = (ConnectionFactory)ic0.lookup("/ConnectionFactory");
+
+         // Step 4. Get an initial context for looking up JNDI from server 1
+         ic1 = getContext(1);
+
+         // Step 5. Look-up a JMS Connection Factory object from JNDI on server 1
+         ConnectionFactory cf1 = (ConnectionFactory)ic1.lookup("/ConnectionFactory");
+
+         // Step 4. Get an initial context for looking up JNDI from server 1
+         ic2 = getContext(2);
+
+         // Step 5. Look-up a JMS Connection Factory object from JNDI on server 1
+         ConnectionFactory cf2 = (ConnectionFactory)ic1.lookup("/ConnectionFactory");
+
+         // Step 6. We create a JMS Connection connection0 which is a connection to server 0
+         connection0 = cf0.createConnection();
+
+         // Step 7. We create a JMS Connection connection1 which is a connection to server 1
+         connection1 = cf1.createConnection();
+
+         // Step 7. We create a JMS Connection connection1 which is a connection to server 1
+         connection2 = cf2.createConnection();
+
+         // Step 8. We create a JMS Session on server 0
+         Session session0 = connection0.createSession(false, Session.AUTO_ACKNOWLEDGE);
+
+         // Step 9. We create a JMS Session on server 1
+         Session session1 = connection1.createSession(false, Session.AUTO_ACKNOWLEDGE);
+
+         Session session2 = connection1.createSession(false, Session.AUTO_ACKNOWLEDGE);
+
+         // Step 10. We start the connections to ensure delivery occurs on them
+         connection0.start();
+
+         connection1.start();
+
+         connection2.start();
+
+         // Step 11. We create JMS MessageConsumer objects on server 0 and server 1
+         MessageConsumer consumer = session0.createConsumer(queue);
+
+
+         // Step 12. We create a JMS MessageProducer object on server 0
+         MessageProducer producer0 = session0.createProducer(queue);
+
+         MessageProducer producer1 = session1.createProducer(queue);
+
+         MessageProducer producer2 = session2.createProducer(queue);
+
+         // Step 13. We send some messages to server 0
+
+         final int numMessages = 10;
+
+         for (int i = 0; i < numMessages; i++)
+         {
+            TextMessage message = session0.createTextMessage("This is text message " + i);
+
+            message.setStringProperty(HornetQMessage.JMSXGROUPID, "Group-0");
+
+            producer0.send(message);
+
+            producer1.send(message);
+
+            producer2.send(message);
+
+            System.out.println("Sent messages: " + message.getText());
+         }
+
+         // Step 14. We now consume those messages on *both* server 0 and server 1.
+         // We note the messages have been distributed between servers in a round robin fashion
+         // JMS Queues implement point-to-point message where each message is only ever consumed by a
+         // maximum of one consumer
+
+         for (int i = 0; i < numMessages; i += 2)
+         {
+            TextMessage message0 = (TextMessage)consumer.receive(5000);
+
+            System.out.println("Got message: " + message0.getText() + " from node 0");
+
+         }
+
+         return true;
+      }
+      finally
+      {
+         // Step 15. Be sure to close our resources!
+
+         if (connection0 != null)
+         {
+            connection0.close();
+         }
+
+         if (connection1 != null)
+         {
+            connection1.close();
+         }
+
+         if (ic0 != null)
+         {
+            ic0.close();
+         }
+
+         if (ic1 != null)
+         {
+            ic1.close();
+         }
+      }
+   }
+
+}
\ No newline at end of file

Modified: branches/hornetq_grouping/src/config/common/schema/hornetq-configuration.xsd
===================================================================
--- branches/hornetq_grouping/src/config/common/schema/hornetq-configuration.xsd	2009-09-24 09:25:44 UTC (rev 7986)
+++ branches/hornetq_grouping/src/config/common/schema/hornetq-configuration.xsd	2009-09-24 09:58:07 UTC (rev 7987)
@@ -128,6 +128,8 @@
                         </xsd:sequence>
                     </xsd:complexType>
                 </xsd:element>
+            <xsd:element maxOccurs="1" minOccurs="0" name="grouping-handler" type="groupingHandlerType">
+            </xsd:element>
 				<xsd:element maxOccurs="1" minOccurs="0" name="paging-directory" type="xsd:string">
 				</xsd:element>
 				<xsd:element maxOccurs="1" minOccurs="0" name="bindings-directory" type="xsd:string">
@@ -383,6 +385,21 @@
 		</xsd:restriction>
 	</xsd:simpleType>
 
+   <xsd:complexType name="groupingHandlerType">
+      <xsd:sequence>
+         <xsd:element maxOccurs="1" minOccurs="1" name="type" type="groupingHandlerTypeType"/>
+         <xsd:element maxOccurs="1" minOccurs="1" name="address" type="xsd:string"/>
+      </xsd:sequence>
+       <xsd:attribute name="name" type="xsd:string" use="required"/>
+   </xsd:complexType>
+
+   <xsd:simpleType name="groupingHandlerTypeType">
+      <xsd:restriction base="xsd:string">
+         <xsd:enumeration value="LOCAL"/>
+         <xsd:enumeration value="REMOTE"/>
+      </xsd:restriction>
+   </xsd:simpleType>
+
   <xsd:element name="security-settings">
       <xsd:complexType>
          <xsd:sequence>

Modified: branches/hornetq_grouping/src/main/org/hornetq/core/config/Configuration.java
===================================================================
--- branches/hornetq_grouping/src/main/org/hornetq/core/config/Configuration.java	2009-09-24 09:25:44 UTC (rev 7986)
+++ branches/hornetq_grouping/src/main/org/hornetq/core/config/Configuration.java	2009-09-24 09:58:07 UTC (rev 7987)
@@ -25,6 +25,7 @@
 import org.hornetq.core.config.cluster.DivertConfiguration;
 import org.hornetq.core.config.cluster.QueueConfiguration;
 import org.hornetq.core.server.JournalType;
+import org.hornetq.core.server.group.impl.GroupingHandlerConfiguration;
 import org.hornetq.utils.SimpleString;
 
 /**
@@ -125,6 +126,10 @@
 
    void setDiscoveryGroupConfigurations(Map<String, DiscoveryGroupConfiguration> configs);
 
+   List<GroupingHandlerConfiguration> getGroupingHandlerConfigurations();
+
+   void setGroupingHandlerConfigurationConfigurations(List<GroupingHandlerConfiguration> groupingHandlerConfiguration);
+
    List<BridgeConfiguration> getBridgeConfigurations();
 
    void setBridgeConfigurations(final List<BridgeConfiguration> configs);

Modified: branches/hornetq_grouping/src/main/org/hornetq/core/config/impl/ConfigurationImpl.java
===================================================================
--- branches/hornetq_grouping/src/main/org/hornetq/core/config/impl/ConfigurationImpl.java	2009-09-24 09:25:44 UTC (rev 7986)
+++ branches/hornetq_grouping/src/main/org/hornetq/core/config/impl/ConfigurationImpl.java	2009-09-24 09:58:07 UTC (rev 7987)
@@ -30,6 +30,7 @@
 import org.hornetq.core.config.cluster.DivertConfiguration;
 import org.hornetq.core.config.cluster.QueueConfiguration;
 import org.hornetq.core.server.JournalType;
+import org.hornetq.core.server.group.impl.GroupingHandlerConfiguration;
 import org.hornetq.utils.SimpleString;
 
 /**
@@ -216,6 +217,8 @@
 
    protected Map<String, DiscoveryGroupConfiguration> discoveryGroupConfigurations = new LinkedHashMap<String, DiscoveryGroupConfiguration>();
 
+   protected List<GroupingHandlerConfiguration> groupingHandlerConfiguration = new ArrayList<GroupingHandlerConfiguration>();
+
    // Paging related attributes ------------------------------------------------------------
 
    protected String pagingDirectory = DEFAULT_PAGING_DIR;
@@ -465,6 +468,17 @@
       this.backupConnectorName = backupConnectorName;
    }
 
+   /**
+    * Returns the grouping handler configurations held by this configuration. The internal list itself is
+    * returned, not a copy.
+    */
+   public List<GroupingHandlerConfiguration> getGroupingHandlerConfigurations()
+   {
+      return groupingHandlerConfiguration;
+   }
+
+   /**
+    * Replaces the list of grouping handler configurations. The given list is stored as-is, without copying.
+    *
+    * @param groupingHandlerConfiguration the new list of grouping handler configurations
+    */
+   public void setGroupingHandlerConfigurationConfigurations(List<GroupingHandlerConfiguration> groupingHandlerConfiguration)
+   {
+      this.groupingHandlerConfiguration = groupingHandlerConfiguration;
+   }
+
+
    public List<BridgeConfiguration> getBridgeConfigurations()
    {
       return bridgeConfigurations;

Modified: branches/hornetq_grouping/src/main/org/hornetq/core/config/impl/FileConfiguration.java
===================================================================
--- branches/hornetq_grouping/src/main/org/hornetq/core/config/impl/FileConfiguration.java	2009-09-24 09:25:44 UTC (rev 7986)
+++ branches/hornetq_grouping/src/main/org/hornetq/core/config/impl/FileConfiguration.java	2009-09-24 09:58:07 UTC (rev 7987)
@@ -46,6 +46,7 @@
 import org.hornetq.core.config.cluster.DivertConfiguration;
 import org.hornetq.core.logging.Logger;
 import org.hornetq.core.server.JournalType;
+import org.hornetq.core.server.group.impl.GroupingHandlerConfiguration;
 import org.hornetq.utils.Pair;
 import org.hornetq.utils.SimpleString;
 import org.hornetq.utils.XMLUtil;
@@ -249,6 +250,15 @@
          parseBridgeConfiguration(mfNode);
       }
 
+      NodeList gaNodes = e.getElementsByTagName("grouping-handler");
+      System.out.println("gaNodes.getLength() = " + gaNodes.getLength());
+      for (int i = 0; i < gaNodes.getLength(); i++)
+      {
+         Element gaNode = (Element) gaNodes.item(i);
+
+         parseGroupingHandlerConfiguration(gaNode);
+      }
+
       NodeList ccNodes = e.getElementsByTagName("cluster-connection");
 
       for (int i = 0; i < ccNodes.getLength(); i++)
@@ -558,6 +568,20 @@
       clusterConfigurations.add(config);
    }
 
+   /**
+    * Parses one grouping-handler element and adds the resulting configuration to
+    * {@code groupingHandlerConfiguration}.
+    * <p>
+    * The handler name is read from the {@code name} attribute; {@code type} and {@code address} are read
+    * from child elements through {@code getString} with the {@code NOT_NULL_OR_EMPTY} validator.
+    *
+    * @param node the grouping-handler element to parse
+    */
+   private void parseGroupingHandlerConfiguration(final Element node)
+   {
+      String name = node.getAttribute("name");
+      String type = getString(node, "type", null, NOT_NULL_OR_EMPTY);
+      String address = getString(node, "address", null, NOT_NULL_OR_EMPTY);
+
+      // Any type value other than LOCAL is mapped to REMOTE
+      GroupingHandlerConfiguration.TYPE handlerType;
+      if (type.equals(GroupingHandlerConfiguration.TYPE.LOCAL.getType()))
+      {
+         handlerType = GroupingHandlerConfiguration.TYPE.LOCAL;
+      }
+      else
+      {
+         handlerType = GroupingHandlerConfiguration.TYPE.REMOTE;
+      }
+
+      groupingHandlerConfiguration.add(new GroupingHandlerConfiguration(new SimpleString(name),
+                                                                        handlerType,
+                                                                        new SimpleString(address)));
+   }
+
+
    private void parseBridgeConfiguration(final Element brNode)
    {
       String name = brNode.getAttribute("name");

Modified: branches/hornetq_grouping/src/main/org/hornetq/core/postoffice/PostOffice.java
===================================================================
--- branches/hornetq_grouping/src/main/org/hornetq/core/postoffice/PostOffice.java	2009-09-24 09:25:44 UTC (rev 7986)
+++ branches/hornetq_grouping/src/main/org/hornetq/core/postoffice/PostOffice.java	2009-09-24 09:58:07 UTC (rev 7987)
@@ -19,7 +19,7 @@
 import org.hornetq.core.server.HornetQComponent;
 import org.hornetq.core.server.Queue;
 import org.hornetq.core.server.ServerMessage;
-import org.hornetq.core.server.group.Arbitrator;
+import org.hornetq.core.server.group.GroupingHandler;
 import org.hornetq.core.transaction.Transaction;
 import org.hornetq.utils.SimpleString;
 
@@ -69,7 +69,7 @@
    
    Object getNotificationLock();
 
-   void addArbitrator(Arbitrator arbitrator);
+   void setGroupingHandler(GroupingHandler groupingHandler);
 
-   Arbitrator getArbitrator();
+   GroupingHandler getGroupingHandler();
 }

Modified: branches/hornetq_grouping/src/main/org/hornetq/core/postoffice/impl/BindingsImpl.java
===================================================================
--- branches/hornetq_grouping/src/main/org/hornetq/core/postoffice/impl/BindingsImpl.java	2009-09-24 09:25:44 UTC (rev 7986)
+++ branches/hornetq_grouping/src/main/org/hornetq/core/postoffice/impl/BindingsImpl.java	2009-09-24 09:58:07 UTC (rev 7987)
@@ -34,7 +34,7 @@
 import org.hornetq.core.server.ServerMessage;
 import org.hornetq.core.server.group.impl.Proposal;
 import org.hornetq.core.server.group.impl.Response;
-import org.hornetq.core.server.group.Arbitrator;
+import org.hornetq.core.server.group.GroupingHandler;
 import org.hornetq.core.transaction.Transaction;
 import org.hornetq.core.exception.HornetQException;
 import org.hornetq.utils.SimpleString;
@@ -284,96 +284,15 @@
 
       if (!routed)
       {
-         Arbitrator groupingArbitrator = postOffice.getArbitrator();
+         GroupingHandler groupingGroupingHandler = postOffice.getGroupingHandler();
 
          if (message.getProperty(MessageImpl.HDR_FROM_CLUSTER) != null)
          {
             routeFromCluster(message, tx);
          }
-         else if(groupingArbitrator != null && message.getProperty(MessageImpl.HDR_GROUP_ID)!= null)
+         else if(groupingGroupingHandler != null && message.getProperty(MessageImpl.HDR_GROUP_ID)!= null)
          {
-            SimpleString groupId = (SimpleString) message.getProperty(MessageImpl.HDR_GROUP_ID);
-            Response resp = groupingArbitrator.propose(new Proposal(groupId, null));
-            if(resp == null)
-            {
-               for (Map.Entry<SimpleString, List<Binding>> entry : routingNameBindingMap.entrySet())
-               {
-                  SimpleString routingName = entry.getKey();
-
-                  List<Binding> bindings = entry.getValue();
-                  Binding chosen = null;
-                  Binding lowestPriorityBinding = null;
-                  int lowestPriority = Integer.MAX_VALUE;
-                  for (Binding binding : bindings)
-                  {
-                     boolean bindingIsHighAcceptPriority = binding.isHighAcceptPriority(message);
-                     int distance = binding.getDistance();
-                     if((distance < lowestPriority))
-                     {
-                        lowestPriorityBinding = binding;
-                        lowestPriority = distance;
-                        if(bindingIsHighAcceptPriority)
-                        {
-                           chosen = binding;
-                        }
-                     }
-                  }
-                  if(chosen == null)
-                  {
-                     chosen = lowestPriorityBinding;
-                  }
-                  resp = groupingArbitrator.propose(new Proposal(groupId, chosen.getClusterName()));
-                  if(!resp.getChosen().equals(chosen.getClusterName()))
-                  {
-                     for (Binding binding : bindings)
-                     {
-                        if (binding.getClusterName().equals(resp.getChosen()))
-                        {
-                           chosen = binding;
-                           break;
-                        }
-                     }
-                  }
-
-                  if( chosen != null )
-                  {
-                     System.out.println("sending message" + message.getProperty("count_prop") + " to " + chosen.getClusterName());
-                     chosen.willRoute(message);
-                     chosen.getBindable().preroute(message, tx);
-                     chosen.getBindable().route(message, tx);
-                  }
-               }
-            }
-            else
-            {
-               for (Map.Entry<SimpleString, List<Binding>> entry : routingNameBindingMap.entrySet())
-               {
-                  SimpleString routingName = entry.getKey();
-
-                  List<Binding> bindings = entry.getValue();
-                  Binding chosen = null;
-                  for (Binding binding : bindings)
-                  {
-                     if(binding.getClusterName().equals(resp.getChosen()))
-                     {
-                        chosen = binding;
-                        break;
-                     }
-                  }
-                  if( chosen != null)
-                  {
-                     System.out.println("found sending message" + message.getProperty("count_prop") + " to " + chosen.getClusterName());
-                     chosen.willRoute(message);
-                     chosen.getBindable().preroute(message, tx);
-                     chosen.getBindable().route(message, tx);
-                  }
-                  else
-                  {
-                     System.out.println("BindingsImpl.route");
-                     throw new HornetQException(HornetQException.QUEUE_DOES_NOT_EXIST, "queue " + resp.getChosen() + " has been removed cannot deliver message, queues should not be removed when grouping is used");
-                  }
-               }
-            }
+            routeUsingStrictOrdering(message, tx, groupingGroupingHandler);
          }
          else
          {
@@ -511,6 +430,93 @@
       }
    }
 
+   /**
+    * Routes a message that carries a group id, using the grouping handler to decide which binding receives it.
+    * <p>
+    * The handler is first asked (with a proposal that names no binding) whether a binding has already been
+    * chosen for the group id. If it returns no response, a candidate binding is picked for each routing
+    * name and proposed to the handler, and the message is routed to the binding named in the response. If
+    * the handler already has a response, the binding it names must still be present for every routing name.
+    *
+    * @param message the message to route; its HDR_GROUP_ID property is used as the group id
+    * @param tx the transaction handed to preroute/route
+    * @param groupingGroupingHandler the handler that records which binding a group id is routed to
+    * @throws Exception if the previously chosen binding no longer exists, or if routing fails
+    */
+   private void routeUsingStrictOrdering(ServerMessage message, Transaction tx, GroupingHandler groupingGroupingHandler)
+         throws Exception
+   {
+      SimpleString groupId = (SimpleString) message.getProperty(MessageImpl.HDR_GROUP_ID);
+      Response resp = groupingGroupingHandler.propose(new Proposal(groupId, null));
+      if (resp == null)
+      {
+         // Nothing chosen yet for this group id: propose a candidate for each routing name
+         for (List<Binding> bindings : routingNameBindingMap.values())
+         {
+            Binding chosen = pickGroupingCandidate(message, bindings);
+            if (chosen == null)
+            {
+               // No bindings under this routing name, so there is nothing to propose or route to
+               continue;
+            }
+            resp = groupingGroupingHandler.propose(new Proposal(groupId, chosen.getClusterName()));
+            if (!resp.getChosen().equals(chosen.getClusterName()))
+            {
+               // The handler answered with a different binding; use it when it is known here,
+               // otherwise keep the local candidate
+               Binding agreed = findBindingByClusterName(bindings, resp.getChosen());
+               if (agreed != null)
+               {
+                  chosen = agreed;
+               }
+            }
+            routeToGroupedBinding(chosen, message, tx);
+         }
+      }
+      else
+      {
+         for (List<Binding> bindings : routingNameBindingMap.values())
+         {
+            Binding chosen = findBindingByClusterName(bindings, resp.getChosen());
+            if (chosen == null)
+            {
+               throw new HornetQException(HornetQException.QUEUE_DOES_NOT_EXIST, "queue " + resp.getChosen() + " has been removed cannot deliver message, queues should not be removed when grouping is used");
+            }
+            routeToGroupedBinding(chosen, message, tx);
+         }
+      }
+   }
+
+   /**
+    * Picks the binding to propose for a group. While scanning, the closest binding seen so far is tracked;
+    * a binding becomes the preferred choice only if it is strictly closer than all earlier ones and reports
+    * high accept priority for the message. Falls back to the closest binding, or null for an empty list.
+    */
+   private Binding pickGroupingCandidate(ServerMessage message, List<Binding> bindings) throws Exception
+   {
+      Binding preferred = null;
+      Binding closest = null;
+      int closestDistance = Integer.MAX_VALUE;
+      for (Binding binding : bindings)
+      {
+         boolean highAcceptPriority = binding.isHighAcceptPriority(message);
+         int distance = binding.getDistance();
+         if (distance < closestDistance)
+         {
+            closest = binding;
+            closestDistance = distance;
+            if (highAcceptPriority)
+            {
+               preferred = binding;
+            }
+         }
+      }
+      return preferred != null ? preferred : closest;
+   }
+
+   /** Returns the first binding whose cluster name equals the given name, or null if there is none. */
+   private Binding findBindingByClusterName(List<Binding> bindings, SimpleString clusterName) throws Exception
+   {
+      for (Binding binding : bindings)
+      {
+         if (binding.getClusterName().equals(clusterName))
+         {
+            return binding;
+         }
+      }
+      return null;
+   }
+
+   /** Notifies the binding that it will route the message, then pre-routes and routes it. */
+   private void routeToGroupedBinding(Binding binding, ServerMessage message, Transaction tx) throws Exception
+   {
+      binding.willRoute(message);
+      binding.getBindable().preroute(message, tx);
+      binding.getBindable().route(message, tx);
+   }
+
+
    private final int incrementPos(int pos, int length)
    {
       pos++;

Modified: branches/hornetq_grouping/src/main/org/hornetq/core/postoffice/impl/PostOfficeImpl.java
===================================================================
--- branches/hornetq_grouping/src/main/org/hornetq/core/postoffice/impl/PostOfficeImpl.java	2009-09-24 09:25:44 UTC (rev 7986)
+++ branches/hornetq_grouping/src/main/org/hornetq/core/postoffice/impl/PostOfficeImpl.java	2009-09-24 09:58:07 UTC (rev 7987)
@@ -50,7 +50,7 @@
 import org.hornetq.core.server.Queue;
 import org.hornetq.core.server.QueueFactory;
 import org.hornetq.core.server.ServerMessage;
-import org.hornetq.core.server.group.Arbitrator;
+import org.hornetq.core.server.group.GroupingHandler;
 import org.hornetq.core.server.impl.ServerMessageImpl;
 import org.hornetq.core.settings.HierarchicalRepository;
 import org.hornetq.core.settings.impl.AddressSettings;
@@ -125,7 +125,7 @@
 
    private final HierarchicalRepository<AddressSettings> addressSettingsRepository;
 
-   private Arbitrator groupingArbitrator;
+   private GroupingHandler groupingGroupingHandler;
 
    public PostOfficeImpl(final HornetQServer server,
                          final StorageManager storageManager,
@@ -743,14 +743,14 @@
    }
 
 
-   public void addArbitrator(Arbitrator arbitrator)
+   public void setGroupingHandler(GroupingHandler groupingHandler)
    {
-      groupingArbitrator = arbitrator;
+      groupingGroupingHandler = groupingHandler;
    }
 
-   public Arbitrator getArbitrator()
+   public GroupingHandler getGroupingHandler()
    {
-      return groupingArbitrator;
+      return groupingGroupingHandler;
    }
 
    public void sendQueueInfoToQueue(final SimpleString queueName, final SimpleString address) throws Exception

Modified: branches/hornetq_grouping/src/main/org/hornetq/core/server/cluster/impl/ClusterConnectionImpl.java
===================================================================
--- branches/hornetq_grouping/src/main/org/hornetq/core/server/cluster/impl/ClusterConnectionImpl.java	2009-09-24 09:25:44 UTC (rev 7986)
+++ branches/hornetq_grouping/src/main/org/hornetq/core/server/cluster/impl/ClusterConnectionImpl.java	2009-09-24 09:58:07 UTC (rev 7987)
@@ -638,10 +638,10 @@
          }
          SimpleString val = (SimpleString) message.getProperty(ManagementHelper.HDR_PROPOSAL_VALUE);
          Integer hops = (Integer) message.getProperty(ManagementHelper.HDR_DISTANCE);
-         Response response = postOffice.getArbitrator().receive(new Proposal(type, val), hops + 1);
+         Response response = postOffice.getGroupingHandler().receive(new Proposal(type, val), hops + 1);
          if(response != null)
          {
-            postOffice.getArbitrator().send(response, 0);
+            postOffice.getGroupingHandler().send(response, 0);
          }
       }
 
@@ -656,8 +656,8 @@
          SimpleString alt = (SimpleString) message.getProperty(ManagementHelper.HDR_PROPOSAL_ALT_VALUE);
          Integer hops = (Integer) message.getProperty(ManagementHelper.HDR_DISTANCE);
          Response response = new Response(type, val, alt);
-         postOffice.getArbitrator().proposed(response);
-         postOffice.getArbitrator().send(response, hops + 1);
+         postOffice.getGroupingHandler().proposed(response);
+         postOffice.getGroupingHandler().send(response, hops + 1);
       }
       
       private synchronized void clearBindings() throws Exception

Modified: branches/hornetq_grouping/src/main/org/hornetq/core/server/cluster/impl/ClusterManagerImpl.java
===================================================================
--- branches/hornetq_grouping/src/main/org/hornetq/core/server/cluster/impl/ClusterManagerImpl.java	2009-09-24 09:25:44 UTC (rev 7986)
+++ branches/hornetq_grouping/src/main/org/hornetq/core/server/cluster/impl/ClusterManagerImpl.java	2009-09-24 09:58:07 UTC (rev 7987)
@@ -40,6 +40,10 @@
 import org.hornetq.core.remoting.Channel;
 import org.hornetq.core.server.HornetQServer;
 import org.hornetq.core.server.Queue;
+import org.hornetq.core.server.group.impl.GroupingHandlerConfiguration;
+import org.hornetq.core.server.group.impl.LocalGroupingHandler;
+import org.hornetq.core.server.group.impl.RemoteGroupingHandler;
+import org.hornetq.core.server.group.GroupingHandler;
 import org.hornetq.core.server.cluster.Bridge;
 import org.hornetq.core.server.cluster.BroadcastGroup;
 import org.hornetq.core.server.cluster.ClusterConnection;
@@ -150,6 +154,11 @@
       {
          deployClusterConnection(config);
       }
+      
+      for (GroupingHandlerConfiguration config : configuration.getGroupingHandlerConfigurations())
+      {
+         deployGroupingHandlerConfigurations(config);
+      }
 
       started = true;
    }
@@ -486,6 +495,21 @@
       bridge.start();
    }
 
+   private synchronized void deployGroupingHandlerConfigurations(final GroupingHandlerConfiguration config) throws Exception
+   {
+      GroupingHandler groupingHandler;
+      if (config.getType() == GroupingHandlerConfiguration.TYPE.LOCAL)
+      {
+         groupingHandler = new LocalGroupingHandler(managementService, config.getName(), config.getAddress(), scheduledExecutor);
+      }
+      else
+      {
+         groupingHandler = new RemoteGroupingHandler(managementService, config.getName(), config.getAddress());
+      }
+      log.info("deploying grouping handler: " + groupingHandler);
+      postOffice.setGroupingHandler(groupingHandler);
+   }
+
    private synchronized void deployClusterConnection(final ClusterConnectionConfiguration config) throws Exception
    {
       if (config.getName() == null)

Deleted: branches/hornetq_grouping/src/main/org/hornetq/core/server/group/Arbitrator.java
===================================================================
--- branches/hornetq_grouping/src/main/org/hornetq/core/server/group/Arbitrator.java	2009-09-24 09:25:44 UTC (rev 7986)
+++ branches/hornetq_grouping/src/main/org/hornetq/core/server/group/Arbitrator.java	2009-09-24 09:58:07 UTC (rev 7987)
@@ -1,35 +0,0 @@
-/*
- * Copyright 2009 Red Hat, Inc.
- *  Red Hat licenses this file to you under the Apache License, version
- *  2.0 (the "License"); you may not use this file except in compliance
- *  with the License.  You may obtain a copy of the License at
- *     http://www.apache.org/licenses/LICENSE-2.0
- *  Unless required by applicable law or agreed to in writing, software
- *  distributed under the License is distributed on an "AS IS" BASIS,
- *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
- *  implied.  See the License for the specific language governing
- *  permissions and limitations under the License.
- */
-package org.hornetq.core.server.group;
-
-import org.hornetq.utils.SimpleString;
-import org.hornetq.core.server.group.impl.Proposal;
-import org.hornetq.core.server.group.impl.Response;
-
-/**
- * @author <a href="mailto:andy.taylor at jboss.org">Andy Taylor</a>
- */
-public interface Arbitrator
-{
-   SimpleString getName();
-
-   Response propose(Proposal proposal) throws Exception;
-
-   void proposed(Response response) throws Exception;
-
-   void send(Response response, int distance) throws Exception;
-
-   Response receive(Proposal proposal, int distance) throws Exception;
-
-   Response rePropose(Proposal proposal) throws Exception;
-}

Copied: branches/hornetq_grouping/src/main/org/hornetq/core/server/group/GroupingHandler.java (from rev 7977, branches/hornetq_grouping/src/main/org/hornetq/core/server/group/Arbitrator.java)
===================================================================
--- branches/hornetq_grouping/src/main/org/hornetq/core/server/group/GroupingHandler.java	                        (rev 0)
+++ branches/hornetq_grouping/src/main/org/hornetq/core/server/group/GroupingHandler.java	2009-09-24 09:58:07 UTC (rev 7987)
@@ -0,0 +1,35 @@
+/*
+ * Copyright 2009 Red Hat, Inc.
+ *  Red Hat licenses this file to you under the Apache License, version
+ *  2.0 (the "License"); you may not use this file except in compliance
+ *  with the License.  You may obtain a copy of the License at
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+ *  implied.  See the License for the specific language governing
+ *  permissions and limitations under the License.
+ */
+package org.hornetq.core.server.group;
+
+import org.hornetq.utils.SimpleString;
+import org.hornetq.core.server.group.impl.Proposal;
+import org.hornetq.core.server.group.impl.Response;
+
+/**
+ * @author <a href="mailto:andy.taylor at jboss.org">Andy Taylor</a>
+ */
+public interface GroupingHandler
+{
+   SimpleString getName();
+
+   Response propose(Proposal proposal) throws Exception;
+
+   void proposed(Response response) throws Exception;
+
+   void send(Response response, int distance) throws Exception;
+
+   Response receive(Proposal proposal, int distance) throws Exception;
+
+   Response rePropose(Proposal proposal) throws Exception;
+}

Deleted: branches/hornetq_grouping/src/main/org/hornetq/core/server/group/impl/ArbitratorConfiguration.java
===================================================================
--- branches/hornetq_grouping/src/main/org/hornetq/core/server/group/impl/ArbitratorConfiguration.java	2009-09-24 09:25:44 UTC (rev 7986)
+++ branches/hornetq_grouping/src/main/org/hornetq/core/server/group/impl/ArbitratorConfiguration.java	2009-09-24 09:58:07 UTC (rev 7987)
@@ -1,67 +0,0 @@
-/*
- * Copyright 2009 Red Hat, Inc.
- *  Red Hat licenses this file to you under the Apache License, version
- *  2.0 (the "License"); you may not use this file except in compliance
- *  with the License.  You may obtain a copy of the License at
- *     http://www.apache.org/licenses/LICENSE-2.0
- *  Unless required by applicable law or agreed to in writing, software
- *  distributed under the License is distributed on an "AS IS" BASIS,
- *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
- *  implied.  See the License for the specific language governing
- *  permissions and limitations under the License.
- */
-package org.hornetq.core.server.group.impl;
-
-import org.hornetq.utils.SimpleString;
-
-/**
- * @author <a href="mailto:andy.taylor at jboss.org">Andy Taylor</a>
- */
-public class ArbitratorConfiguration
-{
-   private final SimpleString name;
-
-   private final TYPE type;
-
-   private final SimpleString address;
-
-   public ArbitratorConfiguration(final SimpleString name, final TYPE type, SimpleString address)
-   {
-      this.type = type;
-      this.name = name;
-      this.address = address;
-   }
-
-   public SimpleString getName()
-   {
-      return name;
-   }
-
-   public TYPE getType()
-   {
-      return type;
-   }
-
-   public SimpleString getAddress()
-   {
-      return address;
-   }
-
-   public enum TYPE
-   {
-      LOCAL("LOCAL"),
-      REMOTE("REMOTE");
-
-      private String type;
-
-      TYPE(String type)
-      {
-         this.type = type;
-      }
-
-      public String getType()
-      {
-         return type;
-      }
-   }
-}

Copied: branches/hornetq_grouping/src/main/org/hornetq/core/server/group/impl/GroupingHandlerConfiguration.java (from rev 7977, branches/hornetq_grouping/src/main/org/hornetq/core/server/group/impl/ArbitratorConfiguration.java)
===================================================================
--- branches/hornetq_grouping/src/main/org/hornetq/core/server/group/impl/GroupingHandlerConfiguration.java	                        (rev 0)
+++ branches/hornetq_grouping/src/main/org/hornetq/core/server/group/impl/GroupingHandlerConfiguration.java	2009-09-24 09:58:07 UTC (rev 7987)
@@ -0,0 +1,67 @@
+/*
+ * Copyright 2009 Red Hat, Inc.
+ *  Red Hat licenses this file to you under the Apache License, version
+ *  2.0 (the "License"); you may not use this file except in compliance
+ *  with the License.  You may obtain a copy of the License at
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+ *  implied.  See the License for the specific language governing
+ *  permissions and limitations under the License.
+ */
+package org.hornetq.core.server.group.impl;
+
+import org.hornetq.utils.SimpleString;
+
+/**
+ * @author <a href="mailto:andy.taylor at jboss.org">Andy Taylor</a>
+ */
+public class GroupingHandlerConfiguration
+{
+   private final SimpleString name;
+
+   private final TYPE type;
+
+   private final SimpleString address;
+
+   public GroupingHandlerConfiguration(final SimpleString name, final TYPE type, SimpleString address)
+   {
+      this.type = type;
+      this.name = name;
+      this.address = address;
+   }
+
+   public SimpleString getName()
+   {
+      return name;
+   }
+
+   public TYPE getType()
+   {
+      return type;
+   }
+
+   public SimpleString getAddress()
+   {
+      return address;
+   }
+
+   public enum TYPE
+   {
+      LOCAL("LOCAL"),
+      REMOTE("REMOTE");
+
+      private String type;
+
+      TYPE(String type)
+      {
+         this.type = type;
+      }
+
+      public String getType()
+      {
+         return type;
+      }
+   }
+}

Deleted: branches/hornetq_grouping/src/main/org/hornetq/core/server/group/impl/LocalArbitrator.java
===================================================================
--- branches/hornetq_grouping/src/main/org/hornetq/core/server/group/impl/LocalArbitrator.java	2009-09-24 09:25:44 UTC (rev 7986)
+++ branches/hornetq_grouping/src/main/org/hornetq/core/server/group/impl/LocalArbitrator.java	2009-09-24 09:58:07 UTC (rev 7987)
@@ -1,129 +0,0 @@
-/*
- * Copyright 2009 Red Hat, Inc.
- *  Red Hat licenses this file to you under the Apache License, version
- *  2.0 (the "License"); you may not use this file except in compliance
- *  with the License.  You may obtain a copy of the License at
- *     http://www.apache.org/licenses/LICENSE-2.0
- *  Unless required by applicable law or agreed to in writing, software
- *  distributed under the License is distributed on an "AS IS" BASIS,
- *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
- *  implied.  See the License for the specific language governing
- *  permissions and limitations under the License.
- */
-package org.hornetq.core.server.group.impl;
-
-import org.hornetq.core.management.NotificationType;
-import org.hornetq.core.management.Notification;
-import org.hornetq.core.management.ManagementService;
-import org.hornetq.core.client.management.impl.ManagementHelper;
-import org.hornetq.core.postoffice.BindingType;
-import org.hornetq.core.logging.Logger;
-import org.hornetq.core.server.group.Arbitrator;
-import org.hornetq.utils.SimpleString;
-import org.hornetq.utils.TypedProperties;
-import org.hornetq.utils.ConcurrentHashSet;
-
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.ScheduledExecutorService;
-import java.util.concurrent.TimeUnit;
-
-/**
- * @author <a href="mailto:andy.taylor at jboss.org">Andy Taylor</a>
- */
-public class LocalArbitrator implements Arbitrator
-{
-   private static Logger log = Logger.getLogger(LocalArbitrator.class);
-
-   private ConcurrentHashMap<SimpleString, Object> map = new ConcurrentHashMap<SimpleString, Object>();
-
-   private final SimpleString name;
-
-   private final ManagementService managementService;
-
-   private SimpleString address;
-
-   private ScheduledExecutorService scheduledExecutor;
-
-   private ConcurrentHashSet<SimpleString> reProposals = new ConcurrentHashSet<SimpleString>();
-
-   public LocalArbitrator(final ManagementService managementService, final SimpleString name, final SimpleString address, ScheduledExecutorService scheduledExecutor)
-   {
-      this.managementService = managementService;
-      this.name = name;
-      this.address = address;
-      this.scheduledExecutor = scheduledExecutor;
-   }
-
-   public SimpleString getName()
-   {
-      return name;
-   }
-
-
-   public Response propose(Proposal proposal) throws Exception
-   {
-      if(proposal.getProposal() == null)
-      {
-         Object original = map.get(proposal.getProposalType());
-         return original == null?null:new Response(proposal.getProposalType(), original);
-      }
-      Response response = new Response(proposal.getProposalType(), proposal.getProposal());
-      if (map.putIfAbsent(response.getResponseType(), response.getChosen()) == null)
-      {
-         log.info("accepted proposal for " + proposal.getProposalType() + " with value " + proposal.getProposal());
-         return response;
-      }
-      else
-      {
-         log.info("denied proposal for " + proposal.getProposalType() + " with value " + proposal.getProposal());
-         return new Response(proposal.getProposalType(), proposal.getProposal(), map.get(proposal.getProposalType()));
-      }
-   }
-
-   public void proposed(Response response) throws Exception
-   {
-   }
-
-   public void send(Response response, int distance) throws Exception
-   {
-      Object value = response.getAlternative() != null ? response.getAlternative() : response.getOriginal();
-      log.info("sending proposal response for " + response.getResponseType() + " with value " + value);
-      TypedProperties props = new TypedProperties();
-      props.putStringProperty(ManagementHelper.HDR_PROPOSAL_TYPE, response.getResponseType());
-      props.putStringProperty(ManagementHelper.HDR_PROPOSAL_VALUE, (SimpleString)response.getOriginal());
-      props.putStringProperty(ManagementHelper.HDR_PROPOSAL_ALT_VALUE, (SimpleString)response.getAlternative());
-      props.putIntProperty(ManagementHelper.HDR_BINDING_TYPE, BindingType.LOCAL_QUEUE_INDEX);
-      props.putStringProperty(ManagementHelper.HDR_ADDRESS, address);
-      props.putIntProperty(ManagementHelper.HDR_DISTANCE, distance);
-      Notification notification = new Notification(null, NotificationType.PROPOSAL_RESPONSE, props);
-      managementService.sendNotification(notification);
-   }
-
-   public Response receive(Proposal proposal, int distance) throws Exception
-   {
-      return propose(proposal);
-   }
-
-   public Response rePropose(final Proposal proposal) throws Exception
-   {
-      if(reProposals.addIfAbsent(proposal.getProposalType()))
-      {
-         Response response = new Response(proposal.getProposalType(), proposal.getProposal());
-         map.replace(proposal.getProposalType(), response);
-         send(response, 0);
-         scheduledExecutor.schedule(new Runnable()
-         {
-            public void run()
-            {
-               reProposals.remove(proposal.getProposalType());
-            }
-         }, 2000, TimeUnit.MILLISECONDS);
-         return response;
-      }
-      else
-      {
-         return new Response(proposal.getProposalType(), map.get(proposal.getProposalType()));
-      }
-   }
-}
-

Copied: branches/hornetq_grouping/src/main/org/hornetq/core/server/group/impl/LocalGroupingHandler.java (from rev 7977, branches/hornetq_grouping/src/main/org/hornetq/core/server/group/impl/LocalArbitrator.java)
===================================================================
--- branches/hornetq_grouping/src/main/org/hornetq/core/server/group/impl/LocalGroupingHandler.java	                        (rev 0)
+++ branches/hornetq_grouping/src/main/org/hornetq/core/server/group/impl/LocalGroupingHandler.java	2009-09-24 09:58:07 UTC (rev 7987)
@@ -0,0 +1,129 @@
+/*
+ * Copyright 2009 Red Hat, Inc.
+ *  Red Hat licenses this file to you under the Apache License, version
+ *  2.0 (the "License"); you may not use this file except in compliance
+ *  with the License.  You may obtain a copy of the License at
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+ *  implied.  See the License for the specific language governing
+ *  permissions and limitations under the License.
+ */
+package org.hornetq.core.server.group.impl;
+
+import org.hornetq.core.management.NotificationType;
+import org.hornetq.core.management.Notification;
+import org.hornetq.core.management.ManagementService;
+import org.hornetq.core.client.management.impl.ManagementHelper;
+import org.hornetq.core.postoffice.BindingType;
+import org.hornetq.core.logging.Logger;
+import org.hornetq.core.server.group.GroupingHandler;
+import org.hornetq.utils.SimpleString;
+import org.hornetq.utils.TypedProperties;
+import org.hornetq.utils.ConcurrentHashSet;
+
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+
+/**
+ * @author <a href="mailto:andy.taylor at jboss.org">Andy Taylor</a>
+ */
+public class LocalGroupingHandler implements GroupingHandler
+{
+   private static Logger log = Logger.getLogger(LocalGroupingHandler.class);
+
+   private ConcurrentHashMap<SimpleString, Object> map = new ConcurrentHashMap<SimpleString, Object>();
+
+   private final SimpleString name;
+
+   private final ManagementService managementService;
+
+   private SimpleString address;
+
+   private ScheduledExecutorService scheduledExecutor;
+
+   private ConcurrentHashSet<SimpleString> reProposals = new ConcurrentHashSet<SimpleString>();
+
+   public LocalGroupingHandler(final ManagementService managementService, final SimpleString name, final SimpleString address, ScheduledExecutorService scheduledExecutor)
+   {
+      this.managementService = managementService;
+      this.name = name;
+      this.address = address;
+      this.scheduledExecutor = scheduledExecutor;
+   }
+
+   public SimpleString getName()
+   {
+      return name;
+   }
+
+
+   public Response propose(Proposal proposal) throws Exception
+   {
+      if(proposal.getProposal() == null)
+      {
+         Object original = map.get(proposal.getProposalType());
+         return original == null?null:new Response(proposal.getProposalType(), original);
+      }
+      Response response = new Response(proposal.getProposalType(), proposal.getProposal());
+      if (map.putIfAbsent(response.getResponseType(), response.getChosen()) == null)
+      {
+         log.info("accepted proposal for " + proposal.getProposalType() + " with value " + proposal.getProposal());
+         return response;
+      }
+      else
+      {
+         log.info("denied proposal for " + proposal.getProposalType() + " with value " + proposal.getProposal());
+         return new Response(proposal.getProposalType(), proposal.getProposal(), map.get(proposal.getProposalType()));
+      }
+   }
+
+   public void proposed(Response response) throws Exception
+   {
+   }
+
+   public void send(Response response, int distance) throws Exception
+   {
+      Object value = response.getAlternative() != null ? response.getAlternative() : response.getOriginal();
+      log.info("sending proposal response for " + response.getResponseType() + " with value " + value);
+      TypedProperties props = new TypedProperties();
+      props.putStringProperty(ManagementHelper.HDR_PROPOSAL_TYPE, response.getResponseType());
+      props.putStringProperty(ManagementHelper.HDR_PROPOSAL_VALUE, (SimpleString)response.getOriginal());
+      props.putStringProperty(ManagementHelper.HDR_PROPOSAL_ALT_VALUE, (SimpleString)response.getAlternative());
+      props.putIntProperty(ManagementHelper.HDR_BINDING_TYPE, BindingType.LOCAL_QUEUE_INDEX);
+      props.putStringProperty(ManagementHelper.HDR_ADDRESS, address);
+      props.putIntProperty(ManagementHelper.HDR_DISTANCE, distance);
+      Notification notification = new Notification(null, NotificationType.PROPOSAL_RESPONSE, props);
+      managementService.sendNotification(notification);
+   }
+
+   public Response receive(Proposal proposal, int distance) throws Exception
+   {
+      return propose(proposal);
+   }
+
+   public Response rePropose(final Proposal proposal) throws Exception
+   {
+      if(reProposals.addIfAbsent(proposal.getProposalType()))
+      {
+         Response response = new Response(proposal.getProposalType(), proposal.getProposal());
+         map.replace(proposal.getProposalType(), response);
+         send(response, 0);
+         scheduledExecutor.schedule(new Runnable()
+         {
+            public void run()
+            {
+               reProposals.remove(proposal.getProposalType());
+            }
+         }, 2000, TimeUnit.MILLISECONDS);
+         return response;
+      }
+      else
+      {
+         return new Response(proposal.getProposalType(), map.get(proposal.getProposalType()));
+      }
+   }
+}
+

Deleted: branches/hornetq_grouping/src/main/org/hornetq/core/server/group/impl/RemoteArbitrator.java
===================================================================
--- branches/hornetq_grouping/src/main/org/hornetq/core/server/group/impl/RemoteArbitrator.java	2009-09-24 09:25:44 UTC (rev 7986)
+++ branches/hornetq_grouping/src/main/org/hornetq/core/server/group/impl/RemoteArbitrator.java	2009-09-24 09:58:07 UTC (rev 7987)
@@ -1,136 +0,0 @@
-/*
- * Copyright 2009 Red Hat, Inc.
- *  Red Hat licenses this file to you under the Apache License, version
- *  2.0 (the "License"); you may not use this file except in compliance
- *  with the License.  You may obtain a copy of the License at
- *     http://www.apache.org/licenses/LICENSE-2.0
- *  Unless required by applicable law or agreed to in writing, software
- *  distributed under the License is distributed on an "AS IS" BASIS,
- *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
- *  implied.  See the License for the specific language governing
- *  permissions and limitations under the License.
- */
-package org.hornetq.core.server.group.impl;
-
-import org.hornetq.core.management.NotificationType;
-import org.hornetq.core.management.Notification;
-import org.hornetq.core.management.ManagementService;
-import org.hornetq.core.client.management.impl.ManagementHelper;
-import org.hornetq.core.postoffice.BindingType;
-import org.hornetq.core.server.group.Arbitrator;
-import org.hornetq.utils.SimpleString;
-import org.hornetq.utils.TypedProperties;
-
-import java.util.logging.Logger;
-import java.util.Map;
-import java.util.HashMap;
-import java.util.concurrent.locks.Lock;
-import java.util.concurrent.locks.ReentrantLock;
-import java.util.concurrent.locks.Condition;
-import java.util.concurrent.TimeUnit;
-
-/**
- * @author <a href="mailto:andy.taylor at jboss.org">Andy Taylor</a>
- */
-public class RemoteArbitrator implements Arbitrator
-{
-   private static Logger log = Logger.getLogger(RemoteArbitrator.class.getName());
-
-   private final SimpleString name;
-
-   private final ManagementService managementService;
-
-   private final SimpleString address;
-
-   private Map<SimpleString, Response> responses = new HashMap<SimpleString, Response>();
-
-   private final Lock lock = new ReentrantLock();
-
-   private final Condition sendCondition = lock.newCondition();
-
-   private int waitTime = 1000;
-
-   public RemoteArbitrator(final ManagementService managementService, final SimpleString name, final SimpleString address)
-   {
-      this.name = name;
-      this.address = address;
-      this.managementService = managementService;
-   }
-
-   public SimpleString getName()
-   {
-      return name;
-   }
-
-   public Response propose(final Proposal proposal) throws Exception
-   {
-      Response response = responses.get(proposal.getProposalType());
-      if( response != null)
-      {
-         return response;
-      }
-      if (proposal.getProposal() == null)
-      {
-         return null;
-      }
-      try
-      {
-         lock.lock();
-         TypedProperties props = new TypedProperties();
-         log.info("sending proposal for " + proposal.getProposalType() + " with value " + proposal.getProposal());
-         props.putStringProperty(ManagementHelper.HDR_PROPOSAL_TYPE, proposal.getProposalType());
-         props.putStringProperty(ManagementHelper.HDR_PROPOSAL_VALUE, (SimpleString)proposal.getProposal());
-         props.putIntProperty(ManagementHelper.HDR_BINDING_TYPE, BindingType.LOCAL_QUEUE_INDEX);
-         props.putStringProperty(ManagementHelper.HDR_ADDRESS, address);
-         props.putIntProperty(ManagementHelper.HDR_DISTANCE, 0);
-         Notification notification = new Notification(null, NotificationType.PROPOSAL, props);
-         managementService.sendNotification(notification);
-         sendCondition.await(waitTime, TimeUnit.MILLISECONDS);
-      }
-      finally
-      {
-         lock.unlock();
-      }
-      return responses.get(proposal.getProposalType());
-   }
-
-   public void proposed(Response response) throws Exception
-   {
-      Object value = response.getAlternative() != null ? response.getAlternative() : response.getOriginal();
-      log.info("received proposal response for " + response.getResponseType() + " with value " + value);
-      try
-      {
-         lock.lock();
-         responses.put(response.getResponseType(), response);
-         sendCondition.signal();
-      }
-      finally
-      {
-         lock.unlock();
-      }
-   }
-
-   public Response receive(Proposal proposal, int distance) throws Exception
-   {
-      TypedProperties props = new TypedProperties();
-      props.putStringProperty(ManagementHelper.HDR_PROPOSAL_TYPE, proposal.getProposalType());
-      props.putStringProperty(ManagementHelper.HDR_PROPOSAL_VALUE, (SimpleString)proposal.getProposal());
-      props.putIntProperty(ManagementHelper.HDR_BINDING_TYPE, BindingType.LOCAL_QUEUE_INDEX);
-      props.putStringProperty(ManagementHelper.HDR_ADDRESS, address);
-      props.putIntProperty(ManagementHelper.HDR_DISTANCE, distance);
-      Notification notification = new Notification(null, NotificationType.PROPOSAL, props);
-      managementService.sendNotification(notification);
-      return null;
-   }
-
-   public void send(Response response, int distance) throws Exception
-   {
-   }
-
-   public Response rePropose(Proposal proposal)
-   {
-      return null;
-   }
-
-}
-

Copied: branches/hornetq_grouping/src/main/org/hornetq/core/server/group/impl/RemoteGroupingHandler.java (from rev 7977, branches/hornetq_grouping/src/main/org/hornetq/core/server/group/impl/RemoteArbitrator.java)
===================================================================
--- branches/hornetq_grouping/src/main/org/hornetq/core/server/group/impl/RemoteGroupingHandler.java	                        (rev 0)
+++ branches/hornetq_grouping/src/main/org/hornetq/core/server/group/impl/RemoteGroupingHandler.java	2009-09-24 09:58:07 UTC (rev 7987)
@@ -0,0 +1,141 @@
+/*
+ * Copyright 2009 Red Hat, Inc.
+ *  Red Hat licenses this file to you under the Apache License, version
+ *  2.0 (the "License"); you may not use this file except in compliance
+ *  with the License.  You may obtain a copy of the License at
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+ *  implied.  See the License for the specific language governing
+ *  permissions and limitations under the License.
+ */
+package org.hornetq.core.server.group.impl;
+
+import org.hornetq.core.management.NotificationType;
+import org.hornetq.core.management.Notification;
+import org.hornetq.core.management.ManagementService;
+import org.hornetq.core.client.management.impl.ManagementHelper;
+import org.hornetq.core.postoffice.BindingType;
+import org.hornetq.core.server.group.GroupingHandler;
+import org.hornetq.utils.SimpleString;
+import org.hornetq.utils.TypedProperties;
+
+import java.util.logging.Logger;
+import java.util.Map;
+import java.util.HashMap;
+import java.util.concurrent.locks.Lock;
+import java.util.concurrent.locks.ReentrantLock;
+import java.util.concurrent.locks.Condition;
+import java.util.concurrent.TimeUnit;
+
+/**
+ * @author <a href="mailto:andy.taylor at jboss.org">Andy Taylor</a>
+ */
+public class RemoteGroupingHandler implements GroupingHandler
+{
+   private static Logger log = Logger.getLogger(RemoteGroupingHandler.class.getName());
+
+   private final SimpleString name;
+
+   private final ManagementService managementService;
+
+   private final SimpleString address;
+
+   private Map<SimpleString, Response> responses = new HashMap<SimpleString, Response>();
+
+   private final Lock lock = new ReentrantLock();
+
+   private final Condition sendCondition = lock.newCondition();
+
+   private int waitTime = 1000;
+
+   public RemoteGroupingHandler(final ManagementService managementService, final SimpleString name, final SimpleString address)
+   {
+      this.name = name;
+      this.address = address;
+      this.managementService = managementService;
+   }
+
+   public SimpleString getName()
+   {
+      return name;
+   }
+
+   public Response propose(final Proposal proposal) throws Exception
+   {
+      Response response = responses.get(proposal.getProposalType());
+      if( response != null)
+      {
+         return response;
+      }
+      if (proposal.getProposal() == null)
+      {
+         return null;
+      }
+      try
+      {
+         lock.lock();
+         TypedProperties props = new TypedProperties();
+         log.info("sending proposal for " + proposal.getProposalType() + " with value " + proposal.getProposal());
+         props.putStringProperty(ManagementHelper.HDR_PROPOSAL_TYPE, proposal.getProposalType());
+         props.putStringProperty(ManagementHelper.HDR_PROPOSAL_VALUE, (SimpleString)proposal.getProposal());
+         props.putIntProperty(ManagementHelper.HDR_BINDING_TYPE, BindingType.LOCAL_QUEUE_INDEX);
+         props.putStringProperty(ManagementHelper.HDR_ADDRESS, address);
+         props.putIntProperty(ManagementHelper.HDR_DISTANCE, 0);
+         Notification notification = new Notification(null, NotificationType.PROPOSAL, props);
+         managementService.sendNotification(notification);
+         sendCondition.await(waitTime, TimeUnit.MILLISECONDS);
+         response = responses.get(proposal.getProposalType());
+      }
+      finally
+      {
+         lock.unlock();
+      }
+      if(response == null)
+      {
+         throw new IllegalStateException("no response received from group handler for " + proposal.getProposalType());
+      }
+      return response;
+   }
+
+   public void proposed(Response response) throws Exception
+   {
+      Object value = response.getAlternative() != null ? response.getAlternative() : response.getOriginal();
+      log.info("received proposal response for " + response.getResponseType() + " with value " + value);
+      try
+      {
+         lock.lock();
+         responses.put(response.getResponseType(), response);
+         sendCondition.signal();
+      }
+      finally
+      {
+         lock.unlock();
+      }
+   }
+
+   public Response receive(Proposal proposal, int distance) throws Exception
+   {
+      TypedProperties props = new TypedProperties();
+      props.putStringProperty(ManagementHelper.HDR_PROPOSAL_TYPE, proposal.getProposalType());
+      props.putStringProperty(ManagementHelper.HDR_PROPOSAL_VALUE, (SimpleString)proposal.getProposal());
+      props.putIntProperty(ManagementHelper.HDR_BINDING_TYPE, BindingType.LOCAL_QUEUE_INDEX);
+      props.putStringProperty(ManagementHelper.HDR_ADDRESS, address);
+      props.putIntProperty(ManagementHelper.HDR_DISTANCE, distance);
+      Notification notification = new Notification(null, NotificationType.PROPOSAL, props);
+      managementService.sendNotification(notification);
+      return null;
+   }
+
+   public void send(Response response, int distance) throws Exception
+   {
+   }
+
+   public Response rePropose(Proposal proposal)
+   {
+      return null;
+   }
+
+}
+

Modified: branches/hornetq_grouping/tests/src/org/hornetq/tests/integration/cluster/distribution/ClusterTestBase.java
===================================================================
--- branches/hornetq_grouping/tests/src/org/hornetq/tests/integration/cluster/distribution/ClusterTestBase.java	2009-09-24 09:25:44 UTC (rev 7986)
+++ branches/hornetq_grouping/tests/src/org/hornetq/tests/integration/cluster/distribution/ClusterTestBase.java	2009-09-24 09:58:07 UTC (rev 7987)
@@ -45,10 +45,10 @@
 import org.hornetq.core.server.HornetQ;
 import org.hornetq.core.server.HornetQServer;
 import org.hornetq.core.server.JournalType;
-import org.hornetq.core.server.group.impl.ArbitratorConfiguration;
-import org.hornetq.core.server.group.impl.LocalArbitrator;
-import org.hornetq.core.server.group.impl.RemoteArbitrator;
-import org.hornetq.core.server.group.Arbitrator;
+import org.hornetq.core.server.group.impl.GroupingHandlerConfiguration;
+import org.hornetq.core.server.group.impl.LocalGroupingHandler;
+import org.hornetq.core.server.group.impl.RemoteGroupingHandler;
+import org.hornetq.core.server.group.GroupingHandler;
 import org.hornetq.core.server.cluster.ClusterConnection;
 import org.hornetq.core.server.cluster.RemoteQueueBinding;
 import org.hornetq.integration.transports.netty.TransportConstants;
@@ -457,18 +457,18 @@
    }
 
 
-   protected void setUpGroupArbitrator(ArbitratorConfiguration.TYPE type,  int node)
+   protected void setUpGroupArbitrator(GroupingHandlerConfiguration.TYPE type,  int node)
    {
-      Arbitrator arbitrator;
-      if(type == ArbitratorConfiguration.TYPE.LOCAL)
+      GroupingHandler groupingHandler;
+      if(type == GroupingHandlerConfiguration.TYPE.LOCAL)
       {
-         arbitrator = new LocalArbitrator(servers[node].getManagementService(), new SimpleString("grouparbitrator"), new SimpleString("queues"), null);
+         groupingHandler = new LocalGroupingHandler(servers[node].getManagementService(), new SimpleString("grouparbitrator"), new SimpleString("queues"), null);
       }
       else
       {
-         arbitrator = new RemoteArbitrator(servers[node].getManagementService(), new SimpleString("grouparbitrator"), new SimpleString("queues"));
+         groupingHandler = new RemoteGroupingHandler(servers[node].getManagementService(), new SimpleString("grouparbitrator"), new SimpleString("queues"));
       }
-      this.servers[node].getPostOffice().addArbitrator(arbitrator);
+      this.servers[node].getPostOffice().setGroupingHandler(groupingHandler);
    }
 
    protected void send(int node, String address, int numMessages, boolean durable, String filterVal) throws Exception

Modified: branches/hornetq_grouping/tests/src/org/hornetq/tests/integration/cluster/distribution/ClusteredGroupingTest.java
===================================================================
--- branches/hornetq_grouping/tests/src/org/hornetq/tests/integration/cluster/distribution/ClusteredGroupingTest.java	2009-09-24 09:25:44 UTC (rev 7986)
+++ branches/hornetq_grouping/tests/src/org/hornetq/tests/integration/cluster/distribution/ClusteredGroupingTest.java	2009-09-24 09:58:07 UTC (rev 7987)
@@ -13,7 +13,7 @@
 package org.hornetq.tests.integration.cluster.distribution;
 
 import org.hornetq.core.message.impl.MessageImpl;
-import org.hornetq.core.server.group.impl.ArbitratorConfiguration;
+import org.hornetq.core.server.group.impl.GroupingHandlerConfiguration;
 import org.hornetq.core.exception.HornetQException;
 import org.hornetq.utils.SimpleString;
 
@@ -39,9 +39,9 @@
 
       try
       {
-         setUpGroupArbitrator(ArbitratorConfiguration.TYPE.LOCAL, 0);
-         setUpGroupArbitrator(ArbitratorConfiguration.TYPE.REMOTE, 1);
-         setUpGroupArbitrator(ArbitratorConfiguration.TYPE.REMOTE, 2);
+         setUpGroupArbitrator(GroupingHandlerConfiguration.TYPE.LOCAL, 0);
+         setUpGroupArbitrator(GroupingHandlerConfiguration.TYPE.REMOTE, 1);
+         setUpGroupArbitrator(GroupingHandlerConfiguration.TYPE.REMOTE, 2);
 
          setupSessionFactory(0, isNetty());
          setupSessionFactory(1, isNetty());
@@ -95,9 +95,9 @@
 
       try
       {
-         setUpGroupArbitrator(ArbitratorConfiguration.TYPE.LOCAL, 0);
-         setUpGroupArbitrator(ArbitratorConfiguration.TYPE.REMOTE, 1);
-         setUpGroupArbitrator(ArbitratorConfiguration.TYPE.REMOTE, 2);
+         setUpGroupArbitrator(GroupingHandlerConfiguration.TYPE.LOCAL, 0);
+         setUpGroupArbitrator(GroupingHandlerConfiguration.TYPE.REMOTE, 1);
+         setUpGroupArbitrator(GroupingHandlerConfiguration.TYPE.REMOTE, 2);
 
          setupSessionFactory(0, isNetty());
          setupSessionFactory(1, isNetty());
@@ -154,9 +154,9 @@
 
       try
       {
-         setUpGroupArbitrator(ArbitratorConfiguration.TYPE.LOCAL, 0);
-         setUpGroupArbitrator(ArbitratorConfiguration.TYPE.REMOTE, 1);
-         setUpGroupArbitrator(ArbitratorConfiguration.TYPE.REMOTE, 2);
+         setUpGroupArbitrator(GroupingHandlerConfiguration.TYPE.LOCAL, 0);
+         setUpGroupArbitrator(GroupingHandlerConfiguration.TYPE.REMOTE, 1);
+         setUpGroupArbitrator(GroupingHandlerConfiguration.TYPE.REMOTE, 2);
 
          setupSessionFactory(0, isNetty());
          setupSessionFactory(1, isNetty());
@@ -216,9 +216,9 @@
 
       try
       {
-         setUpGroupArbitrator(ArbitratorConfiguration.TYPE.LOCAL, 0);
-         setUpGroupArbitrator(ArbitratorConfiguration.TYPE.REMOTE, 1);
-         setUpGroupArbitrator(ArbitratorConfiguration.TYPE.REMOTE, 2);
+         setUpGroupArbitrator(GroupingHandlerConfiguration.TYPE.LOCAL, 0);
+         setUpGroupArbitrator(GroupingHandlerConfiguration.TYPE.REMOTE, 1);
+         setUpGroupArbitrator(GroupingHandlerConfiguration.TYPE.REMOTE, 2);
 
          setupSessionFactory(0, isNetty());
          setupSessionFactory(1, isNetty());
@@ -278,9 +278,9 @@
 
       try
       {
-         setUpGroupArbitrator(ArbitratorConfiguration.TYPE.LOCAL, 0);
-         setUpGroupArbitrator(ArbitratorConfiguration.TYPE.REMOTE, 1);
-         setUpGroupArbitrator(ArbitratorConfiguration.TYPE.REMOTE, 2);
+         setUpGroupArbitrator(GroupingHandlerConfiguration.TYPE.LOCAL, 0);
+         setUpGroupArbitrator(GroupingHandlerConfiguration.TYPE.REMOTE, 1);
+         setUpGroupArbitrator(GroupingHandlerConfiguration.TYPE.REMOTE, 2);
 
          setupSessionFactory(0, isNetty());
          setupSessionFactory(1, isNetty());
@@ -340,9 +340,9 @@
 
       try
       {
-         setUpGroupArbitrator(ArbitratorConfiguration.TYPE.LOCAL, 0);
-         setUpGroupArbitrator(ArbitratorConfiguration.TYPE.REMOTE, 1);
-         setUpGroupArbitrator(ArbitratorConfiguration.TYPE.REMOTE, 2);
+         setUpGroupArbitrator(GroupingHandlerConfiguration.TYPE.LOCAL, 0);
+         setUpGroupArbitrator(GroupingHandlerConfiguration.TYPE.REMOTE, 1);
+         setUpGroupArbitrator(GroupingHandlerConfiguration.TYPE.REMOTE, 2);
 
          setupSessionFactory(0, isNetty());
          setupSessionFactory(1, isNetty());
@@ -400,9 +400,9 @@
 
       try
       {
-         setUpGroupArbitrator(ArbitratorConfiguration.TYPE.LOCAL, 0);
-         setUpGroupArbitrator(ArbitratorConfiguration.TYPE.REMOTE, 1);
-         setUpGroupArbitrator(ArbitratorConfiguration.TYPE.REMOTE, 2);
+         setUpGroupArbitrator(GroupingHandlerConfiguration.TYPE.LOCAL, 0);
+         setUpGroupArbitrator(GroupingHandlerConfiguration.TYPE.REMOTE, 1);
+         setUpGroupArbitrator(GroupingHandlerConfiguration.TYPE.REMOTE, 2);
 
          setupSessionFactory(0, isNetty());
          setupSessionFactory(1, isNetty());
@@ -490,9 +490,9 @@
 
       try
       {
-         setUpGroupArbitrator(ArbitratorConfiguration.TYPE.LOCAL, 0);
-         setUpGroupArbitrator(ArbitratorConfiguration.TYPE.REMOTE, 1);
-         setUpGroupArbitrator(ArbitratorConfiguration.TYPE.REMOTE, 2);
+         setUpGroupArbitrator(GroupingHandlerConfiguration.TYPE.LOCAL, 0);
+         setUpGroupArbitrator(GroupingHandlerConfiguration.TYPE.REMOTE, 1);
+         setUpGroupArbitrator(GroupingHandlerConfiguration.TYPE.REMOTE, 2);
 
          setupSessionFactory(0, isNetty());
          setupSessionFactory(1, isNetty());
@@ -526,13 +526,23 @@
 
          sendInRange(2, "queues.testaddress", 10, 20, false, MessageImpl.HDR_GROUP_ID, new SimpleString("id1"));
 
+
+         sendInRange(0, "queues.testaddress", 20, 30, false, MessageImpl.HDR_GROUP_ID, new SimpleString("id1"));
+         startServers(1);          
+         setupSessionFactory(1, isNetty());
+         createQueue(1, "queues.testaddress", "queue0", null, false);
+         addConsumer(1, 1, "queue0", null);
+         waitForBindings(1, "queues.testaddress", 1, 1, true);
+         waitForBindings(1, "queues.testaddress", 2, 2, false);
          verifyReceiveAllInRange(10, 20, 1);
-         sendInRange(0, "queues.testaddress", 20, 30, false, MessageImpl.HDR_GROUP_ID, new SimpleString("id1"));
-
          verifyReceiveAllInRange(20, 30, 1);
 
          System.out.println("*****************************************************************************");
       }
+      catch(Exception e)
+      {
+         e.printStackTrace();
+      }
       finally
       {
          //closeAllConsumers();

Modified: branches/hornetq_grouping/tests/src/org/hornetq/tests/unit/core/server/impl/fakes/FakePostOffice.java
===================================================================
--- branches/hornetq_grouping/tests/src/org/hornetq/tests/unit/core/server/impl/fakes/FakePostOffice.java	2009-09-24 09:25:44 UTC (rev 7986)
+++ branches/hornetq_grouping/tests/src/org/hornetq/tests/unit/core/server/impl/fakes/FakePostOffice.java	2009-09-24 09:58:07 UTC (rev 7987)
@@ -23,7 +23,7 @@
 import org.hornetq.core.postoffice.PostOffice;
 import org.hornetq.core.server.Queue;
 import org.hornetq.core.server.ServerMessage;
-import org.hornetq.core.server.group.Arbitrator;
+import org.hornetq.core.server.group.GroupingHandler;
 import org.hornetq.core.transaction.Transaction;
 import org.hornetq.utils.SimpleString;
 
@@ -151,11 +151,11 @@
 
    }
 
-   public void addArbitrator(Arbitrator arbitrator)
+   public void setGroupingHandler(GroupingHandler groupingHandler)
    {
    }
 
-   public Arbitrator getArbitrator()
+   public GroupingHandler getGroupingHandler()
    {
       return null;
    }



More information about the hornetq-commits mailing list