[hornetq-commits] JBoss hornetq SVN: r7987 - in branches/hornetq_grouping: examples/jms/clustered-grouping and 18 other directories.
do-not-reply at jboss.org
do-not-reply at jboss.org
Thu Sep 24 05:58:08 EDT 2009
Author: ataylor
Date: 2009-09-24 05:58:07 -0400 (Thu, 24 Sep 2009)
New Revision: 7987
Added:
branches/hornetq_grouping/examples/jms/clustered-grouping/
branches/hornetq_grouping/examples/jms/clustered-grouping/build.xml
branches/hornetq_grouping/examples/jms/clustered-grouping/readme.html
branches/hornetq_grouping/examples/jms/clustered-grouping/server0/
branches/hornetq_grouping/examples/jms/clustered-grouping/server0/client-jndi.properties
branches/hornetq_grouping/examples/jms/clustered-grouping/server0/hornetq-beans.xml
branches/hornetq_grouping/examples/jms/clustered-grouping/server0/hornetq-configuration.xml
branches/hornetq_grouping/examples/jms/clustered-grouping/server0/hornetq-jms.xml
branches/hornetq_grouping/examples/jms/clustered-grouping/server0/hornetq-users.xml
branches/hornetq_grouping/examples/jms/clustered-grouping/server1/
branches/hornetq_grouping/examples/jms/clustered-grouping/server1/client-jndi.properties
branches/hornetq_grouping/examples/jms/clustered-grouping/server1/hornetq-beans.xml
branches/hornetq_grouping/examples/jms/clustered-grouping/server1/hornetq-configuration.xml
branches/hornetq_grouping/examples/jms/clustered-grouping/server1/hornetq-jms.xml
branches/hornetq_grouping/examples/jms/clustered-grouping/server1/hornetq-users.xml
branches/hornetq_grouping/examples/jms/clustered-grouping/server2/
branches/hornetq_grouping/examples/jms/clustered-grouping/server2/client-jndi.properties
branches/hornetq_grouping/examples/jms/clustered-grouping/server2/hornetq-beans.xml
branches/hornetq_grouping/examples/jms/clustered-grouping/server2/hornetq-configuration.xml
branches/hornetq_grouping/examples/jms/clustered-grouping/server2/hornetq-jms.xml
branches/hornetq_grouping/examples/jms/clustered-grouping/server2/hornetq-users.xml
branches/hornetq_grouping/examples/jms/clustered-grouping/src/
branches/hornetq_grouping/examples/jms/clustered-grouping/src/org/
branches/hornetq_grouping/examples/jms/clustered-grouping/src/org/hornetq/
branches/hornetq_grouping/examples/jms/clustered-grouping/src/org/hornetq/jms/
branches/hornetq_grouping/examples/jms/clustered-grouping/src/org/hornetq/jms/example/
branches/hornetq_grouping/examples/jms/clustered-grouping/src/org/hornetq/jms/example/ClusteredGroupingExample.java
branches/hornetq_grouping/src/main/org/hornetq/core/server/group/GroupingHandler.java
branches/hornetq_grouping/src/main/org/hornetq/core/server/group/impl/GroupingHandlerConfiguration.java
branches/hornetq_grouping/src/main/org/hornetq/core/server/group/impl/LocalGroupingHandler.java
branches/hornetq_grouping/src/main/org/hornetq/core/server/group/impl/RemoteGroupingHandler.java
Removed:
branches/hornetq_grouping/src/main/org/hornetq/core/server/group/Arbitrator.java
branches/hornetq_grouping/src/main/org/hornetq/core/server/group/impl/ArbitratorConfiguration.java
branches/hornetq_grouping/src/main/org/hornetq/core/server/group/impl/LocalArbitrator.java
branches/hornetq_grouping/src/main/org/hornetq/core/server/group/impl/RemoteArbitrator.java
Modified:
branches/hornetq_grouping/src/config/common/schema/hornetq-configuration.xsd
branches/hornetq_grouping/src/main/org/hornetq/core/config/Configuration.java
branches/hornetq_grouping/src/main/org/hornetq/core/config/impl/ConfigurationImpl.java
branches/hornetq_grouping/src/main/org/hornetq/core/config/impl/FileConfiguration.java
branches/hornetq_grouping/src/main/org/hornetq/core/postoffice/PostOffice.java
branches/hornetq_grouping/src/main/org/hornetq/core/postoffice/impl/BindingsImpl.java
branches/hornetq_grouping/src/main/org/hornetq/core/postoffice/impl/PostOfficeImpl.java
branches/hornetq_grouping/src/main/org/hornetq/core/server/cluster/impl/ClusterConnectionImpl.java
branches/hornetq_grouping/src/main/org/hornetq/core/server/cluster/impl/ClusterManagerImpl.java
branches/hornetq_grouping/tests/src/org/hornetq/tests/integration/cluster/distribution/ClusterTestBase.java
branches/hornetq_grouping/tests/src/org/hornetq/tests/integration/cluster/distribution/ClusteredGroupingTest.java
branches/hornetq_grouping/tests/src/org/hornetq/tests/unit/core/server/impl/fakes/FakePostOffice.java
Log:
example and fixes
Copied: branches/hornetq_grouping/examples/jms/clustered-grouping/build.xml (from rev 7977, branches/hornetq_grouping/examples/jms/clustered-queue/build.xml)
===================================================================
--- branches/hornetq_grouping/examples/jms/clustered-grouping/build.xml (rev 0)
+++ branches/hornetq_grouping/examples/jms/clustered-grouping/build.xml 2009-09-24 09:58:07 UTC (rev 7987)
@@ -0,0 +1,35 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!DOCTYPE project [
+ <!ENTITY libraries SYSTEM "../../../thirdparty/libraries.ent">
+ ]>
+<!--
+ ~ Copyright 2009 Red Hat, Inc.
+ ~ Red Hat licenses this file to you under the Apache License, version
+ ~ 2.0 (the "License"); you may not use this file except in compliance
+ ~ with the License. You may obtain a copy of the License at
+ ~ http://www.apache.org/licenses/LICENSE-2.0
+ ~ Unless required by applicable law or agreed to in writing, software
+ ~ distributed under the License is distributed on an "AS IS" BASIS,
+ ~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+ ~ implied. See the License for the specific language governing
+ ~ permissions and limitations under the License.
+ -->
+<project default="run" name="HornetQ JMS Clustered Grouping Example">
+
+ <import file="../../common/build.xml"/>
+
+ <target name="run">
+ <antcall target="runExample">
+ <param name="example.classname" value="org.hornetq.jms.example.ClusteredGroupingExample"/>
+ <param name="hornetq.example.beans.file" value="server0 server1 server2"/>
+ </antcall>
+ </target>
+
+ <target name="runRemote">
+ <antcall target="runExample">
+ <param name="example.classname" value="org.hornetq.jms.example.ClusteredGroupingExample"/>
+ <param name="hornetq.example.runServer" value="false"/>
+ </antcall>
+ </target>
+
+</project>
Copied: branches/hornetq_grouping/examples/jms/clustered-grouping/readme.html (from rev 7977, branches/hornetq_grouping/examples/jms/clustered-queue/readme.html)
===================================================================
--- branches/hornetq_grouping/examples/jms/clustered-grouping/readme.html (rev 0)
+++ branches/hornetq_grouping/examples/jms/clustered-grouping/readme.html 2009-09-24 09:58:07 UTC (rev 7987)
@@ -0,0 +1,174 @@
+<html>
+ <head>
+ <title>HornetQ JMS Load Balanced Queue Example</title>
+ <link rel="stylesheet" type="text/css" href="../../common/common.css">
+ </head>
+ <body>
+ <h1>HornetQ JMS Load Balanced Clustered Queue Example</h1>
+ <br>
+ <p>This example demonstrates a JMS queue deployed on two different nodes. The two nodes are configured to form a cluster.</p>
+ <p>We then create a consumer on the queue on each node, and we create a producer on only one of the nodes.</p>
+ <p>We then send some messages via the producer, and we verify that <b>both</b> consumers receive the sent messages
+ in a round-robin fashion.</p>
+ <p>In other words, HornetQ <b>load balances</b> the sent messages across all consumers on the cluster.</p>
+ <p>This example uses JNDI to look up the JMS Queue and ConnectionFactory objects. If you prefer not to use
+ JNDI, these could be instantiated directly.</p>
+ <p>Here's the relevant snippet from the server configuration, which tells the server to form a cluster between the two nodes
+ and to load balance the messages between the nodes.</p>
+ <pre>
+ <code>&lt;cluster-connection name="my-cluster"&gt;
+ &lt;address&gt;jms&lt;/address&gt;
+ &lt;retry-interval&gt;500&lt;/retry-interval&gt;
+ &lt;use-duplicate-detection&gt;true&lt;/use-duplicate-detection&gt;
+ &lt;forward-when-no-consumers&gt;true&lt;/forward-when-no-consumers&gt;
+ &lt;max-hops&gt;1&lt;/max-hops&gt;
+ &lt;discovery-group-ref discovery-group-name="my-discovery-group"/&gt;
+ &lt;/cluster-connection&gt;
+ </code>
+ </pre>
+ <p>For more information on HornetQ load balancing, and clustering in general, please see the clustering
+ section of the user manual.</p>
+ <h2>Example step-by-step</h2>
+ <p><i>To run the example, simply type <code>ant</code> from this directory</i></p>
+ <br>
+ <ol>
+ <li> Get an initial context for looking up JNDI from server 0.</li>
+ <pre>
+ <code>
+ ic0 = getContext(0);
+ </code>
+ </pre>
+
+ <li>Look-up the JMS Queue object from JNDI</li>
+ <pre>
+ <code>Queue queue = (Queue)ic0.lookup("/queue/exampleQueue");</code>
+ </pre>
+
+ <li>Look-up a JMS Connection Factory object from JNDI on server 0</li>
+ <pre>
+ <code>ConnectionFactory cf0 = (ConnectionFactory)ic0.lookup("/ConnectionFactory");</code>
+ </pre>
+
+ <li>Get an initial context for looking up JNDI from server 1.</li>
+ <pre>
+ <code>ic1 = getContext(1);</code>
+ </pre>
+
+ <li>Look-up a JMS Connection Factory object from JNDI on server 1</li>
+ <pre>
+ <code>ConnectionFactory cf1 = (ConnectionFactory)ic1.lookup("/ConnectionFactory");
+ </code>
+ </pre>
+
+ <li>We create a JMS Connection connection0 which is a connection to server 0</li>
+ <pre>
+ <code>
+ connection0 = cf0.createConnection();
+ </code>
+ </pre>
+
+ <li>We create a JMS Connection connection1 which is a connection to server 1</li>
+ <pre>
+ <code>
+ connection1 = cf1.createConnection();
+ </code>
+ </pre>
+
+ <li>We create a JMS Session on server 0</li>
+ <pre>
+ <code>
+ Session session0 = connection0.createSession(false, Session.AUTO_ACKNOWLEDGE);
+ </code>
+ </pre>
+
+ <li>We create a JMS Session on server 1</li>
+ <pre>
+ <code>
+ Session session1 = connection1.createSession(false, Session.AUTO_ACKNOWLEDGE);
+ </code>
+ </pre>
+
+ <li>We start the connections to ensure delivery occurs on them</li>
+ <pre>
+ <code>
+ connection0.start();
+
+ connection1.start();
+ </code>
+ </pre>
+
+ <li>We create JMS MessageConsumer objects on server 0 and server 1</li>
+ <pre>
+ <code>
+ MessageConsumer consumer0 = session0.createConsumer(queue);
+
+ MessageConsumer consumer1 = session1.createConsumer(queue);
+ </code>
+ </pre>
+
+ <li>We create a JMS MessageProducer object on server 0.</li>
+ <pre>
+ <code>
+ MessageProducer producer = session0.createProducer(queue);</code>
+ </pre>
+
+ <li>We send some messages to server 0.</li>
+ <pre>
+ <code>
+ final int numMessages = 10;
+
+ for (int i = 0; i < numMessages; i++)
+ {
+ TextMessage message = session0.createTextMessage("This is text message " + i);
+
+ producer.send(message);
+
+ System.out.println("Sent message: " + message.getText());
+ }
+ </code>
+ </pre>
+
+ <li>We now consume those messages on *both* server 0 and server 1.
+ We note the messages have been distributed between servers in a round robin fashion.
+ HornetQ has <b>load balanced</b> the messages between the available consumers on the different nodes.
+ HornetQ can be configured to always load balance messages to all nodes, or to only balance messages
+ to nodes which have consumers with no or matching selectors. See the user manual for more details.
+ JMS Queues implement point-to-point messaging, where each message is only ever consumed by a
+ maximum of one consumer.</li>
+ <pre>
+ <code>
+ for (int i = 0; i < numMessages; i += 2)
+ {
+ TextMessage message0 = (TextMessage)consumer0.receive(5000);
+
+ System.out.println("Got message: " + message0.getText() + " from node 0");
+
+ TextMessage message1 = (TextMessage)consumer1.receive(5000);
+
+ System.out.println("Got message: " + message1.getText() + " from node 1");
+ }
+ </code>
+ </pre>
+
+ <li>And finally (no pun intended), <b>always</b> remember to close your JMS resources after use, in a <code>finally</code> block. Closing a JMS connection will automatically close all of its sessions, consumers, producers and browsers.</li>
+
+ <pre>
+ <code>
+ finally
+ {
+ if (connection0 != null)
+ {
+ connection0.close();
+ }
+
+ if (connection1 != null)
+ {
+ connection1.close();
+ }
+ }
+ </code>
+ </pre>
+
+ </ol>
+ </body>
+</html>
\ No newline at end of file
Added: branches/hornetq_grouping/examples/jms/clustered-grouping/server0/client-jndi.properties
===================================================================
--- branches/hornetq_grouping/examples/jms/clustered-grouping/server0/client-jndi.properties (rev 0)
+++ branches/hornetq_grouping/examples/jms/clustered-grouping/server0/client-jndi.properties 2009-09-24 09:58:07 UTC (rev 7987)
@@ -0,0 +1,3 @@
+java.naming.factory.initial=org.jnp.interfaces.NamingContextFactory
+java.naming.provider.url=jnp://localhost:1099
+java.naming.factory.url.pkgs=org.jboss.naming:org.jnp.interfaces
Added: branches/hornetq_grouping/examples/jms/clustered-grouping/server0/hornetq-beans.xml
===================================================================
--- branches/hornetq_grouping/examples/jms/clustered-grouping/server0/hornetq-beans.xml (rev 0)
+++ branches/hornetq_grouping/examples/jms/clustered-grouping/server0/hornetq-beans.xml 2009-09-24 09:58:07 UTC (rev 7987)
@@ -0,0 +1,60 @@
+<?xml version="1.0" encoding="UTF-8"?>
+
+<deployment xmlns="urn:jboss:bean-deployer:2.0">
+
+ <bean name="Naming" class="org.jnp.server.NamingBeanImpl"/>
+
+ <!-- JNDI server. Disable this if you don't want JNDI -->
+ <bean name="JNDIServer" class="org.jnp.server.Main">
+ <property name="namingInfo">
+ <inject bean="Naming"/>
+ </property>
+ <property name="port">1099</property>
+ <property name="bindAddress">localhost</property>
+ <property name="rmiPort">1098</property>
+ <property name="rmiBindAddress">localhost</property>
+ </bean>
+
+ <!-- MBean server -->
+ <bean name="MBeanServer" class="javax.management.MBeanServer">
+ <constructor factoryClass="java.lang.management.ManagementFactory"
+ factoryMethod="getPlatformMBeanServer"/>
+ </bean>
+
+ <!-- The core configuration -->
+ <bean name="Configuration" class="org.hornetq.core.config.impl.FileConfiguration"/>
+
+ <!-- The security manager -->
+ <bean name="HornetQSecurityManager" class="org.hornetq.core.security.impl.HornetQSecurityManagerImpl">
+ <start ignored="true"/>
+ <stop ignored="true"/>
+ </bean>
+
+ <!-- The core server -->
+ <bean name="HornetQServer" class="org.hornetq.core.server.impl.HornetQServerImpl">
+ <constructor>
+ <parameter>
+ <inject bean="Configuration"/>
+ </parameter>
+ <parameter>
+ <inject bean="MBeanServer"/>
+ </parameter>
+ <parameter>
+ <inject bean="HornetQSecurityManager"/>
+ </parameter>
+ </constructor>
+ <start ignored="true"/>
+ <stop ignored="true"/>
+ </bean>
+
+ <!-- The JMS server -->
+ <bean name="JMSServerManager" class="org.hornetq.jms.server.impl.JMSServerManagerImpl">
+ <constructor>
+ <parameter>
+ <inject bean="HornetQServer"/>
+ </parameter>
+ </constructor>
+ </bean>
+
+
+</deployment>
Added: branches/hornetq_grouping/examples/jms/clustered-grouping/server0/hornetq-configuration.xml
===================================================================
--- branches/hornetq_grouping/examples/jms/clustered-grouping/server0/hornetq-configuration.xml (rev 0)
+++ branches/hornetq_grouping/examples/jms/clustered-grouping/server0/hornetq-configuration.xml 2009-09-24 09:58:07 UTC (rev 7987)
@@ -0,0 +1,71 @@
+<configuration xmlns="urn:hornetq"
+ xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+ xsi:schemaLocation="urn:hornetq /schema/hornetq-configuration.xsd">
+ <clustered>true</clustered>
+
+ <!-- Connectors -->
+
+ <connectors>
+ <connector name="netty-connector">
+ <factory-class>org.hornetq.integration.transports.netty.NettyConnectorFactory</factory-class>
+ <param key="hornetq.remoting.netty.port" value="5445" type="Integer"/>
+ </connector>
+ </connectors>
+
+ <!-- Acceptors -->
+ <acceptors>
+ <acceptor name="netty-acceptor">
+ <factory-class>org.hornetq.integration.transports.netty.NettyAcceptorFactory</factory-class>
+ <param key="hornetq.remoting.netty.port" value="5445" type="Integer"/>
+ </acceptor>
+ </acceptors>
+
+ <!-- Clustering configuration -->
+ <broadcast-groups>
+ <broadcast-group name="my-broadcast-group">
+ <group-address>231.7.7.7</group-address>
+ <group-port>9876</group-port>
+ <broadcast-period>100</broadcast-period>
+ <connector-ref connector-name="netty-connector"/>
+ </broadcast-group>
+ </broadcast-groups>
+
+ <discovery-groups>
+ <discovery-group name="my-discovery-group">
+ <group-address>231.7.7.7</group-address>
+ <group-port>9876</group-port>
+ <refresh-timeout>10000</refresh-timeout>
+ </discovery-group>
+ </discovery-groups>
+
+ <cluster-connections>
+ <cluster-connection name="my-cluster">
+ <address>jms</address>
+ <retry-interval>500</retry-interval>
+ <use-duplicate-detection>true</use-duplicate-detection>
+ <forward-when-no-consumers>true</forward-when-no-consumers>
+ <max-hops>1</max-hops>
+ <discovery-group-ref discovery-group-name="my-discovery-group"/>
+ </cluster-connection>
+ </cluster-connections>
+
+ <grouping-handler name="my-grouping-handler">
+ <type>LOCAL</type>
+ <address>jms</address>
+ </grouping-handler>
+
+ <!-- Other config -->
+
+ <security-settings>
+ <!--security for example queue-->
+ <security-setting match="jms.queue.exampleQueue">
+ <permission type="createDurableQueue" roles="guest"/>
+ <permission type="deleteDurableQueue" roles="guest"/>
+ <permission type="createTempQueue" roles="guest"/>
+ <permission type="deleteTempQueue" roles="guest"/>
+ <permission type="consume" roles="guest"/>
+ <permission type="send" roles="guest"/>
+ </security-setting>
+ </security-settings>
+
+</configuration>
\ No newline at end of file
Added: branches/hornetq_grouping/examples/jms/clustered-grouping/server0/hornetq-jms.xml
===================================================================
--- branches/hornetq_grouping/examples/jms/clustered-grouping/server0/hornetq-jms.xml (rev 0)
+++ branches/hornetq_grouping/examples/jms/clustered-grouping/server0/hornetq-jms.xml 2009-09-24 09:58:07 UTC (rev 7987)
@@ -0,0 +1,17 @@
+<configuration xmlns="urn:hornetq"
+ xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+ xsi:schemaLocation="urn:hornetq /schema/hornetq-jms.xsd">
+ <!--the connection factory used by the example-->
+ <connection-factory name="ConnectionFactory">
+ <connector-ref connector-name="netty-connector"/>
+ <entries>
+ <entry name="ConnectionFactory"/>
+ </entries>
+ </connection-factory>
+
+ <!--the queue used by the example-->
+ <queue name="exampleQueue">
+ <entry name="/queue/exampleQueue"/>
+ </queue>
+
+</configuration>
\ No newline at end of file
Added: branches/hornetq_grouping/examples/jms/clustered-grouping/server0/hornetq-users.xml
===================================================================
--- branches/hornetq_grouping/examples/jms/clustered-grouping/server0/hornetq-users.xml (rev 0)
+++ branches/hornetq_grouping/examples/jms/clustered-grouping/server0/hornetq-users.xml 2009-09-24 09:58:07 UTC (rev 7987)
@@ -0,0 +1,7 @@
+<configuration xmlns="urn:hornetq" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+ xsi:schemaLocation="urn:hornetq /schema/hornetq-users.xsd">
+ <!-- the default user. this is used where username is null-->
+ <defaultuser name="guest" password="guest">
+ <role name="guest"/>
+ </defaultuser>
+</configuration>
\ No newline at end of file
Added: branches/hornetq_grouping/examples/jms/clustered-grouping/server1/client-jndi.properties
===================================================================
--- branches/hornetq_grouping/examples/jms/clustered-grouping/server1/client-jndi.properties (rev 0)
+++ branches/hornetq_grouping/examples/jms/clustered-grouping/server1/client-jndi.properties 2009-09-24 09:58:07 UTC (rev 7987)
@@ -0,0 +1,3 @@
+java.naming.factory.initial=org.jnp.interfaces.NamingContextFactory
+java.naming.provider.url=jnp://localhost:2099
+java.naming.factory.url.pkgs=org.jboss.naming:org.jnp.interfaces
Added: branches/hornetq_grouping/examples/jms/clustered-grouping/server1/hornetq-beans.xml
===================================================================
--- branches/hornetq_grouping/examples/jms/clustered-grouping/server1/hornetq-beans.xml (rev 0)
+++ branches/hornetq_grouping/examples/jms/clustered-grouping/server1/hornetq-beans.xml 2009-09-24 09:58:07 UTC (rev 7987)
@@ -0,0 +1,59 @@
+<?xml version="1.0" encoding="UTF-8"?>
+
+<deployment xmlns="urn:jboss:bean-deployer:2.0">
+
+ <bean name="Naming" class="org.jnp.server.NamingBeanImpl"/>
+
+ <!-- JNDI server. Disable this if you don't want JNDI -->
+ <bean name="JNDIServer" class="org.jnp.server.Main">
+ <property name="namingInfo">
+ <inject bean="Naming"/>
+ </property>
+ <property name="port">2099</property>
+ <property name="bindAddress">localhost</property>
+ <property name="rmiPort">2098</property>
+ <property name="rmiBindAddress">localhost</property>
+ </bean>
+
+ <!-- MBean server -->
+ <bean name="MBeanServer" class="javax.management.MBeanServer">
+ <constructor factoryClass="java.lang.management.ManagementFactory"
+ factoryMethod="getPlatformMBeanServer"/>
+ </bean>
+
+ <!-- The core configuration -->
+ <bean name="Configuration" class="org.hornetq.core.config.impl.FileConfiguration"/>
+
+ <!-- The security manager -->
+ <bean name="HornetQSecurityManager" class="org.hornetq.core.security.impl.HornetQSecurityManagerImpl">
+ <start ignored="true"/>
+ <stop ignored="true"/>
+ </bean>
+
+ <!-- The core server -->
+ <bean name="HornetQServer" class="org.hornetq.core.server.impl.HornetQServerImpl">
+ <constructor>
+ <parameter>
+ <inject bean="Configuration"/>
+ </parameter>
+ <parameter>
+ <inject bean="MBeanServer"/>
+ </parameter>
+ <parameter>
+ <inject bean="HornetQSecurityManager"/>
+ </parameter>
+ </constructor>
+ <start ignored="true"/>
+ <stop ignored="true"/>
+ </bean>
+
+ <!-- The JMS server -->
+ <bean name="JMSServerManager" class="org.hornetq.jms.server.impl.JMSServerManagerImpl">
+ <constructor>
+ <parameter>
+ <inject bean="HornetQServer"/>
+ </parameter>
+ </constructor>
+ </bean>
+
+</deployment>
Added: branches/hornetq_grouping/examples/jms/clustered-grouping/server1/hornetq-configuration.xml
===================================================================
--- branches/hornetq_grouping/examples/jms/clustered-grouping/server1/hornetq-configuration.xml (rev 0)
+++ branches/hornetq_grouping/examples/jms/clustered-grouping/server1/hornetq-configuration.xml 2009-09-24 09:58:07 UTC (rev 7987)
@@ -0,0 +1,70 @@
+<configuration xmlns="urn:hornetq"
+ xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+ xsi:schemaLocation="urn:hornetq /schema/hornetq-configuration.xsd">
+ <clustered>true</clustered>
+
+ <!-- Connectors -->
+ <connectors>
+ <connector name="netty-connector">
+ <factory-class>org.hornetq.integration.transports.netty.NettyConnectorFactory</factory-class>
+ <param key="hornetq.remoting.netty.port" value="5446" type="Integer"/>
+ </connector>
+ </connectors>
+
+ <!-- Acceptors -->
+ <acceptors>
+ <acceptor name="netty-acceptor">
+ <factory-class>org.hornetq.integration.transports.netty.NettyAcceptorFactory</factory-class>
+ <param key="hornetq.remoting.netty.port" value="5446" type="Integer"/>
+ </acceptor>
+ </acceptors>
+
+ <!-- Clustering configuration -->
+ <broadcast-groups>
+ <broadcast-group name="my-broadcast-group">
+ <group-address>231.7.7.7</group-address>
+ <group-port>9876</group-port>
+ <broadcast-period>100</broadcast-period>
+ <connector-ref connector-name="netty-connector"/>
+ </broadcast-group>
+ </broadcast-groups>
+
+ <discovery-groups>
+ <discovery-group name="my-discovery-group">
+ <group-address>231.7.7.7</group-address>
+ <group-port>9876</group-port>
+ <refresh-timeout>10000</refresh-timeout>
+ </discovery-group>
+ </discovery-groups>
+
+ <cluster-connections>
+ <cluster-connection name="my-cluster">
+ <address>jms</address>
+ <retry-interval>500</retry-interval>
+ <use-duplicate-detection>true</use-duplicate-detection>
+ <forward-when-no-consumers>true</forward-when-no-consumers>
+ <max-hops>1</max-hops>
+ <discovery-group-ref discovery-group-name="my-discovery-group"/>
+ </cluster-connection>
+ </cluster-connections>
+
+ <grouping-handler name="my-grouping-handler">
+ <type>REMOTE</type>
+ <address>jms</address>
+ </grouping-handler>
+
+ <!-- Other config -->
+
+ <security-settings>
+ <!--security for example queue-->
+ <security-setting match="jms.queue.exampleQueue">
+ <permission type="createDurableQueue" roles="guest"/>
+ <permission type="deleteDurableQueue" roles="guest"/>
+ <permission type="createTempQueue" roles="guest"/>
+ <permission type="deleteTempQueue" roles="guest"/>
+ <permission type="consume" roles="guest"/>
+ <permission type="send" roles="guest"/>
+ </security-setting>
+ </security-settings>
+
+</configuration>
Added: branches/hornetq_grouping/examples/jms/clustered-grouping/server1/hornetq-jms.xml
===================================================================
--- branches/hornetq_grouping/examples/jms/clustered-grouping/server1/hornetq-jms.xml (rev 0)
+++ branches/hornetq_grouping/examples/jms/clustered-grouping/server1/hornetq-jms.xml 2009-09-24 09:58:07 UTC (rev 7987)
@@ -0,0 +1,17 @@
+<configuration xmlns="urn:hornetq"
+ xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+ xsi:schemaLocation="urn:hornetq /schema/hornetq-jms.xsd">
+ <!--the connection factory used by the example-->
+ <connection-factory name="ConnectionFactory">
+ <connector-ref connector-name="netty-connector"/>
+ <entries>
+ <entry name="ConnectionFactory"/>
+ </entries>
+ </connection-factory>
+
+ <!--the queue used by the example-->
+ <queue name="exampleQueue">
+ <entry name="/queue/exampleQueue"/>
+ </queue>
+
+</configuration>
\ No newline at end of file
Added: branches/hornetq_grouping/examples/jms/clustered-grouping/server1/hornetq-users.xml
===================================================================
--- branches/hornetq_grouping/examples/jms/clustered-grouping/server1/hornetq-users.xml (rev 0)
+++ branches/hornetq_grouping/examples/jms/clustered-grouping/server1/hornetq-users.xml 2009-09-24 09:58:07 UTC (rev 7987)
@@ -0,0 +1,7 @@
+<configuration xmlns="urn:hornetq" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+ xsi:schemaLocation="urn:hornetq /schema/hornetq-users.xsd">
+ <!-- the default user. this is used where username is null-->
+ <defaultuser name="guest" password="guest">
+ <role name="guest"/>
+ </defaultuser>
+</configuration>
\ No newline at end of file
Added: branches/hornetq_grouping/examples/jms/clustered-grouping/server2/client-jndi.properties
===================================================================
--- branches/hornetq_grouping/examples/jms/clustered-grouping/server2/client-jndi.properties (rev 0)
+++ branches/hornetq_grouping/examples/jms/clustered-grouping/server2/client-jndi.properties 2009-09-24 09:58:07 UTC (rev 7987)
@@ -0,0 +1,3 @@
+java.naming.factory.initial=org.jnp.interfaces.NamingContextFactory
+java.naming.provider.url=jnp://localhost:3099
+java.naming.factory.url.pkgs=org.jboss.naming:org.jnp.interfaces
Added: branches/hornetq_grouping/examples/jms/clustered-grouping/server2/hornetq-beans.xml
===================================================================
--- branches/hornetq_grouping/examples/jms/clustered-grouping/server2/hornetq-beans.xml (rev 0)
+++ branches/hornetq_grouping/examples/jms/clustered-grouping/server2/hornetq-beans.xml 2009-09-24 09:58:07 UTC (rev 7987)
@@ -0,0 +1,59 @@
+<?xml version="1.0" encoding="UTF-8"?>
+
+<deployment xmlns="urn:jboss:bean-deployer:2.0">
+
+ <bean name="Naming" class="org.jnp.server.NamingBeanImpl"/>
+
+ <!-- JNDI server. Disable this if you don't want JNDI -->
+ <bean name="JNDIServer" class="org.jnp.server.Main">
+ <property name="namingInfo">
+ <inject bean="Naming"/>
+ </property>
+ <property name="port">3099</property>
+ <property name="bindAddress">localhost</property>
+ <property name="rmiPort">3098</property>
+ <property name="rmiBindAddress">localhost</property>
+ </bean>
+
+ <!-- MBean server -->
+ <bean name="MBeanServer" class="javax.management.MBeanServer">
+ <constructor factoryClass="java.lang.management.ManagementFactory"
+ factoryMethod="getPlatformMBeanServer"/>
+ </bean>
+
+ <!-- The core configuration -->
+ <bean name="Configuration" class="org.hornetq.core.config.impl.FileConfiguration"/>
+
+ <!-- The security manager -->
+ <bean name="HornetQSecurityManager" class="org.hornetq.core.security.impl.HornetQSecurityManagerImpl">
+ <start ignored="true"/>
+ <stop ignored="true"/>
+ </bean>
+
+ <!-- The core server -->
+ <bean name="HornetQServer" class="org.hornetq.core.server.impl.HornetQServerImpl">
+ <constructor>
+ <parameter>
+ <inject bean="Configuration"/>
+ </parameter>
+ <parameter>
+ <inject bean="MBeanServer"/>
+ </parameter>
+ <parameter>
+ <inject bean="HornetQSecurityManager"/>
+ </parameter>
+ </constructor>
+ <start ignored="true"/>
+ <stop ignored="true"/>
+ </bean>
+
+ <!-- The JMS server -->
+ <bean name="JMSServerManager" class="org.hornetq.jms.server.impl.JMSServerManagerImpl">
+ <constructor>
+ <parameter>
+ <inject bean="HornetQServer"/>
+ </parameter>
+ </constructor>
+ </bean>
+
+</deployment>
Added: branches/hornetq_grouping/examples/jms/clustered-grouping/server2/hornetq-configuration.xml
===================================================================
--- branches/hornetq_grouping/examples/jms/clustered-grouping/server2/hornetq-configuration.xml (rev 0)
+++ branches/hornetq_grouping/examples/jms/clustered-grouping/server2/hornetq-configuration.xml 2009-09-24 09:58:07 UTC (rev 7987)
@@ -0,0 +1,70 @@
+<configuration xmlns="urn:hornetq"
+ xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+ xsi:schemaLocation="urn:hornetq /schema/hornetq-configuration.xsd">
+ <clustered>true</clustered>
+
+ <!-- Connectors -->
+ <connectors>
+ <connector name="netty-connector">
+ <factory-class>org.hornetq.integration.transports.netty.NettyConnectorFactory</factory-class>
+ <param key="hornetq.remoting.netty.port" value="5447" type="Integer"/>
+ </connector>
+ </connectors>
+
+ <!-- Acceptors -->
+ <acceptors>
+ <acceptor name="netty-acceptor">
+ <factory-class>org.hornetq.integration.transports.netty.NettyAcceptorFactory</factory-class>
+ <param key="hornetq.remoting.netty.port" value="5447" type="Integer"/>
+ </acceptor>
+ </acceptors>
+
+ <!-- Clustering configuration -->
+ <broadcast-groups>
+ <broadcast-group name="my-broadcast-group">
+ <group-address>231.7.7.7</group-address>
+ <group-port>9876</group-port>
+ <broadcast-period>100</broadcast-period>
+ <connector-ref connector-name="netty-connector"/>
+ </broadcast-group>
+ </broadcast-groups>
+
+ <discovery-groups>
+ <discovery-group name="my-discovery-group">
+ <group-address>231.7.7.7</group-address>
+ <group-port>9876</group-port>
+ <refresh-timeout>10000</refresh-timeout>
+ </discovery-group>
+ </discovery-groups>
+
+ <cluster-connections>
+ <cluster-connection name="my-cluster">
+ <address>jms</address>
+ <retry-interval>500</retry-interval>
+ <use-duplicate-detection>true</use-duplicate-detection>
+ <forward-when-no-consumers>true</forward-when-no-consumers>
+ <max-hops>1</max-hops>
+ <discovery-group-ref discovery-group-name="my-discovery-group"/>
+ </cluster-connection>
+ </cluster-connections>
+
+ <grouping-handler name="my-grouping-handler">
+ <type>REMOTE</type>
+ <address>jms</address>
+ </grouping-handler>
+
+ <!-- Other config -->
+
+ <security-settings>
+ <!--security for example queue-->
+ <security-setting match="jms.queue.exampleQueue">
+ <permission type="createDurableQueue" roles="guest"/>
+ <permission type="deleteDurableQueue" roles="guest"/>
+ <permission type="createTempQueue" roles="guest"/>
+ <permission type="deleteTempQueue" roles="guest"/>
+ <permission type="consume" roles="guest"/>
+ <permission type="send" roles="guest"/>
+ </security-setting>
+ </security-settings>
+
+</configuration>
Added: branches/hornetq_grouping/examples/jms/clustered-grouping/server2/hornetq-jms.xml
===================================================================
--- branches/hornetq_grouping/examples/jms/clustered-grouping/server2/hornetq-jms.xml (rev 0)
+++ branches/hornetq_grouping/examples/jms/clustered-grouping/server2/hornetq-jms.xml 2009-09-24 09:58:07 UTC (rev 7987)
@@ -0,0 +1,17 @@
+<configuration xmlns="urn:hornetq"
+ xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+ xsi:schemaLocation="urn:hornetq /schema/hornetq-jms.xsd">
+ <!--the connection factory used by the example-->
+ <connection-factory name="ConnectionFactory">
+ <connector-ref connector-name="netty-connector"/>
+ <entries>
+ <entry name="ConnectionFactory"/>
+ </entries>
+ </connection-factory>
+
+ <!--the queue used by the example-->
+ <queue name="exampleQueue">
+ <entry name="/queue/exampleQueue"/>
+ </queue>
+
+</configuration>
\ No newline at end of file
Added: branches/hornetq_grouping/examples/jms/clustered-grouping/server2/hornetq-users.xml
===================================================================
--- branches/hornetq_grouping/examples/jms/clustered-grouping/server2/hornetq-users.xml (rev 0)
+++ branches/hornetq_grouping/examples/jms/clustered-grouping/server2/hornetq-users.xml 2009-09-24 09:58:07 UTC (rev 7987)
@@ -0,0 +1,7 @@
+<configuration xmlns="urn:hornetq" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+ xsi:schemaLocation="urn:hornetq /schema/hornetq-users.xsd">
+ <!-- the default user. this is used where username is null-->
+ <defaultuser name="guest" password="guest">
+ <role name="guest"/>
+ </defaultuser>
+</configuration>
\ No newline at end of file
Copied: branches/hornetq_grouping/examples/jms/clustered-grouping/src/org/hornetq/jms/example/ClusteredGroupingExample.java (from rev 7977, branches/hornetq_grouping/examples/jms/clustered-queue/src/org/hornetq/jms/example/ClusteredQueueExample.java)
===================================================================
--- branches/hornetq_grouping/examples/jms/clustered-grouping/src/org/hornetq/jms/example/ClusteredGroupingExample.java (rev 0)
+++ branches/hornetq_grouping/examples/jms/clustered-grouping/src/org/hornetq/jms/example/ClusteredGroupingExample.java 2009-09-24 09:58:07 UTC (rev 7987)
@@ -0,0 +1,173 @@
+/*
+ * Copyright 2009 Red Hat, Inc.
+ * Red Hat licenses this file to you under the Apache License, version
+ * 2.0 (the "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+ * implied. See the License for the specific language governing
+ * permissions and limitations under the License.
+ */
+package org.hornetq.jms.example;
+
+import javax.jms.Connection;
+import javax.jms.ConnectionFactory;
+import javax.jms.MessageConsumer;
+import javax.jms.MessageProducer;
+import javax.jms.Queue;
+import javax.jms.Session;
+import javax.jms.TextMessage;
+import javax.naming.InitialContext;
+
+import org.hornetq.common.example.HornetQExample;
+import org.hornetq.jms.client.HornetQMessage;
+
+/**
+ * A simple example that demonstrates message grouping in a cluster: messages carrying the same JMSXGroupID are
+ * sent through three different nodes of the cluster and are then consumed from the single consumer on node 0.
+ *
+ * @author <a href="tim.fox at jboss.com">Tim Fox</a>
+ */
+public class ClusteredGroupingExample extends HornetQExample
+{
+   public static void main(String[] args)
+   {
+      new ClusteredGroupingExample().run(args);
+   }
+
+   /**
+    * Sends grouped messages via servers 0, 1 and 2 and consumes them from the consumer on server 0.
+    *
+    * @return true if every message that was sent was received by the consumer on server 0, false otherwise
+    * @throws Exception if a JNDI lookup or a JMS operation fails
+    */
+   public boolean runExample() throws Exception
+   {
+      // NOTE(review): presumably this pause gives the cluster time to form before we connect - confirm
+      Thread.sleep(5000);
+
+      Connection connection0 = null;
+
+      Connection connection1 = null;
+
+      Connection connection2 = null;
+
+      InitialContext ic0 = null;
+
+      InitialContext ic1 = null;
+
+      InitialContext ic2 = null;
+
+      try
+      {
+         // Step 1. Get an initial context for looking up JNDI from server 0
+         ic0 = getContext(0);
+
+         // Step 2. Look-up the JMS Queue object from JNDI
+         Queue queue = (Queue)ic0.lookup("/queue/exampleQueue");
+
+         // Step 3. Look-up a JMS Connection Factory object from JNDI on server 0
+         ConnectionFactory cf0 = (ConnectionFactory)ic0.lookup("/ConnectionFactory");
+
+         // Step 4. Get an initial context for looking up JNDI from server 1
+         ic1 = getContext(1);
+
+         // Step 5. Look-up a JMS Connection Factory object from JNDI on server 1
+         ConnectionFactory cf1 = (ConnectionFactory)ic1.lookup("/ConnectionFactory");
+
+         // Step 6. Get an initial context for looking up JNDI from server 2
+         ic2 = getContext(2);
+
+         // Step 7. Look-up a JMS Connection Factory object from JNDI on server 2
+         ConnectionFactory cf2 = (ConnectionFactory)ic2.lookup("/ConnectionFactory");
+
+         // Step 8. We create a JMS Connection connection0 which is a connection to server 0
+         connection0 = cf0.createConnection();
+
+         // Step 9. We create a JMS Connection connection1 which is a connection to server 1
+         connection1 = cf1.createConnection();
+
+         // Step 10. We create a JMS Connection connection2 which is a connection to server 2
+         connection2 = cf2.createConnection();
+
+         // Step 11. We create a JMS Session on server 0
+         Session session0 = connection0.createSession(false, Session.AUTO_ACKNOWLEDGE);
+
+         // Step 12. We create a JMS Session on server 1
+         Session session1 = connection1.createSession(false, Session.AUTO_ACKNOWLEDGE);
+
+         // Step 13. We create a JMS Session on server 2
+         Session session2 = connection2.createSession(false, Session.AUTO_ACKNOWLEDGE);
+
+         // Step 14. We start the connections to ensure delivery occurs on them
+         connection0.start();
+
+         connection1.start();
+
+         connection2.start();
+
+         // Step 15. We create a single JMS MessageConsumer, on server 0
+         MessageConsumer consumer = session0.createConsumer(queue);
+
+         // Step 16. We create a JMS MessageProducer object on each of server 0, server 1 and server 2
+         MessageProducer producer0 = session0.createProducer(queue);
+
+         MessageProducer producer1 = session1.createProducer(queue);
+
+         MessageProducer producer2 = session2.createProducer(queue);
+
+         // Step 17. We send every message, tagged with the same group id, through all three servers
+
+         final int numMessages = 10;
+
+         for (int i = 0; i < numMessages; i++)
+         {
+            TextMessage message = session0.createTextMessage("This is text message " + i);
+
+            message.setStringProperty(HornetQMessage.JMSXGROUPID, "Group-0");
+
+            producer0.send(message);
+
+            producer1.send(message);
+
+            producer2.send(message);
+
+            System.out.println("Sent messages: " + message.getText());
+         }
+
+         // Step 18. We now consume the messages from the consumer on server 0. Each message was sent once per
+         // producer, so three times numMessages messages are expected in total.
+
+         final int numExpected = numMessages * 3;
+
+         for (int i = 0; i < numExpected; i++)
+         {
+            TextMessage message0 = (TextMessage)consumer.receive(5000);
+
+            // receive(timeout) returns null when nothing arrives in time; report that as a failed run
+            if (message0 == null)
+            {
+               System.out.println("Timed out waiting for message " + (i + 1) + " of " + numExpected + " on node 0");
+
+               return false;
+            }
+
+            System.out.println("Got message: " + message0.getText() + " from node 0");
+         }
+
+         return true;
+      }
+      finally
+      {
+         // Step 19. Be sure to close our resources!
+
+         if (connection0 != null)
+         {
+            connection0.close();
+         }
+
+         if (connection1 != null)
+         {
+            connection1.close();
+         }
+
+         if (connection2 != null)
+         {
+            connection2.close();
+         }
+
+         if (ic0 != null)
+         {
+            ic0.close();
+         }
+
+         if (ic1 != null)
+         {
+            ic1.close();
+         }
+
+         if (ic2 != null)
+         {
+            ic2.close();
+         }
+      }
+   }
+
+}
\ No newline at end of file
Modified: branches/hornetq_grouping/src/config/common/schema/hornetq-configuration.xsd
===================================================================
--- branches/hornetq_grouping/src/config/common/schema/hornetq-configuration.xsd 2009-09-24 09:25:44 UTC (rev 7986)
+++ branches/hornetq_grouping/src/config/common/schema/hornetq-configuration.xsd 2009-09-24 09:58:07 UTC (rev 7987)
@@ -128,6 +128,8 @@
</xsd:sequence>
</xsd:complexType>
</xsd:element>
+ <xsd:element maxOccurs="1" minOccurs="0" name="grouping-handler" type="groupingHandlerType">
+ </xsd:element>
<xsd:element maxOccurs="1" minOccurs="0" name="paging-directory" type="xsd:string">
</xsd:element>
<xsd:element maxOccurs="1" minOccurs="0" name="bindings-directory" type="xsd:string">
@@ -383,6 +385,21 @@
</xsd:restriction>
</xsd:simpleType>
+ <xsd:complexType name="groupingHandlerType">
+ <xsd:sequence>
+ <xsd:element maxOccurs="1" minOccurs="1" name="type" type="groupingHandlerTypeType"/>
+ <xsd:element maxOccurs="1" minOccurs="1" name="address" type="xsd:string"/>
+ </xsd:sequence>
+ <xsd:attribute name="name" type="xsd:string" use="required"/>
+ </xsd:complexType>
+
+ <xsd:simpleType name="groupingHandlerTypeType">
+ <xsd:restriction base="xsd:string">
+ <xsd:enumeration value="LOCAL"/>
+ <xsd:enumeration value="REMOTE"/>
+ </xsd:restriction>
+ </xsd:simpleType>
+
<xsd:element name="security-settings">
<xsd:complexType>
<xsd:sequence>
Modified: branches/hornetq_grouping/src/main/org/hornetq/core/config/Configuration.java
===================================================================
--- branches/hornetq_grouping/src/main/org/hornetq/core/config/Configuration.java 2009-09-24 09:25:44 UTC (rev 7986)
+++ branches/hornetq_grouping/src/main/org/hornetq/core/config/Configuration.java 2009-09-24 09:58:07 UTC (rev 7987)
@@ -25,6 +25,7 @@
import org.hornetq.core.config.cluster.DivertConfiguration;
import org.hornetq.core.config.cluster.QueueConfiguration;
import org.hornetq.core.server.JournalType;
+import org.hornetq.core.server.group.impl.GroupingHandlerConfiguration;
import org.hornetq.utils.SimpleString;
/**
@@ -125,6 +126,10 @@
void setDiscoveryGroupConfigurations(Map<String, DiscoveryGroupConfiguration> configs);
+ /**
+  * Returns the grouping handler configurations held by this configuration.
+  */
+ List<GroupingHandlerConfiguration> getGroupingHandlerConfigurations();
+
+ /**
+  * Replaces the grouping handler configurations held by this configuration.
+  *
+  * @param groupingHandlerConfiguration the grouping handler configurations to use
+  */
+ void setGroupingHandlerConfigurationConfigurations(List<GroupingHandlerConfiguration> groupingHandlerConfiguration);
+
List<BridgeConfiguration> getBridgeConfigurations();
void setBridgeConfigurations(final List<BridgeConfiguration> configs);
Modified: branches/hornetq_grouping/src/main/org/hornetq/core/config/impl/ConfigurationImpl.java
===================================================================
--- branches/hornetq_grouping/src/main/org/hornetq/core/config/impl/ConfigurationImpl.java 2009-09-24 09:25:44 UTC (rev 7986)
+++ branches/hornetq_grouping/src/main/org/hornetq/core/config/impl/ConfigurationImpl.java 2009-09-24 09:58:07 UTC (rev 7987)
@@ -30,6 +30,7 @@
import org.hornetq.core.config.cluster.DivertConfiguration;
import org.hornetq.core.config.cluster.QueueConfiguration;
import org.hornetq.core.server.JournalType;
+import org.hornetq.core.server.group.impl.GroupingHandlerConfiguration;
import org.hornetq.utils.SimpleString;
/**
@@ -216,6 +217,8 @@
protected Map<String, DiscoveryGroupConfiguration> discoveryGroupConfigurations = new LinkedHashMap<String, DiscoveryGroupConfiguration>();
+ protected List<GroupingHandlerConfiguration> groupingHandlerConfiguration = new ArrayList<GroupingHandlerConfiguration>();
+
// Paging related attributes ------------------------------------------------------------
protected String pagingDirectory = DEFAULT_PAGING_DIR;
@@ -465,6 +468,17 @@
this.backupConnectorName = backupConnectorName;
}
+   /**
+    * @return the grouping handler configurations currently held by this configuration (the internal list
+    *         itself, not a copy)
+    */
+   public List<GroupingHandlerConfiguration> getGroupingHandlerConfigurations()
+   {
+      return this.groupingHandlerConfiguration;
+   }
+
+   /**
+    * Replaces the grouping handler configurations held by this configuration. The given list is stored as is,
+    * without copying.
+    *
+    * @param groupingHandlerConfiguration the grouping handler configurations to use
+    */
+   public void setGroupingHandlerConfigurationConfigurations(final List<GroupingHandlerConfiguration> groupingHandlerConfiguration)
+   {
+      this.groupingHandlerConfiguration = groupingHandlerConfiguration;
+   }
+
+
public List<BridgeConfiguration> getBridgeConfigurations()
{
return bridgeConfigurations;
Modified: branches/hornetq_grouping/src/main/org/hornetq/core/config/impl/FileConfiguration.java
===================================================================
--- branches/hornetq_grouping/src/main/org/hornetq/core/config/impl/FileConfiguration.java 2009-09-24 09:25:44 UTC (rev 7986)
+++ branches/hornetq_grouping/src/main/org/hornetq/core/config/impl/FileConfiguration.java 2009-09-24 09:58:07 UTC (rev 7987)
@@ -46,6 +46,7 @@
import org.hornetq.core.config.cluster.DivertConfiguration;
import org.hornetq.core.logging.Logger;
import org.hornetq.core.server.JournalType;
+import org.hornetq.core.server.group.impl.GroupingHandlerConfiguration;
import org.hornetq.utils.Pair;
import org.hornetq.utils.SimpleString;
import org.hornetq.utils.XMLUtil;
@@ -249,6 +250,15 @@
parseBridgeConfiguration(mfNode);
}
+ NodeList gaNodes = e.getElementsByTagName("grouping-handler");
+ System.out.println("gaNodes.getLength() = " + gaNodes.getLength());
+ for (int i = 0; i < gaNodes.getLength(); i++)
+ {
+ Element gaNode = (Element) gaNodes.item(i);
+
+ parseGroupingHandlerConfiguration(gaNode);
+ }
+
NodeList ccNodes = e.getElementsByTagName("cluster-connection");
for (int i = 0; i < ccNodes.getLength(); i++)
@@ -558,6 +568,20 @@
clusterConfigurations.add(config);
}
+   /**
+    * Builds a {@link GroupingHandlerConfiguration} from a {@code grouping-handler} element and adds it to
+    * {@code groupingHandlerConfiguration}.
+    * <p>
+    * The handler name is read from the {@code name} attribute; {@code type} and {@code address} are read from
+    * child elements using the {@code NOT_NULL_OR_EMPTY} validator. A type equal to the LOCAL type string maps to
+    * {@code TYPE.LOCAL}; any other value maps to {@code TYPE.REMOTE}.
+    *
+    * @param node the {@code grouping-handler} element to parse
+    */
+   private void parseGroupingHandlerConfiguration(final Element node)
+   {
+      String name = node.getAttribute("name");
+      String type = getString(node, "type", null, NOT_NULL_OR_EMPTY);
+      String address = getString(node, "address", null, NOT_NULL_OR_EMPTY);
+
+      GroupingHandlerConfiguration.TYPE handlerType =
+         type.equals(GroupingHandlerConfiguration.TYPE.LOCAL.getType()) ? GroupingHandlerConfiguration.TYPE.LOCAL
+                                                                        : GroupingHandlerConfiguration.TYPE.REMOTE;
+
+      GroupingHandlerConfiguration handlerConfiguration =
+         new GroupingHandlerConfiguration(new SimpleString(name), handlerType, new SimpleString(address));
+
+      groupingHandlerConfiguration.add(handlerConfiguration);
+   }
+
+
private void parseBridgeConfiguration(final Element brNode)
{
String name = brNode.getAttribute("name");
Modified: branches/hornetq_grouping/src/main/org/hornetq/core/postoffice/PostOffice.java
===================================================================
--- branches/hornetq_grouping/src/main/org/hornetq/core/postoffice/PostOffice.java 2009-09-24 09:25:44 UTC (rev 7986)
+++ branches/hornetq_grouping/src/main/org/hornetq/core/postoffice/PostOffice.java 2009-09-24 09:58:07 UTC (rev 7987)
@@ -19,7 +19,7 @@
import org.hornetq.core.server.HornetQComponent;
import org.hornetq.core.server.Queue;
import org.hornetq.core.server.ServerMessage;
-import org.hornetq.core.server.group.Arbitrator;
+import org.hornetq.core.server.group.GroupingHandler;
import org.hornetq.core.transaction.Transaction;
import org.hornetq.utils.SimpleString;
@@ -69,7 +69,7 @@
Object getNotificationLock();
- void addArbitrator(Arbitrator arbitrator);
+ void setGroupingHandler(GroupingHandler groupingHandler);
- Arbitrator getArbitrator();
+ GroupingHandler getGroupingHandler();
}
Modified: branches/hornetq_grouping/src/main/org/hornetq/core/postoffice/impl/BindingsImpl.java
===================================================================
--- branches/hornetq_grouping/src/main/org/hornetq/core/postoffice/impl/BindingsImpl.java 2009-09-24 09:25:44 UTC (rev 7986)
+++ branches/hornetq_grouping/src/main/org/hornetq/core/postoffice/impl/BindingsImpl.java 2009-09-24 09:58:07 UTC (rev 7987)
@@ -34,7 +34,7 @@
import org.hornetq.core.server.ServerMessage;
import org.hornetq.core.server.group.impl.Proposal;
import org.hornetq.core.server.group.impl.Response;
-import org.hornetq.core.server.group.Arbitrator;
+import org.hornetq.core.server.group.GroupingHandler;
import org.hornetq.core.transaction.Transaction;
import org.hornetq.core.exception.HornetQException;
import org.hornetq.utils.SimpleString;
@@ -284,96 +284,15 @@
if (!routed)
{
- Arbitrator groupingArbitrator = postOffice.getArbitrator();
+ GroupingHandler groupingGroupingHandler = postOffice.getGroupingHandler();
if (message.getProperty(MessageImpl.HDR_FROM_CLUSTER) != null)
{
routeFromCluster(message, tx);
}
- else if(groupingArbitrator != null && message.getProperty(MessageImpl.HDR_GROUP_ID)!= null)
+ else if(groupingGroupingHandler != null && message.getProperty(MessageImpl.HDR_GROUP_ID)!= null)
{
- SimpleString groupId = (SimpleString) message.getProperty(MessageImpl.HDR_GROUP_ID);
- Response resp = groupingArbitrator.propose(new Proposal(groupId, null));
- if(resp == null)
- {
- for (Map.Entry<SimpleString, List<Binding>> entry : routingNameBindingMap.entrySet())
- {
- SimpleString routingName = entry.getKey();
-
- List<Binding> bindings = entry.getValue();
- Binding chosen = null;
- Binding lowestPriorityBinding = null;
- int lowestPriority = Integer.MAX_VALUE;
- for (Binding binding : bindings)
- {
- boolean bindingIsHighAcceptPriority = binding.isHighAcceptPriority(message);
- int distance = binding.getDistance();
- if((distance < lowestPriority))
- {
- lowestPriorityBinding = binding;
- lowestPriority = distance;
- if(bindingIsHighAcceptPriority)
- {
- chosen = binding;
- }
- }
- }
- if(chosen == null)
- {
- chosen = lowestPriorityBinding;
- }
- resp = groupingArbitrator.propose(new Proposal(groupId, chosen.getClusterName()));
- if(!resp.getChosen().equals(chosen.getClusterName()))
- {
- for (Binding binding : bindings)
- {
- if (binding.getClusterName().equals(resp.getChosen()))
- {
- chosen = binding;
- break;
- }
- }
- }
-
- if( chosen != null )
- {
- System.out.println("sending message" + message.getProperty("count_prop") + " to " + chosen.getClusterName());
- chosen.willRoute(message);
- chosen.getBindable().preroute(message, tx);
- chosen.getBindable().route(message, tx);
- }
- }
- }
- else
- {
- for (Map.Entry<SimpleString, List<Binding>> entry : routingNameBindingMap.entrySet())
- {
- SimpleString routingName = entry.getKey();
-
- List<Binding> bindings = entry.getValue();
- Binding chosen = null;
- for (Binding binding : bindings)
- {
- if(binding.getClusterName().equals(resp.getChosen()))
- {
- chosen = binding;
- break;
- }
- }
- if( chosen != null)
- {
- System.out.println("found sending message" + message.getProperty("count_prop") + " to " + chosen.getClusterName());
- chosen.willRoute(message);
- chosen.getBindable().preroute(message, tx);
- chosen.getBindable().route(message, tx);
- }
- else
- {
- System.out.println("BindingsImpl.route");
- throw new HornetQException(HornetQException.QUEUE_DOES_NOT_EXIST, "queue " + resp.getChosen() + " has been removed cannot deliver message, queues should not be removed when grouping is used");
- }
- }
- }
+ routeUsingStrictOrdering(message, tx, groupingGroupingHandler);
}
else
{
@@ -511,6 +430,93 @@
}
}
+   /**
+    * Routes a message that carries a group id, using the grouping handler to decide which binding receives it.
+    * <p>
+    * The handler is first asked for a decision with a proposal that has no value. If it returns none, then for
+    * each routing name a local binding is picked and proposed, and the message is routed to whichever binding the
+    * handler's response names (falling back to the locally picked binding if the named one is not bound here).
+    * If a decision already exists, the message is routed to the binding with that cluster name.
+    *
+    * @param message the message to route; its group id is read from {@code MessageImpl.HDR_GROUP_ID}
+    * @param tx the transaction passed through to the chosen bindable
+    * @param groupingGroupingHandler the handler that arbitrates which binding a group is assigned to
+    * @throws HornetQException with code QUEUE_DOES_NOT_EXIST if a group already has a decision but the chosen
+    *         binding can no longer be found
+    * @throws Exception if proposing or routing fails
+    */
+   private void routeUsingStrictOrdering(ServerMessage message, Transaction tx, GroupingHandler groupingGroupingHandler)
+      throws Exception
+   {
+      SimpleString groupId = (SimpleString) message.getProperty(MessageImpl.HDR_GROUP_ID);
+
+      // NOTE(review): a proposal with a null value appears to act as a lookup of an existing decision - confirm
+      // against the GroupingHandler implementations
+      Response existing = groupingGroupingHandler.propose(new Proposal(groupId, null));
+
+      for (Map.Entry<SimpleString, List<Binding>> entry : routingNameBindingMap.entrySet())
+      {
+         List<Binding> bindings = entry.getValue();
+
+         if (existing == null)
+         {
+            Binding chosen = chooseBindingForNewGroup(message, bindings);
+
+            if (chosen == null)
+            {
+               // no bindings under this routing name, so there is nothing to propose or route to
+               continue;
+            }
+
+            Response decision = groupingGroupingHandler.propose(new Proposal(groupId, chosen.getClusterName()));
+
+            if (!decision.getChosen().equals(chosen.getClusterName()))
+            {
+               // our proposal was not the one accepted; prefer the binding that was, if it is bound here
+               Binding agreed = findBindingByClusterName(bindings, decision);
+
+               if (agreed != null)
+               {
+                  chosen = agreed;
+               }
+            }
+
+            routeToBinding(chosen, message, tx);
+         }
+         else
+         {
+            Binding chosen = findBindingByClusterName(bindings, existing);
+
+            if (chosen == null)
+            {
+               throw new HornetQException(HornetQException.QUEUE_DOES_NOT_EXIST, "queue " + existing.getChosen() + " has been removed cannot deliver message, queues should not be removed when grouping is used");
+            }
+
+            routeToBinding(chosen, message, tx);
+         }
+      }
+   }
+
+   /**
+    * Picks the binding to propose for a group without a decision: of the bindings that strictly lowered the
+    * smallest distance seen so far, the last one reporting high accept priority, otherwise the closest binding.
+    * Returns null if there are no bindings.
+    */
+   private Binding chooseBindingForNewGroup(ServerMessage message, List<Binding> bindings)
+   {
+      Binding preferred = null;
+      Binding closest = null;
+      int closestDistance = Integer.MAX_VALUE;
+
+      for (Binding binding : bindings)
+      {
+         boolean highAcceptPriority = binding.isHighAcceptPriority(message);
+         int distance = binding.getDistance();
+
+         if (distance < closestDistance)
+         {
+            closest = binding;
+            closestDistance = distance;
+
+            if (highAcceptPriority)
+            {
+               preferred = binding;
+            }
+         }
+      }
+
+      return preferred != null ? preferred : closest;
+   }
+
+   /** Returns the first binding whose cluster name equals the one chosen in the response, or null if none does. */
+   private Binding findBindingByClusterName(List<Binding> bindings, Response response)
+   {
+      for (Binding binding : bindings)
+      {
+         if (binding.getClusterName().equals(response.getChosen()))
+         {
+            return binding;
+         }
+      }
+
+      return null;
+   }
+
+   /** Notifies the binding that it will route the message, then pre-routes and routes it to the bindable. */
+   private void routeToBinding(Binding binding, ServerMessage message, Transaction tx) throws Exception
+   {
+      binding.willRoute(message);
+      binding.getBindable().preroute(message, tx);
+      binding.getBindable().route(message, tx);
+   }
+
private final int incrementPos(int pos, int length)
{
pos++;
Modified: branches/hornetq_grouping/src/main/org/hornetq/core/postoffice/impl/PostOfficeImpl.java
===================================================================
--- branches/hornetq_grouping/src/main/org/hornetq/core/postoffice/impl/PostOfficeImpl.java 2009-09-24 09:25:44 UTC (rev 7986)
+++ branches/hornetq_grouping/src/main/org/hornetq/core/postoffice/impl/PostOfficeImpl.java 2009-09-24 09:58:07 UTC (rev 7987)
@@ -50,7 +50,7 @@
import org.hornetq.core.server.Queue;
import org.hornetq.core.server.QueueFactory;
import org.hornetq.core.server.ServerMessage;
-import org.hornetq.core.server.group.Arbitrator;
+import org.hornetq.core.server.group.GroupingHandler;
import org.hornetq.core.server.impl.ServerMessageImpl;
import org.hornetq.core.settings.HierarchicalRepository;
import org.hornetq.core.settings.impl.AddressSettings;
@@ -125,7 +125,7 @@
private final HierarchicalRepository<AddressSettings> addressSettingsRepository;
- private Arbitrator groupingArbitrator;
+ private GroupingHandler groupingGroupingHandler;
public PostOfficeImpl(final HornetQServer server,
final StorageManager storageManager,
@@ -743,14 +743,14 @@
}
- public void addArbitrator(Arbitrator arbitrator)
+ public void setGroupingHandler(GroupingHandler groupingHandler)
{
- groupingArbitrator = arbitrator;
+ groupingGroupingHandler = groupingHandler;
}
- public Arbitrator getArbitrator()
+ public GroupingHandler getGroupingHandler()
{
- return groupingArbitrator;
+ return groupingGroupingHandler;
}
public void sendQueueInfoToQueue(final SimpleString queueName, final SimpleString address) throws Exception
Modified: branches/hornetq_grouping/src/main/org/hornetq/core/server/cluster/impl/ClusterConnectionImpl.java
===================================================================
--- branches/hornetq_grouping/src/main/org/hornetq/core/server/cluster/impl/ClusterConnectionImpl.java 2009-09-24 09:25:44 UTC (rev 7986)
+++ branches/hornetq_grouping/src/main/org/hornetq/core/server/cluster/impl/ClusterConnectionImpl.java 2009-09-24 09:58:07 UTC (rev 7987)
@@ -638,10 +638,10 @@
}
SimpleString val = (SimpleString) message.getProperty(ManagementHelper.HDR_PROPOSAL_VALUE);
Integer hops = (Integer) message.getProperty(ManagementHelper.HDR_DISTANCE);
- Response response = postOffice.getArbitrator().receive(new Proposal(type, val), hops + 1);
+ Response response = postOffice.getGroupingHandler().receive(new Proposal(type, val), hops + 1);
if(response != null)
{
- postOffice.getArbitrator().send(response, 0);
+ postOffice.getGroupingHandler().send(response, 0);
}
}
@@ -656,8 +656,8 @@
SimpleString alt = (SimpleString) message.getProperty(ManagementHelper.HDR_PROPOSAL_ALT_VALUE);
Integer hops = (Integer) message.getProperty(ManagementHelper.HDR_DISTANCE);
Response response = new Response(type, val, alt);
- postOffice.getArbitrator().proposed(response);
- postOffice.getArbitrator().send(response, hops + 1);
+ postOffice.getGroupingHandler().proposed(response);
+ postOffice.getGroupingHandler().send(response, hops + 1);
}
private synchronized void clearBindings() throws Exception
Modified: branches/hornetq_grouping/src/main/org/hornetq/core/server/cluster/impl/ClusterManagerImpl.java
===================================================================
--- branches/hornetq_grouping/src/main/org/hornetq/core/server/cluster/impl/ClusterManagerImpl.java 2009-09-24 09:25:44 UTC (rev 7986)
+++ branches/hornetq_grouping/src/main/org/hornetq/core/server/cluster/impl/ClusterManagerImpl.java 2009-09-24 09:58:07 UTC (rev 7987)
@@ -40,6 +40,10 @@
import org.hornetq.core.remoting.Channel;
import org.hornetq.core.server.HornetQServer;
import org.hornetq.core.server.Queue;
+import org.hornetq.core.server.group.impl.GroupingHandlerConfiguration;
+import org.hornetq.core.server.group.impl.LocalGroupingHandler;
+import org.hornetq.core.server.group.impl.RemoteGroupingHandler;
+import org.hornetq.core.server.group.GroupingHandler;
import org.hornetq.core.server.cluster.Bridge;
import org.hornetq.core.server.cluster.BroadcastGroup;
import org.hornetq.core.server.cluster.ClusterConnection;
@@ -150,6 +154,11 @@
{
deployClusterConnection(config);
}
+
+ for (GroupingHandlerConfiguration config : configuration.getGroupingHandlerConfigurations())
+ {
+ deployGroupingHandlerConfigurations(config);
+ }
started = true;
}
@@ -486,6 +495,21 @@
bridge.start();
}
+ private synchronized void deployGroupingHandlerConfigurations(final GroupingHandlerConfiguration config) throws Exception
+ {
+ GroupingHandler groupingHandler;
+ if (config.getType() == GroupingHandlerConfiguration.TYPE.LOCAL)
+ {
+ groupingHandler = new LocalGroupingHandler(managementService, config.getName(), config.getAddress(), scheduledExecutor);
+ }
+ else
+ {
+ groupingHandler = new RemoteGroupingHandler(managementService, config.getName(), config.getAddress());
+ }
+ log.info("deploying grouping handler: " + groupingHandler);
+ postOffice.setGroupingHandler(groupingHandler);
+ }
+
private synchronized void deployClusterConnection(final ClusterConnectionConfiguration config) throws Exception
{
if (config.getName() == null)
Deleted: branches/hornetq_grouping/src/main/org/hornetq/core/server/group/Arbitrator.java
===================================================================
--- branches/hornetq_grouping/src/main/org/hornetq/core/server/group/Arbitrator.java 2009-09-24 09:25:44 UTC (rev 7986)
+++ branches/hornetq_grouping/src/main/org/hornetq/core/server/group/Arbitrator.java 2009-09-24 09:58:07 UTC (rev 7987)
@@ -1,35 +0,0 @@
-/*
- * Copyright 2009 Red Hat, Inc.
- * Red Hat licenses this file to you under the Apache License, version
- * 2.0 (the "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- * http://www.apache.org/licenses/LICENSE-2.0
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
- * implied. See the License for the specific language governing
- * permissions and limitations under the License.
- */
-package org.hornetq.core.server.group;
-
-import org.hornetq.utils.SimpleString;
-import org.hornetq.core.server.group.impl.Proposal;
-import org.hornetq.core.server.group.impl.Response;
-
-/**
- * @author <a href="mailto:andy.taylor at jboss.org">Andy Taylor</a>
- */
-public interface Arbitrator
-{
- SimpleString getName();
-
- Response propose(Proposal proposal) throws Exception;
-
- void proposed(Response response) throws Exception;
-
- void send(Response response, int distance) throws Exception;
-
- Response receive(Proposal proposal, int distance) throws Exception;
-
- Response rePropose(Proposal proposal) throws Exception;
-}
Copied: branches/hornetq_grouping/src/main/org/hornetq/core/server/group/GroupingHandler.java (from rev 7977, branches/hornetq_grouping/src/main/org/hornetq/core/server/group/Arbitrator.java)
===================================================================
--- branches/hornetq_grouping/src/main/org/hornetq/core/server/group/GroupingHandler.java (rev 0)
+++ branches/hornetq_grouping/src/main/org/hornetq/core/server/group/GroupingHandler.java 2009-09-24 09:58:07 UTC (rev 7987)
@@ -0,0 +1,35 @@
+/*
+ * Copyright 2009 Red Hat, Inc.
+ * Red Hat licenses this file to you under the Apache License, version
+ * 2.0 (the "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+ * implied. See the License for the specific language governing
+ * permissions and limitations under the License.
+ */
+package org.hornetq.core.server.group;
+
+import org.hornetq.utils.SimpleString;
+import org.hornetq.core.server.group.impl.Proposal;
+import org.hornetq.core.server.group.impl.Response;
+
+/**
+ * Contract for the handler that decides which binding a message group is assigned to. The post office holds one
+ * (see PostOffice.setGroupingHandler/getGroupingHandler) and consults it when routing messages that carry a
+ * group id.
+ *
+ * @author <a href="mailto:andy.taylor at jboss.org">Andy Taylor</a>
+ */
+public interface GroupingHandler
+{
+ /**
+  * @return the name of this handler
+  */
+ SimpleString getName();
+
+ /**
+  * Submits a proposal for a group and returns the resulting response.
+  * NOTE(review): callers pass a proposal with a null value first and treat a null return as "no decision yet" -
+  * confirm this against the implementations.
+  */
+ Response propose(Proposal proposal) throws Exception;
+
+ /**
+  * Hands this handler a response to a proposal; the cluster connection calls this with a response rebuilt
+  * from an incoming message.
+  */
+ void proposed(Response response) throws Exception;
+
+ /**
+  * Sends the given response on; callers pass either 0 or the received hop count plus one as the distance.
+  */
+ void send(Response response, int distance) throws Exception;
+
+ /**
+  * Processes a proposal received from elsewhere in the cluster. The caller checks the result for null before
+  * sending it on, so implementations may return null.
+  */
+ Response receive(Proposal proposal, int distance) throws Exception;
+
+ /**
+  * NOTE(review): no caller is visible in this change; presumably re-submits a proposal for an existing group -
+  * confirm against the implementations.
+  */
+ Response rePropose(Proposal proposal) throws Exception;
+}
Deleted: branches/hornetq_grouping/src/main/org/hornetq/core/server/group/impl/ArbitratorConfiguration.java
===================================================================
--- branches/hornetq_grouping/src/main/org/hornetq/core/server/group/impl/ArbitratorConfiguration.java 2009-09-24 09:25:44 UTC (rev 7986)
+++ branches/hornetq_grouping/src/main/org/hornetq/core/server/group/impl/ArbitratorConfiguration.java 2009-09-24 09:58:07 UTC (rev 7987)
@@ -1,67 +0,0 @@
-/*
- * Copyright 2009 Red Hat, Inc.
- * Red Hat licenses this file to you under the Apache License, version
- * 2.0 (the "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- * http://www.apache.org/licenses/LICENSE-2.0
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
- * implied. See the License for the specific language governing
- * permissions and limitations under the License.
- */
-package org.hornetq.core.server.group.impl;
-
-import org.hornetq.utils.SimpleString;
-
-/**
- * @author <a href="mailto:andy.taylor at jboss.org">Andy Taylor</a>
- */
-public class ArbitratorConfiguration
-{
- private final SimpleString name;
-
- private final TYPE type;
-
- private final SimpleString address;
-
- public ArbitratorConfiguration(final SimpleString name, final TYPE type, SimpleString address)
- {
- this.type = type;
- this.name = name;
- this.address = address;
- }
-
- public SimpleString getName()
- {
- return name;
- }
-
- public TYPE getType()
- {
- return type;
- }
-
- public SimpleString getAddress()
- {
- return address;
- }
-
- public enum TYPE
- {
- LOCAL("LOCAL"),
- REMOTE("REMOTE");
-
- private String type;
-
- TYPE(String type)
- {
- this.type = type;
- }
-
- public String getType()
- {
- return type;
- }
- }
-}
Copied: branches/hornetq_grouping/src/main/org/hornetq/core/server/group/impl/GroupingHandlerConfiguration.java (from rev 7977, branches/hornetq_grouping/src/main/org/hornetq/core/server/group/impl/ArbitratorConfiguration.java)
===================================================================
--- branches/hornetq_grouping/src/main/org/hornetq/core/server/group/impl/GroupingHandlerConfiguration.java (rev 0)
+++ branches/hornetq_grouping/src/main/org/hornetq/core/server/group/impl/GroupingHandlerConfiguration.java 2009-09-24 09:58:07 UTC (rev 7987)
@@ -0,0 +1,67 @@
+/*
+ * Copyright 2009 Red Hat, Inc.
+ * Red Hat licenses this file to you under the Apache License, version
+ * 2.0 (the "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+ * implied. See the License for the specific language governing
+ * permissions and limitations under the License.
+ */
+package org.hornetq.core.server.group.impl;
+
+import org.hornetq.utils.SimpleString;
+
+/**
+ * Immutable holder for the settings of a grouping handler: its name, whether it
+ * is the LOCAL or a REMOTE handler, and the address supplied for it.
+ *
+ * @author <a href="mailto:andy.taylor at jboss.org">Andy Taylor</a>
+ */
+public class GroupingHandlerConfiguration
+{
+   private final SimpleString name;
+
+   private final TYPE type;
+
+   private final SimpleString address;
+
+   /**
+    * @param name    the name of the grouping handler
+    * @param type    the kind of handler being configured
+    * @param address the address value to hand to the handler
+    */
+   public GroupingHandlerConfiguration(final SimpleString name, final TYPE type, final SimpleString address)
+   {
+      this.type = type;
+      this.name = name;
+      this.address = address;
+   }
+
+   /** @return the name given at construction */
+   public SimpleString getName()
+   {
+      return name;
+   }
+
+   /** @return the handler type given at construction */
+   public TYPE getType()
+   {
+      return type;
+   }
+
+   /** @return the address given at construction */
+   public SimpleString getAddress()
+   {
+      return address;
+   }
+
+   /**
+    * The kinds of grouping handler. Each constant carries a string form that is
+    * identical to its name.
+    */
+   public enum TYPE
+   {
+      LOCAL("LOCAL"),
+      REMOTE("REMOTE");
+
+      // final: enum constants are shared singletons and must not be mutable
+      private final String type;
+
+      TYPE(final String type)
+      {
+         this.type = type;
+      }
+
+      /** @return the string form of this constant */
+      public String getType()
+      {
+         return type;
+      }
+   }
+}
Deleted: branches/hornetq_grouping/src/main/org/hornetq/core/server/group/impl/LocalArbitrator.java
===================================================================
--- branches/hornetq_grouping/src/main/org/hornetq/core/server/group/impl/LocalArbitrator.java 2009-09-24 09:25:44 UTC (rev 7986)
+++ branches/hornetq_grouping/src/main/org/hornetq/core/server/group/impl/LocalArbitrator.java 2009-09-24 09:58:07 UTC (rev 7987)
@@ -1,129 +0,0 @@
-/*
- * Copyright 2009 Red Hat, Inc.
- * Red Hat licenses this file to you under the Apache License, version
- * 2.0 (the "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- * http://www.apache.org/licenses/LICENSE-2.0
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
- * implied. See the License for the specific language governing
- * permissions and limitations under the License.
- */
-package org.hornetq.core.server.group.impl;
-
-import org.hornetq.core.management.NotificationType;
-import org.hornetq.core.management.Notification;
-import org.hornetq.core.management.ManagementService;
-import org.hornetq.core.client.management.impl.ManagementHelper;
-import org.hornetq.core.postoffice.BindingType;
-import org.hornetq.core.logging.Logger;
-import org.hornetq.core.server.group.Arbitrator;
-import org.hornetq.utils.SimpleString;
-import org.hornetq.utils.TypedProperties;
-import org.hornetq.utils.ConcurrentHashSet;
-
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.ScheduledExecutorService;
-import java.util.concurrent.TimeUnit;
-
-/**
- * @author <a href="mailto:andy.taylor at jboss.org">Andy Taylor</a>
- */
-public class LocalArbitrator implements Arbitrator
-{
- private static Logger log = Logger.getLogger(LocalArbitrator.class);
-
- private ConcurrentHashMap<SimpleString, Object> map = new ConcurrentHashMap<SimpleString, Object>();
-
- private final SimpleString name;
-
- private final ManagementService managementService;
-
- private SimpleString address;
-
- private ScheduledExecutorService scheduledExecutor;
-
- private ConcurrentHashSet<SimpleString> reProposals = new ConcurrentHashSet<SimpleString>();
-
- public LocalArbitrator(final ManagementService managementService, final SimpleString name, final SimpleString address, ScheduledExecutorService scheduledExecutor)
- {
- this.managementService = managementService;
- this.name = name;
- this.address = address;
- this.scheduledExecutor = scheduledExecutor;
- }
-
- public SimpleString getName()
- {
- return name;
- }
-
-
- public Response propose(Proposal proposal) throws Exception
- {
- if(proposal.getProposal() == null)
- {
- Object original = map.get(proposal.getProposalType());
- return original == null?null:new Response(proposal.getProposalType(), original);
- }
- Response response = new Response(proposal.getProposalType(), proposal.getProposal());
- if (map.putIfAbsent(response.getResponseType(), response.getChosen()) == null)
- {
- log.info("accepted proposal for " + proposal.getProposalType() + " with value " + proposal.getProposal());
- return response;
- }
- else
- {
- log.info("denied proposal for " + proposal.getProposalType() + " with value " + proposal.getProposal());
- return new Response(proposal.getProposalType(), proposal.getProposal(), map.get(proposal.getProposalType()));
- }
- }
-
- public void proposed(Response response) throws Exception
- {
- }
-
- public void send(Response response, int distance) throws Exception
- {
- Object value = response.getAlternative() != null ? response.getAlternative() : response.getOriginal();
- log.info("sending proposal response for " + response.getResponseType() + " with value " + value);
- TypedProperties props = new TypedProperties();
- props.putStringProperty(ManagementHelper.HDR_PROPOSAL_TYPE, response.getResponseType());
- props.putStringProperty(ManagementHelper.HDR_PROPOSAL_VALUE, (SimpleString)response.getOriginal());
- props.putStringProperty(ManagementHelper.HDR_PROPOSAL_ALT_VALUE, (SimpleString)response.getAlternative());
- props.putIntProperty(ManagementHelper.HDR_BINDING_TYPE, BindingType.LOCAL_QUEUE_INDEX);
- props.putStringProperty(ManagementHelper.HDR_ADDRESS, address);
- props.putIntProperty(ManagementHelper.HDR_DISTANCE, distance);
- Notification notification = new Notification(null, NotificationType.PROPOSAL_RESPONSE, props);
- managementService.sendNotification(notification);
- }
-
- public Response receive(Proposal proposal, int distance) throws Exception
- {
- return propose(proposal);
- }
-
- public Response rePropose(final Proposal proposal) throws Exception
- {
- if(reProposals.addIfAbsent(proposal.getProposalType()))
- {
- Response response = new Response(proposal.getProposalType(), proposal.getProposal());
- map.replace(proposal.getProposalType(), response);
- send(response, 0);
- scheduledExecutor.schedule(new Runnable()
- {
- public void run()
- {
- reProposals.remove(proposal.getProposalType());
- }
- }, 2000, TimeUnit.MILLISECONDS);
- return response;
- }
- else
- {
- return new Response(proposal.getProposalType(), map.get(proposal.getProposalType()));
- }
- }
-}
-
Copied: branches/hornetq_grouping/src/main/org/hornetq/core/server/group/impl/LocalGroupingHandler.java (from rev 7977, branches/hornetq_grouping/src/main/org/hornetq/core/server/group/impl/LocalArbitrator.java)
===================================================================
--- branches/hornetq_grouping/src/main/org/hornetq/core/server/group/impl/LocalGroupingHandler.java (rev 0)
+++ branches/hornetq_grouping/src/main/org/hornetq/core/server/group/impl/LocalGroupingHandler.java 2009-09-24 09:58:07 UTC (rev 7987)
@@ -0,0 +1,129 @@
+/*
+ * Copyright 2009 Red Hat, Inc.
+ * Red Hat licenses this file to you under the Apache License, version
+ * 2.0 (the "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+ * implied. See the License for the specific language governing
+ * permissions and limitations under the License.
+ */
+package org.hornetq.core.server.group.impl;
+
+import org.hornetq.core.management.NotificationType;
+import org.hornetq.core.management.Notification;
+import org.hornetq.core.management.ManagementService;
+import org.hornetq.core.client.management.impl.ManagementHelper;
+import org.hornetq.core.postoffice.BindingType;
+import org.hornetq.core.logging.Logger;
+import org.hornetq.core.server.group.GroupingHandler;
+import org.hornetq.utils.SimpleString;
+import org.hornetq.utils.TypedProperties;
+import org.hornetq.utils.ConcurrentHashSet;
+
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+
+/**
+ * Grouping handler that decides proposals itself, keeping the accepted value for
+ * each proposal type in an in-memory map and publishing its decisions as
+ * {@code PROPOSAL_RESPONSE} management notifications.
+ *
+ * @author <a href="mailto:andy.taylor at jboss.org">Andy Taylor</a>
+ */
+public class LocalGroupingHandler implements GroupingHandler
+{
+   private static final Logger log = Logger.getLogger(LocalGroupingHandler.class);
+
+   /** How long, in milliseconds, a proposal type stays marked as "recently re-proposed". */
+   private static final long REPROPOSAL_WINDOW_MS = 2000;
+
+   /** Accepted value per proposal type. Values are the proposal values themselves, never Response objects. */
+   private final ConcurrentHashMap<SimpleString, Object> map = new ConcurrentHashMap<SimpleString, Object>();
+
+   private final SimpleString name;
+
+   private final ManagementService managementService;
+
+   private final SimpleString address;
+
+   private final ScheduledExecutorService scheduledExecutor;
+
+   /** Proposal types for which a re-proposal was accepted within the current window. */
+   private final ConcurrentHashSet<SimpleString> reProposals = new ConcurrentHashSet<SimpleString>();
+
+   /**
+    * @param managementService used to send response notifications
+    * @param name              the name reported by {@link #getName()}
+    * @param address           the address placed in outgoing notifications
+    * @param scheduledExecutor used by {@link #rePropose(Proposal)} to expire the re-proposal marker
+    */
+   public LocalGroupingHandler(final ManagementService managementService, final SimpleString name, final SimpleString address, final ScheduledExecutorService scheduledExecutor)
+   {
+      this.managementService = managementService;
+      this.name = name;
+      this.address = address;
+      this.scheduledExecutor = scheduledExecutor;
+   }
+
+   public SimpleString getName()
+   {
+      return name;
+   }
+
+   /**
+    * Decides a proposal.
+    * <p>
+    * A proposal without a value is a lookup: the stored value for its type is
+    * returned wrapped in a Response, or {@code null} if nothing is stored. A
+    * proposal with a value is accepted if no value is stored yet for its type;
+    * otherwise the returned Response carries both the rejected value and the
+    * value already stored.
+    */
+   public Response propose(Proposal proposal) throws Exception
+   {
+      if (proposal.getProposal() == null)
+      {
+         Object original = map.get(proposal.getProposalType());
+         return original == null ? null : new Response(proposal.getProposalType(), original);
+      }
+      Response response = new Response(proposal.getProposalType(), proposal.getProposal());
+      if (map.putIfAbsent(response.getResponseType(), response.getChosen()) == null)
+      {
+         log.info("accepted proposal for " + proposal.getProposalType() + " with value " + proposal.getProposal());
+         return response;
+      }
+      else
+      {
+         log.info("denied proposal for " + proposal.getProposalType() + " with value " + proposal.getProposal());
+         return new Response(proposal.getProposalType(), proposal.getProposal(), map.get(proposal.getProposalType()));
+      }
+   }
+
+   /** No-op: this handler makes the decisions itself and has nothing to do with incoming responses. */
+   public void proposed(Response response) throws Exception
+   {
+   }
+
+   /**
+    * Publishes a response as a {@code PROPOSAL_RESPONSE} notification carrying the
+    * proposal type, original and alternative values, this handler's address and
+    * the given distance.
+    */
+   public void send(Response response, int distance) throws Exception
+   {
+      Object value = response.getAlternative() != null ? response.getAlternative() : response.getOriginal();
+      log.info("sending proposal response for " + response.getResponseType() + " with value " + value);
+      TypedProperties props = new TypedProperties();
+      props.putStringProperty(ManagementHelper.HDR_PROPOSAL_TYPE, response.getResponseType());
+      props.putStringProperty(ManagementHelper.HDR_PROPOSAL_VALUE, (SimpleString)response.getOriginal());
+      props.putStringProperty(ManagementHelper.HDR_PROPOSAL_ALT_VALUE, (SimpleString)response.getAlternative());
+      props.putIntProperty(ManagementHelper.HDR_BINDING_TYPE, BindingType.LOCAL_QUEUE_INDEX);
+      props.putStringProperty(ManagementHelper.HDR_ADDRESS, address);
+      props.putIntProperty(ManagementHelper.HDR_DISTANCE, distance);
+      Notification notification = new Notification(null, NotificationType.PROPOSAL_RESPONSE, props);
+      managementService.sendNotification(notification);
+   }
+
+   /** Handles a proposal received from elsewhere by deciding it exactly as {@link #propose(Proposal)} does; the distance is not used. */
+   public Response receive(Proposal proposal, int distance) throws Exception
+   {
+      return propose(proposal);
+   }
+
+   /**
+    * Replaces the stored value for a proposal type with a new one.
+    * <p>
+    * The first caller for a given type replaces the stored value, publishes the
+    * new response with distance 0 and marks the type as re-proposed; the marker is
+    * removed after {@value #REPROPOSAL_WINDOW_MS} ms. Callers arriving while the
+    * marker is set get a Response built from the value currently stored.
+    */
+   public Response rePropose(final Proposal proposal) throws Exception
+   {
+      if (reProposals.addIfAbsent(proposal.getProposalType()))
+      {
+         Response response = new Response(proposal.getProposalType(), proposal.getProposal());
+         // Store the chosen value, as propose() does, not the Response wrapper:
+         // every reader of the map hands its values back out as proposal values,
+         // and send() casts them to SimpleString.
+         Object chosen = response.getChosen();
+         if (chosen != null)
+         {
+            // ConcurrentHashMap rejects null values; replace() is a no-op when the key is absent
+            map.replace(proposal.getProposalType(), chosen);
+         }
+         send(response, 0);
+         scheduledExecutor.schedule(new Runnable()
+         {
+            public void run()
+            {
+               reProposals.remove(proposal.getProposalType());
+            }
+         }, REPROPOSAL_WINDOW_MS, TimeUnit.MILLISECONDS);
+         return response;
+      }
+      else
+      {
+         return new Response(proposal.getProposalType(), map.get(proposal.getProposalType()));
+      }
+   }
+}
+
Deleted: branches/hornetq_grouping/src/main/org/hornetq/core/server/group/impl/RemoteArbitrator.java
===================================================================
--- branches/hornetq_grouping/src/main/org/hornetq/core/server/group/impl/RemoteArbitrator.java 2009-09-24 09:25:44 UTC (rev 7986)
+++ branches/hornetq_grouping/src/main/org/hornetq/core/server/group/impl/RemoteArbitrator.java 2009-09-24 09:58:07 UTC (rev 7987)
@@ -1,136 +0,0 @@
-/*
- * Copyright 2009 Red Hat, Inc.
- * Red Hat licenses this file to you under the Apache License, version
- * 2.0 (the "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- * http://www.apache.org/licenses/LICENSE-2.0
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
- * implied. See the License for the specific language governing
- * permissions and limitations under the License.
- */
-package org.hornetq.core.server.group.impl;
-
-import org.hornetq.core.management.NotificationType;
-import org.hornetq.core.management.Notification;
-import org.hornetq.core.management.ManagementService;
-import org.hornetq.core.client.management.impl.ManagementHelper;
-import org.hornetq.core.postoffice.BindingType;
-import org.hornetq.core.server.group.Arbitrator;
-import org.hornetq.utils.SimpleString;
-import org.hornetq.utils.TypedProperties;
-
-import java.util.logging.Logger;
-import java.util.Map;
-import java.util.HashMap;
-import java.util.concurrent.locks.Lock;
-import java.util.concurrent.locks.ReentrantLock;
-import java.util.concurrent.locks.Condition;
-import java.util.concurrent.TimeUnit;
-
-/**
- * @author <a href="mailto:andy.taylor at jboss.org">Andy Taylor</a>
- */
-public class RemoteArbitrator implements Arbitrator
-{
- private static Logger log = Logger.getLogger(RemoteArbitrator.class.getName());
-
- private final SimpleString name;
-
- private final ManagementService managementService;
-
- private final SimpleString address;
-
- private Map<SimpleString, Response> responses = new HashMap<SimpleString, Response>();
-
- private final Lock lock = new ReentrantLock();
-
- private final Condition sendCondition = lock.newCondition();
-
- private int waitTime = 1000;
-
- public RemoteArbitrator(final ManagementService managementService, final SimpleString name, final SimpleString address)
- {
- this.name = name;
- this.address = address;
- this.managementService = managementService;
- }
-
- public SimpleString getName()
- {
- return name;
- }
-
- public Response propose(final Proposal proposal) throws Exception
- {
- Response response = responses.get(proposal.getProposalType());
- if( response != null)
- {
- return response;
- }
- if (proposal.getProposal() == null)
- {
- return null;
- }
- try
- {
- lock.lock();
- TypedProperties props = new TypedProperties();
- log.info("sending proposal for " + proposal.getProposalType() + " with value " + proposal.getProposal());
- props.putStringProperty(ManagementHelper.HDR_PROPOSAL_TYPE, proposal.getProposalType());
- props.putStringProperty(ManagementHelper.HDR_PROPOSAL_VALUE, (SimpleString)proposal.getProposal());
- props.putIntProperty(ManagementHelper.HDR_BINDING_TYPE, BindingType.LOCAL_QUEUE_INDEX);
- props.putStringProperty(ManagementHelper.HDR_ADDRESS, address);
- props.putIntProperty(ManagementHelper.HDR_DISTANCE, 0);
- Notification notification = new Notification(null, NotificationType.PROPOSAL, props);
- managementService.sendNotification(notification);
- sendCondition.await(waitTime, TimeUnit.MILLISECONDS);
- }
- finally
- {
- lock.unlock();
- }
- return responses.get(proposal.getProposalType());
- }
-
- public void proposed(Response response) throws Exception
- {
- Object value = response.getAlternative() != null ? response.getAlternative() : response.getOriginal();
- log.info("received proposal response for " + response.getResponseType() + " with value " + value);
- try
- {
- lock.lock();
- responses.put(response.getResponseType(), response);
- sendCondition.signal();
- }
- finally
- {
- lock.unlock();
- }
- }
-
- public Response receive(Proposal proposal, int distance) throws Exception
- {
- TypedProperties props = new TypedProperties();
- props.putStringProperty(ManagementHelper.HDR_PROPOSAL_TYPE, proposal.getProposalType());
- props.putStringProperty(ManagementHelper.HDR_PROPOSAL_VALUE, (SimpleString)proposal.getProposal());
- props.putIntProperty(ManagementHelper.HDR_BINDING_TYPE, BindingType.LOCAL_QUEUE_INDEX);
- props.putStringProperty(ManagementHelper.HDR_ADDRESS, address);
- props.putIntProperty(ManagementHelper.HDR_DISTANCE, distance);
- Notification notification = new Notification(null, NotificationType.PROPOSAL, props);
- managementService.sendNotification(notification);
- return null;
- }
-
- public void send(Response response, int distance) throws Exception
- {
- }
-
- public Response rePropose(Proposal proposal)
- {
- return null;
- }
-
-}
-
Copied: branches/hornetq_grouping/src/main/org/hornetq/core/server/group/impl/RemoteGroupingHandler.java (from rev 7977, branches/hornetq_grouping/src/main/org/hornetq/core/server/group/impl/RemoteArbitrator.java)
===================================================================
--- branches/hornetq_grouping/src/main/org/hornetq/core/server/group/impl/RemoteGroupingHandler.java (rev 0)
+++ branches/hornetq_grouping/src/main/org/hornetq/core/server/group/impl/RemoteGroupingHandler.java 2009-09-24 09:58:07 UTC (rev 7987)
@@ -0,0 +1,141 @@
+/*
+ * Copyright 2009 Red Hat, Inc.
+ * Red Hat licenses this file to you under the Apache License, version
+ * 2.0 (the "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+ * implied. See the License for the specific language governing
+ * permissions and limitations under the License.
+ */
+package org.hornetq.core.server.group.impl;
+
+import org.hornetq.core.management.NotificationType;
+import org.hornetq.core.management.Notification;
+import org.hornetq.core.management.ManagementService;
+import org.hornetq.core.client.management.impl.ManagementHelper;
+import org.hornetq.core.postoffice.BindingType;
+import org.hornetq.core.server.group.GroupingHandler;
+import org.hornetq.utils.SimpleString;
+import org.hornetq.utils.TypedProperties;
+
+import java.util.logging.Logger;
+import java.util.Map;
+import java.util.HashMap;
+import java.util.concurrent.locks.Lock;
+import java.util.concurrent.locks.ReentrantLock;
+import java.util.concurrent.locks.Condition;
+import java.util.concurrent.TimeUnit;
+
+/**
+ * Grouping handler that does not decide proposals itself: it publishes each
+ * proposal as a {@code PROPOSAL} management notification and waits for the
+ * matching response to be delivered through {@link #proposed(Response)}.
+ * Responses are cached per proposal type.
+ *
+ * @author <a href="mailto:andy.taylor at jboss.org">Andy Taylor</a>
+ */
+public class RemoteGroupingHandler implements GroupingHandler
+{
+   private static final Logger log = Logger.getLogger(RemoteGroupingHandler.class.getName());
+
+   private final SimpleString name;
+
+   private final ManagementService managementService;
+
+   private final SimpleString address;
+
+   /** Responses received so far, keyed by proposal type. Plain HashMap: only touch while holding {@link #lock}. */
+   private final Map<SimpleString, Response> responses = new HashMap<SimpleString, Response>();
+
+   private final Lock lock = new ReentrantLock();
+
+   /** Signalled whenever a response is added to {@link #responses}. */
+   private final Condition sendCondition = lock.newCondition();
+
+   /** Maximum time, in milliseconds, that propose() waits for a response. */
+   private final int waitTime = 1000;
+
+   /**
+    * @param managementService used to send proposal notifications
+    * @param name              the name reported by {@link #getName()}
+    * @param address           the address placed in outgoing notifications
+    */
+   public RemoteGroupingHandler(final ManagementService managementService, final SimpleString name, final SimpleString address)
+   {
+      this.name = name;
+      this.address = address;
+      this.managementService = managementService;
+   }
+
+   public SimpleString getName()
+   {
+      return name;
+   }
+
+   /**
+    * Returns the response for a proposal, asking for one if none is cached.
+    * <p>
+    * A cached response for the proposal type is returned immediately. Otherwise a
+    * proposal without a value yields {@code null}, and a proposal with a value is
+    * published as a notification, after which the caller blocks for up to
+    * {@link #waitTime} ms until the response for that type arrives.
+    *
+    * @throws IllegalStateException if no response for the proposal type arrives in time
+    */
+   public Response propose(final Proposal proposal) throws Exception
+   {
+      Response response;
+      // Acquire outside the try so that a failed lock() never reaches unlock().
+      lock.lock();
+      try
+      {
+         // The cache is an unsynchronised HashMap, so even the fast path reads it under the lock.
+         response = responses.get(proposal.getProposalType());
+         if (response != null)
+         {
+            return response;
+         }
+         if (proposal.getProposal() == null)
+         {
+            return null;
+         }
+         TypedProperties props = new TypedProperties();
+         log.info("sending proposal for " + proposal.getProposalType() + " with value " + proposal.getProposal());
+         props.putStringProperty(ManagementHelper.HDR_PROPOSAL_TYPE, proposal.getProposalType());
+         props.putStringProperty(ManagementHelper.HDR_PROPOSAL_VALUE, (SimpleString)proposal.getProposal());
+         props.putIntProperty(ManagementHelper.HDR_BINDING_TYPE, BindingType.LOCAL_QUEUE_INDEX);
+         props.putStringProperty(ManagementHelper.HDR_ADDRESS, address);
+         props.putIntProperty(ManagementHelper.HDR_DISTANCE, 0);
+         Notification notification = new Notification(null, NotificationType.PROPOSAL, props);
+         managementService.sendNotification(notification);
+         // Wait on the predicate, not on the wakeup: this tolerates spurious
+         // wakeups and signals meant for other proposal types, and returns at
+         // once if the response was delivered before we started waiting.
+         long remainingNanos = TimeUnit.MILLISECONDS.toNanos(waitTime);
+         response = responses.get(proposal.getProposalType());
+         while (response == null && remainingNanos > 0)
+         {
+            remainingNanos = sendCondition.awaitNanos(remainingNanos);
+            response = responses.get(proposal.getProposalType());
+         }
+      }
+      finally
+      {
+         lock.unlock();
+      }
+      if (response == null)
+      {
+         throw new IllegalStateException("no response received from group handler for " + proposal.getProposalType());
+      }
+      return response;
+   }
+
+   /** Records a response that has arrived and wakes every thread waiting in {@link #propose(Proposal)}. */
+   public void proposed(Response response) throws Exception
+   {
+      Object value = response.getAlternative() != null ? response.getAlternative() : response.getOriginal();
+      log.info("received proposal response for " + response.getResponseType() + " with value " + value);
+      lock.lock();
+      try
+      {
+         responses.put(response.getResponseType(), response);
+         // signalAll: waiters may be blocked on different proposal types, and a
+         // single signal could wake one that is not waiting for this response.
+         sendCondition.signalAll();
+      }
+      finally
+      {
+         lock.unlock();
+      }
+   }
+
+   /**
+    * Forwards a proposal received from elsewhere as a {@code PROPOSAL} notification
+    * carrying the given distance. Does not wait for the outcome.
+    *
+    * @return always {@code null}
+    */
+   public Response receive(Proposal proposal, int distance) throws Exception
+   {
+      TypedProperties props = new TypedProperties();
+      props.putStringProperty(ManagementHelper.HDR_PROPOSAL_TYPE, proposal.getProposalType());
+      props.putStringProperty(ManagementHelper.HDR_PROPOSAL_VALUE, (SimpleString)proposal.getProposal());
+      props.putIntProperty(ManagementHelper.HDR_BINDING_TYPE, BindingType.LOCAL_QUEUE_INDEX);
+      props.putStringProperty(ManagementHelper.HDR_ADDRESS, address);
+      props.putIntProperty(ManagementHelper.HDR_DISTANCE, distance);
+      Notification notification = new Notification(null, NotificationType.PROPOSAL, props);
+      managementService.sendNotification(notification);
+      return null;
+   }
+
+   /** No-op: this handler does not publish responses. */
+   public void send(Response response, int distance) throws Exception
+   {
+   }
+
+   /**
+    * Not supported by this handler.
+    *
+    * @return always {@code null}
+    */
+   public Response rePropose(Proposal proposal)
+   {
+      return null;
+   }
+
+}
+
Modified: branches/hornetq_grouping/tests/src/org/hornetq/tests/integration/cluster/distribution/ClusterTestBase.java
===================================================================
--- branches/hornetq_grouping/tests/src/org/hornetq/tests/integration/cluster/distribution/ClusterTestBase.java 2009-09-24 09:25:44 UTC (rev 7986)
+++ branches/hornetq_grouping/tests/src/org/hornetq/tests/integration/cluster/distribution/ClusterTestBase.java 2009-09-24 09:58:07 UTC (rev 7987)
@@ -45,10 +45,10 @@
import org.hornetq.core.server.HornetQ;
import org.hornetq.core.server.HornetQServer;
import org.hornetq.core.server.JournalType;
-import org.hornetq.core.server.group.impl.ArbitratorConfiguration;
-import org.hornetq.core.server.group.impl.LocalArbitrator;
-import org.hornetq.core.server.group.impl.RemoteArbitrator;
-import org.hornetq.core.server.group.Arbitrator;
+import org.hornetq.core.server.group.impl.GroupingHandlerConfiguration;
+import org.hornetq.core.server.group.impl.LocalGroupingHandler;
+import org.hornetq.core.server.group.impl.RemoteGroupingHandler;
+import org.hornetq.core.server.group.GroupingHandler;
import org.hornetq.core.server.cluster.ClusterConnection;
import org.hornetq.core.server.cluster.RemoteQueueBinding;
import org.hornetq.integration.transports.netty.TransportConstants;
@@ -457,18 +457,18 @@
}
- protected void setUpGroupArbitrator(ArbitratorConfiguration.TYPE type, int node)
+ protected void setUpGroupArbitrator(GroupingHandlerConfiguration.TYPE type, int node)
{
- Arbitrator arbitrator;
- if(type == ArbitratorConfiguration.TYPE.LOCAL)
+ GroupingHandler groupingHandler;
+ if(type == GroupingHandlerConfiguration.TYPE.LOCAL)
{
- arbitrator = new LocalArbitrator(servers[node].getManagementService(), new SimpleString("grouparbitrator"), new SimpleString("queues"), null);
+ groupingHandler = new LocalGroupingHandler(servers[node].getManagementService(), new SimpleString("grouparbitrator"), new SimpleString("queues"), null);
}
else
{
- arbitrator = new RemoteArbitrator(servers[node].getManagementService(), new SimpleString("grouparbitrator"), new SimpleString("queues"));
+ groupingHandler = new RemoteGroupingHandler(servers[node].getManagementService(), new SimpleString("grouparbitrator"), new SimpleString("queues"));
}
- this.servers[node].getPostOffice().addArbitrator(arbitrator);
+ this.servers[node].getPostOffice().setGroupingHandler(groupingHandler);
}
protected void send(int node, String address, int numMessages, boolean durable, String filterVal) throws Exception
Modified: branches/hornetq_grouping/tests/src/org/hornetq/tests/integration/cluster/distribution/ClusteredGroupingTest.java
===================================================================
--- branches/hornetq_grouping/tests/src/org/hornetq/tests/integration/cluster/distribution/ClusteredGroupingTest.java 2009-09-24 09:25:44 UTC (rev 7986)
+++ branches/hornetq_grouping/tests/src/org/hornetq/tests/integration/cluster/distribution/ClusteredGroupingTest.java 2009-09-24 09:58:07 UTC (rev 7987)
@@ -13,7 +13,7 @@
package org.hornetq.tests.integration.cluster.distribution;
import org.hornetq.core.message.impl.MessageImpl;
-import org.hornetq.core.server.group.impl.ArbitratorConfiguration;
+import org.hornetq.core.server.group.impl.GroupingHandlerConfiguration;
import org.hornetq.core.exception.HornetQException;
import org.hornetq.utils.SimpleString;
@@ -39,9 +39,9 @@
try
{
- setUpGroupArbitrator(ArbitratorConfiguration.TYPE.LOCAL, 0);
- setUpGroupArbitrator(ArbitratorConfiguration.TYPE.REMOTE, 1);
- setUpGroupArbitrator(ArbitratorConfiguration.TYPE.REMOTE, 2);
+ setUpGroupArbitrator(GroupingHandlerConfiguration.TYPE.LOCAL, 0);
+ setUpGroupArbitrator(GroupingHandlerConfiguration.TYPE.REMOTE, 1);
+ setUpGroupArbitrator(GroupingHandlerConfiguration.TYPE.REMOTE, 2);
setupSessionFactory(0, isNetty());
setupSessionFactory(1, isNetty());
@@ -95,9 +95,9 @@
try
{
- setUpGroupArbitrator(ArbitratorConfiguration.TYPE.LOCAL, 0);
- setUpGroupArbitrator(ArbitratorConfiguration.TYPE.REMOTE, 1);
- setUpGroupArbitrator(ArbitratorConfiguration.TYPE.REMOTE, 2);
+ setUpGroupArbitrator(GroupingHandlerConfiguration.TYPE.LOCAL, 0);
+ setUpGroupArbitrator(GroupingHandlerConfiguration.TYPE.REMOTE, 1);
+ setUpGroupArbitrator(GroupingHandlerConfiguration.TYPE.REMOTE, 2);
setupSessionFactory(0, isNetty());
setupSessionFactory(1, isNetty());
@@ -154,9 +154,9 @@
try
{
- setUpGroupArbitrator(ArbitratorConfiguration.TYPE.LOCAL, 0);
- setUpGroupArbitrator(ArbitratorConfiguration.TYPE.REMOTE, 1);
- setUpGroupArbitrator(ArbitratorConfiguration.TYPE.REMOTE, 2);
+ setUpGroupArbitrator(GroupingHandlerConfiguration.TYPE.LOCAL, 0);
+ setUpGroupArbitrator(GroupingHandlerConfiguration.TYPE.REMOTE, 1);
+ setUpGroupArbitrator(GroupingHandlerConfiguration.TYPE.REMOTE, 2);
setupSessionFactory(0, isNetty());
setupSessionFactory(1, isNetty());
@@ -216,9 +216,9 @@
try
{
- setUpGroupArbitrator(ArbitratorConfiguration.TYPE.LOCAL, 0);
- setUpGroupArbitrator(ArbitratorConfiguration.TYPE.REMOTE, 1);
- setUpGroupArbitrator(ArbitratorConfiguration.TYPE.REMOTE, 2);
+ setUpGroupArbitrator(GroupingHandlerConfiguration.TYPE.LOCAL, 0);
+ setUpGroupArbitrator(GroupingHandlerConfiguration.TYPE.REMOTE, 1);
+ setUpGroupArbitrator(GroupingHandlerConfiguration.TYPE.REMOTE, 2);
setupSessionFactory(0, isNetty());
setupSessionFactory(1, isNetty());
@@ -278,9 +278,9 @@
try
{
- setUpGroupArbitrator(ArbitratorConfiguration.TYPE.LOCAL, 0);
- setUpGroupArbitrator(ArbitratorConfiguration.TYPE.REMOTE, 1);
- setUpGroupArbitrator(ArbitratorConfiguration.TYPE.REMOTE, 2);
+ setUpGroupArbitrator(GroupingHandlerConfiguration.TYPE.LOCAL, 0);
+ setUpGroupArbitrator(GroupingHandlerConfiguration.TYPE.REMOTE, 1);
+ setUpGroupArbitrator(GroupingHandlerConfiguration.TYPE.REMOTE, 2);
setupSessionFactory(0, isNetty());
setupSessionFactory(1, isNetty());
@@ -340,9 +340,9 @@
try
{
- setUpGroupArbitrator(ArbitratorConfiguration.TYPE.LOCAL, 0);
- setUpGroupArbitrator(ArbitratorConfiguration.TYPE.REMOTE, 1);
- setUpGroupArbitrator(ArbitratorConfiguration.TYPE.REMOTE, 2);
+ setUpGroupArbitrator(GroupingHandlerConfiguration.TYPE.LOCAL, 0);
+ setUpGroupArbitrator(GroupingHandlerConfiguration.TYPE.REMOTE, 1);
+ setUpGroupArbitrator(GroupingHandlerConfiguration.TYPE.REMOTE, 2);
setupSessionFactory(0, isNetty());
setupSessionFactory(1, isNetty());
@@ -400,9 +400,9 @@
try
{
- setUpGroupArbitrator(ArbitratorConfiguration.TYPE.LOCAL, 0);
- setUpGroupArbitrator(ArbitratorConfiguration.TYPE.REMOTE, 1);
- setUpGroupArbitrator(ArbitratorConfiguration.TYPE.REMOTE, 2);
+ setUpGroupArbitrator(GroupingHandlerConfiguration.TYPE.LOCAL, 0);
+ setUpGroupArbitrator(GroupingHandlerConfiguration.TYPE.REMOTE, 1);
+ setUpGroupArbitrator(GroupingHandlerConfiguration.TYPE.REMOTE, 2);
setupSessionFactory(0, isNetty());
setupSessionFactory(1, isNetty());
@@ -490,9 +490,9 @@
try
{
- setUpGroupArbitrator(ArbitratorConfiguration.TYPE.LOCAL, 0);
- setUpGroupArbitrator(ArbitratorConfiguration.TYPE.REMOTE, 1);
- setUpGroupArbitrator(ArbitratorConfiguration.TYPE.REMOTE, 2);
+ setUpGroupArbitrator(GroupingHandlerConfiguration.TYPE.LOCAL, 0);
+ setUpGroupArbitrator(GroupingHandlerConfiguration.TYPE.REMOTE, 1);
+ setUpGroupArbitrator(GroupingHandlerConfiguration.TYPE.REMOTE, 2);
setupSessionFactory(0, isNetty());
setupSessionFactory(1, isNetty());
@@ -526,13 +526,23 @@
sendInRange(2, "queues.testaddress", 10, 20, false, MessageImpl.HDR_GROUP_ID, new SimpleString("id1"));
+
+ sendInRange(0, "queues.testaddress", 20, 30, false, MessageImpl.HDR_GROUP_ID, new SimpleString("id1"));
+ startServers(1);
+ setupSessionFactory(1, isNetty());
+ createQueue(1, "queues.testaddress", "queue0", null, false);
+ addConsumer(1, 1, "queue0", null);
+ waitForBindings(1, "queues.testaddress", 1, 1, true);
+ waitForBindings(1, "queues.testaddress", 2, 2, false);
verifyReceiveAllInRange(10, 20, 1);
- sendInRange(0, "queues.testaddress", 20, 30, false, MessageImpl.HDR_GROUP_ID, new SimpleString("id1"));
-
verifyReceiveAllInRange(20, 30, 1);
System.out.println("*****************************************************************************");
}
+ catch(Exception e)
+ {
+ e.printStackTrace();
+ }
finally
{
//closeAllConsumers();
Modified: branches/hornetq_grouping/tests/src/org/hornetq/tests/unit/core/server/impl/fakes/FakePostOffice.java
===================================================================
--- branches/hornetq_grouping/tests/src/org/hornetq/tests/unit/core/server/impl/fakes/FakePostOffice.java 2009-09-24 09:25:44 UTC (rev 7986)
+++ branches/hornetq_grouping/tests/src/org/hornetq/tests/unit/core/server/impl/fakes/FakePostOffice.java 2009-09-24 09:58:07 UTC (rev 7987)
@@ -23,7 +23,7 @@
import org.hornetq.core.postoffice.PostOffice;
import org.hornetq.core.server.Queue;
import org.hornetq.core.server.ServerMessage;
-import org.hornetq.core.server.group.Arbitrator;
+import org.hornetq.core.server.group.GroupingHandler;
import org.hornetq.core.transaction.Transaction;
import org.hornetq.utils.SimpleString;
@@ -151,11 +151,11 @@
}
- public void addArbitrator(Arbitrator arbitrator)
+ public void setGroupingHandler(GroupingHandler groupingHandler)
{
}
- public Arbitrator getArbitrator()
+ public GroupingHandler getGroupingHandler()
{
return null;
}
More information about the hornetq-commits
mailing list