Author: ataylor
Date: 2009-10-28 14:32:19 -0400 (Wed, 28 Oct 2009)
New Revision: 8158
Added:
trunk/examples/jms/clustered-grouping/
trunk/examples/jms/clustered-grouping/build.bat
trunk/examples/jms/clustered-grouping/build.sh
trunk/examples/jms/clustered-grouping/build.xml
trunk/examples/jms/clustered-grouping/readme.html
trunk/examples/jms/clustered-grouping/server0/
trunk/examples/jms/clustered-grouping/server0/client-jndi.properties
trunk/examples/jms/clustered-grouping/server0/hornetq-beans.xml
trunk/examples/jms/clustered-grouping/server0/hornetq-configuration.xml
trunk/examples/jms/clustered-grouping/server0/hornetq-jms.xml
trunk/examples/jms/clustered-grouping/server0/hornetq-users.xml
trunk/examples/jms/clustered-grouping/server1/
trunk/examples/jms/clustered-grouping/server1/client-jndi.properties
trunk/examples/jms/clustered-grouping/server1/hornetq-beans.xml
trunk/examples/jms/clustered-grouping/server1/hornetq-configuration.xml
trunk/examples/jms/clustered-grouping/server1/hornetq-jms.xml
trunk/examples/jms/clustered-grouping/server1/hornetq-users.xml
trunk/examples/jms/clustered-grouping/server2/
trunk/examples/jms/clustered-grouping/server2/client-jndi.properties
trunk/examples/jms/clustered-grouping/server2/hornetq-beans.xml
trunk/examples/jms/clustered-grouping/server2/hornetq-configuration.xml
trunk/examples/jms/clustered-grouping/server2/hornetq-jms.xml
trunk/examples/jms/clustered-grouping/server2/hornetq-users.xml
trunk/examples/jms/clustered-grouping/src/
trunk/examples/jms/clustered-grouping/src/org/
trunk/examples/jms/clustered-grouping/src/org/hornetq/
trunk/examples/jms/clustered-grouping/src/org/hornetq/jms/
trunk/examples/jms/clustered-grouping/src/org/hornetq/jms/example/
trunk/examples/jms/clustered-grouping/src/org/hornetq/jms/example/ClusteredGroupingExample.java
trunk/src/main/org/hornetq/core/persistence/GroupingInfo.java
trunk/src/main/org/hornetq/core/postoffice/BindingsFactory.java
trunk/src/main/org/hornetq/core/server/group/
trunk/src/main/org/hornetq/core/server/group/GroupingHandler.java
trunk/src/main/org/hornetq/core/server/group/impl/
trunk/src/main/org/hornetq/core/server/group/impl/GroupBinding.java
trunk/src/main/org/hornetq/core/server/group/impl/GroupingHandlerConfiguration.java
trunk/src/main/org/hornetq/core/server/group/impl/LocalGroupingHandler.java
trunk/src/main/org/hornetq/core/server/group/impl/Proposal.java
trunk/src/main/org/hornetq/core/server/group/impl/RemoteGroupingHandler.java
trunk/src/main/org/hornetq/core/server/group/impl/Response.java
trunk/tests/src/org/hornetq/tests/integration/cluster/distribution/ClusteredGroupingTest.java
trunk/tests/src/org/hornetq/tests/integration/cluster/failover/GroupingFailoverReplicationTest.java
trunk/tests/src/org/hornetq/tests/integration/cluster/failover/GroupingFailoverSharedServerTest.java
trunk/tests/src/org/hornetq/tests/integration/cluster/failover/GroupingFailoverTestBase.java
Removed:
trunk/examples/jms/clustered-grouping/build.xml
trunk/examples/jms/clustered-grouping/readme.html
trunk/examples/jms/clustered-grouping/server0/
trunk/examples/jms/clustered-grouping/server0/client-jndi.properties
trunk/examples/jms/clustered-grouping/server0/hornetq-beans.xml
trunk/examples/jms/clustered-grouping/server0/hornetq-configuration.xml
trunk/examples/jms/clustered-grouping/server0/hornetq-jms.xml
trunk/examples/jms/clustered-grouping/server0/hornetq-users.xml
trunk/examples/jms/clustered-grouping/server1/
trunk/examples/jms/clustered-grouping/server1/client-jndi.properties
trunk/examples/jms/clustered-grouping/server1/hornetq-beans.xml
trunk/examples/jms/clustered-grouping/server1/hornetq-configuration.xml
trunk/examples/jms/clustered-grouping/server1/hornetq-jms.xml
trunk/examples/jms/clustered-grouping/server1/hornetq-users.xml
trunk/examples/jms/clustered-grouping/server2/
trunk/examples/jms/clustered-grouping/server2/client-jndi.properties
trunk/examples/jms/clustered-grouping/server2/hornetq-beans.xml
trunk/examples/jms/clustered-grouping/server2/hornetq-configuration.xml
trunk/examples/jms/clustered-grouping/server2/hornetq-jms.xml
trunk/examples/jms/clustered-grouping/server2/hornetq-users.xml
trunk/examples/jms/clustered-grouping/src/
trunk/examples/jms/clustered-grouping/src/org/
trunk/examples/jms/clustered-grouping/src/org/hornetq/
trunk/examples/jms/clustered-grouping/src/org/hornetq/jms/
trunk/examples/jms/clustered-grouping/src/org/hornetq/jms/example/
trunk/examples/jms/clustered-grouping/src/org/hornetq/jms/example/ClusteredGroupingExample.java
trunk/src/main/org/hornetq/core/server/group/GroupingHandler.java
trunk/src/main/org/hornetq/core/server/group/impl/
trunk/src/main/org/hornetq/core/server/group/impl/GroupBinding.java
trunk/src/main/org/hornetq/core/server/group/impl/GroupingHandlerConfiguration.java
trunk/src/main/org/hornetq/core/server/group/impl/LocalGroupingHandler.java
trunk/src/main/org/hornetq/core/server/group/impl/Proposal.java
trunk/src/main/org/hornetq/core/server/group/impl/RemoteGroupingHandler.java
trunk/src/main/org/hornetq/core/server/group/impl/Response.java
Modified:
trunk/docs/user-manual/en/examples.xml
trunk/docs/user-manual/en/message-grouping.xml
trunk/examples/javaee/hornetq-javaee-examples.iml
trunk/examples/jms/hornetq-jms-examples.iml
trunk/hornetq.iml
trunk/hornetq.ipr
trunk/src/config/common/schema/hornetq-configuration.xsd
trunk/src/main/org/hornetq/core/client/management/impl/ManagementHelper.java
trunk/src/main/org/hornetq/core/config/Configuration.java
trunk/src/main/org/hornetq/core/config/impl/ConfigurationImpl.java
trunk/src/main/org/hornetq/core/config/impl/FileConfiguration.java
trunk/src/main/org/hornetq/core/management/NotificationType.java
trunk/src/main/org/hornetq/core/persistence/StorageManager.java
trunk/src/main/org/hornetq/core/persistence/impl/journal/JournalStorageManager.java
trunk/src/main/org/hornetq/core/persistence/impl/nullpm/NullStorageManager.java
trunk/src/main/org/hornetq/core/postoffice/impl/BindingsImpl.java
trunk/src/main/org/hornetq/core/postoffice/impl/PostOfficeImpl.java
trunk/src/main/org/hornetq/core/postoffice/impl/SimpleAddressManager.java
trunk/src/main/org/hornetq/core/postoffice/impl/WildcardAddressManager.java
trunk/src/main/org/hornetq/core/server/HornetQServer.java
trunk/src/main/org/hornetq/core/server/cluster/impl/BridgeImpl.java
trunk/src/main/org/hornetq/core/server/cluster/impl/ClusterConnectionImpl.java
trunk/src/main/org/hornetq/core/server/impl/HornetQServerImpl.java
trunk/tests/hornetq-tests.iml
trunk/tests/jms-tests/hornetq-jms-tests.iml
trunk/tests/joram-tests/hornetq-joram-tests.iml
trunk/tests/src/org/hornetq/tests/integration/cluster/distribution/ClusterTestBase.java
trunk/tests/src/org/hornetq/tests/integration/persistence/RestartSMTest.java
trunk/tests/src/org/hornetq/tests/unit/core/paging/impl/PagingStoreImplTest.java
trunk/tests/src/org/hornetq/tests/unit/core/postoffice/impl/BindingsImplTest.java
trunk/tests/src/org/hornetq/tests/unit/core/postoffice/impl/DuplicateDetectionUnitTest.java
Log:
https://jira.jboss.org/jira/browse/HORNETQ-46 - support for clustered groupings
Modified: trunk/docs/user-manual/en/examples.xml
===================================================================
--- trunk/docs/user-manual/en/examples.xml 2009-10-28 16:13:10 UTC (rev 8157)
+++ trunk/docs/user-manual/en/examples.xml 2009-10-28 18:32:19 UTC (rev 8158)
@@ -29,7 +29,8 @@
<section>
<title>JMS Examples</title>
<para>To run a JMS example, simply <literal>cd</literal> into
the appropriate example
- directory and type <literal>./build.sh</literal> (or
<literal>build.bat</literal> if you are on Windows).</para>
+ directory and type <literal>./build.sh</literal> (or
<literal>build.bat</literal> if you
+ are on Windows).</para>
<para>Here's a listing of the examples with a brief
description.</para>
<section id="application-level-failover">
<title>Application-Layer Failover</title>
@@ -90,6 +91,12 @@
be created to different nodes of the cluster. In other words it
demonstrates how
HornetQ does client side load balancing of connections across the
cluster.</para>
</section>
+ <section id="examples.clustered.grouping">
+ <title>Clustered Grouping</title>
+ <para>This is similar to the message grouping example except that it
demonstrates it
+ working over a cluster. Messages sent to different nodes with the same
group id will
+ be sent to the same node and the same consumer.</para>
+ </section>
<section>
<title>Clustered Queue</title>
<para>The <literal>clustered-queue</literal> example
demonstrates a JMS queue deployed
Modified: trunk/docs/user-manual/en/message-grouping.xml
===================================================================
--- trunk/docs/user-manual/en/message-grouping.xml 2009-10-28 16:13:10 UTC (rev 8157)
+++ trunk/docs/user-manual/en/message-grouping.xml 2009-10-28 18:32:19 UTC (rev 8158)
@@ -1,5 +1,4 @@
<?xml version="1.0" encoding="UTF-8"?>
-
<!-- =============================================================================
-->
<!-- Copyright © 2009 Red Hat, Inc. and others.
-->
<!--
-->
@@ -17,7 +16,6 @@
<!-- and agrees not to assert, Section 4d of CC-BY-SA to the fullest extent
-->
<!-- permitted by applicable law.
-->
<!-- =============================================================================
-->
-
<chapter id="message-grouping">
<title>Message Grouping</title>
<para>Message groups are sets of messages that has the following
characteristics:</para>
@@ -62,7 +60,7 @@
producer.send(message);
</programlisting>
<para>Alternatively, you can set <literal>autogroup</literal> to
true on the <literal
- >HornetQConnectonFactory</literal> which will pick a random unique
id. This can also be
+ >HornetQConnectonFactory</literal> which will pick a random unique
id. This can also be
set in the <literal>hornetq-jms.xml</literal> file like
this:</para>
<programlisting><connection-factory
name="ConnectionFactory">
<connector-ref connector-name="netty-connector"/>
@@ -74,7 +72,78 @@
</section>
<section>
<title>Example</title>
- <para>See <xref linkend="examples.message-group" /> for an
example which
- shows how message groups are configured and used with JMS.</para>
+ <para>See <xref linkend="examples.message-group"/> for an
example which shows how message
+ groups are configured and used with JMS.</para>
</section>
+ <section>
+ <title> Clustered Grouping</title>
+ <para>Using the Grouping function in a cluster is a bit more complex. This is
because messages
+ with a particular group id can arrive on any node so each node needs to know
about which
+ group id's are bound to which consumer on which node. To solve this there is
the notion of
+ a grouping handler. Each node will have its own grouping handler and when a
messages is
+ sent with a group id assigned, the handlers will decide between them which route
the
+ message should take. There are 2 types of handlers, Local and Remote. Each
cluster should
+ assign 1 node to have a local grouping handler and all the other nodes should
have remote
+ handlers, Its the local handler that actually makes the decsion as to what route
should be
+ used, all the other remote handler converse with this. Here is a sample config
for both
+ types of handler, this should be configured in the <emphasis
role="italic"
+ >hornetq-configuration.xml</emphasis>
+ file.<programlisting> <grouping-handler
name="my-grouping-handler">
+ <type>LOCAL</type>
+ <address>jms</address>
+ <timeout>5000</timeout>
+ </grouping-handler>
+
+ <grouping-handler name="my-grouping-handler">
+ <type>REMOTE</type>
+ <address>jms</address>
+ <timeout>5000</timeout>
+ </grouping-handler></programlisting></para>
+ <para>The <emphasis role="italic">address</emphasis>
attribute referes to a cluster connection
+ and the address it uses, refer to the clustering section on how to configure
clusters. The
+ <emphasis role="italic">timeout</emphasis> attribute
referes to how long to wait for a
+ decision to be made, an exception will be thrown during the send if this timeout
is
+ reached, this ensures that strict ordering is kept.</para>
+ <para>The decision as to where a message should be routed to is initially
proposed by the node
+ that receives the message. The node will pick a suitable route as per the normal
clustered
+ routing conditions, i.e. round robin available queues, use a local queue first
and choose a
+ queue that has a consumer. If the proposal is excepted by the grouping handlers
the node
+ will route messages to this queue from that point on, if rejected an alternative
route will
+ be offered and the node will again route to that queue indefinately. All other
nodes will
+ also route to the queue chosen at proposal time. Once the message arrives at the
queue then
+ normal single server message group semantics take over and the message ispinned
to a
+ consumer on that queue.</para>
+ <para>You may have noticed that there is a single point of failure with the
single local
+ handler. If this node crashes then no decisions will be able to be made. Any
messages sent
+ will be not be delivered and an exception thrown. To avoid this happening Local
Handlers
+ can be replicated on another backup node. Simple create your back up node and
configure it
+ with the same Local handler.</para>
+ <para/>
+ <section>
+ <title>Clustered Grouping Best Practices</title>
+ <para>Some best practices should be followed when using clustered
grouping, these
+ are<orderedlist><listitem><para>Make sure your consumers
are distributed evenly
+ across the different nodes if possible. This is only an issue if you
are
+ creating and closing consumers regularly. Since messages are always
routed to
+ the same queue once pinned, removing a consumer from this queue may
leave it
+ with no consumers meaning the queue will just keep receiving the
messages.
+ Avoid closing consumers or make sure that you always have plenty of
consumers,
+ i.e., if you have 3 nodes have 3
+
consumers.</para></listitem><listitem><para>Use durable queues if
possible. If
+ queues are removed once a group is bound to it, then it is possible
that other
+ nodes may still try to route messages to it. This can be avoided by
making sure
+ that the queue is deleted by the session that is sending the
messages. This
+ means that when the next message is sent it is sent to the node
where the queue
+ was deleted meaning a new proposal can succesfully take place.
Alternatively
+ you could just start using a different group
+ id.</para></listitem><listitem><para>Always
make sure that the node that has
+ the Local Grouping Handler is replicated. These means that on
failover grouping
+ will still
occur.</para></listitem></orderedlist></para>
+ </section>
+ <section>
+ <title>Clustered Grouping Example</title>
+ <para>See <xref linkend="examples.clustered.grouping"/>
for an example of how to configure
+ message groups with a HornetQ cluster</para>
+ </section>
+ </section>
</chapter>
Modified: trunk/examples/javaee/hornetq-javaee-examples.iml
===================================================================
--- trunk/examples/javaee/hornetq-javaee-examples.iml 2009-10-28 16:13:10 UTC (rev 8157)
+++ trunk/examples/javaee/hornetq-javaee-examples.iml 2009-10-28 18:32:19 UTC (rev 8158)
@@ -1,7 +1,6 @@
<?xml version="1.0" encoding="UTF-8"?>
<module relativePaths="true" type="JAVA_MODULE"
version="4">
- <component name="NewModuleRootManager"
inherit-compiler-output="false">
- <output url="file://$MODULE_DIR$/output/classes" />
+ <component name="NewModuleRootManager"
inherit-compiler-output="true">
<exclude-output />
<content url="file://$MODULE_DIR$">
<sourceFolder url="file://$MODULE_DIR$/ejb-jms-transaction/src"
isTestSource="false" />
Copied: trunk/examples/jms/clustered-grouping (from rev 8139,
branches/hornetq_grouping/examples/jms/clustered-grouping)
Copied: trunk/examples/jms/clustered-grouping/build.bat (from rev 8142,
trunk/examples/jms/clustered-durable-subscription/build.bat)
===================================================================
--- trunk/examples/jms/clustered-grouping/build.bat (rev 0)
+++ trunk/examples/jms/clustered-grouping/build.bat 2009-10-28 18:32:19 UTC (rev 8158)
@@ -0,0 +1,13 @@
+@echo off
+
+set "OVERRIDE_ANT_HOME=..\..\..\tools\ant"
+
+if exist "..\..\..\src\bin\build.bat" (
+ rem running from TRUNK
+ call ..\..\..\src\bin\build.bat %*
+) else (
+ rem running from the distro
+ call ..\..\..\bin\build.bat %*
+)
+
+set "OVERRIDE_ANT_HOME="
Copied: trunk/examples/jms/clustered-grouping/build.sh (from rev 8142,
trunk/examples/jms/clustered-durable-subscription/build.sh)
===================================================================
--- trunk/examples/jms/clustered-grouping/build.sh (rev 0)
+++ trunk/examples/jms/clustered-grouping/build.sh 2009-10-28 18:32:19 UTC (rev 8158)
@@ -0,0 +1,15 @@
+#!/bin/sh
+
+OVERRIDE_ANT_HOME=../../../tools/ant
+export OVERRIDE_ANT_HOME
+
+if [ -f "../../../src/bin/build.sh" ]; then
+ # running from TRUNK
+ ../../../src/bin/build.sh "$@"
+else
+ # running from the distro
+ ../../../bin/build.sh "$@"
+fi
+
+
+
Deleted: trunk/examples/jms/clustered-grouping/build.xml
===================================================================
--- branches/hornetq_grouping/examples/jms/clustered-grouping/build.xml 2009-10-22
17:42:57 UTC (rev 8139)
+++ trunk/examples/jms/clustered-grouping/build.xml 2009-10-28 18:32:19 UTC (rev 8158)
@@ -1,35 +0,0 @@
-<?xml version="1.0" encoding="UTF-8"?>
-<!DOCTYPE project [
- <!ENTITY libraries SYSTEM "../../../thirdparty/libraries.ent">
- ]>
-<!--
- ~ Copyright 2009 Red Hat, Inc.
- ~ Red Hat licenses this file to you under the Apache License, version
- ~ 2.0 (the "License"); you may not use this file except in compliance
- ~ with the License. You may obtain a copy of the License at
- ~
http://www.apache.org/licenses/LICENSE-2.0
- ~ Unless required by applicable law or agreed to in writing, software
- ~ distributed under the License is distributed on an "AS IS" BASIS,
- ~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
- ~ implied. See the License for the specific language governing
- ~ permissions and limitations under the License.
- -->
-<project default="run" name="HornetQ JMS Clustered Grouping
Example">
-
- <import file="../../common/build.xml"/>
-
- <target name="run">
- <antcall target="runExample">
- <param name="example.classname"
value="org.hornetq.jms.example.ClusteredGroupingExample"/>
- <param name="hornetq.example.beans.file" value="server0
server1 server2"/>
- </antcall>
- </target>
-
- <target name="runRemote">
- <antcall target="runExample">
- <param name="example.classname"
value="org.hornetq.jms.example.ClusteredGroupingExample"/>
- <param name="hornetq.example.runServer"
value="false"/>
- </antcall>
- </target>
-
-</project>
Copied: trunk/examples/jms/clustered-grouping/build.xml (from rev 8139,
branches/hornetq_grouping/examples/jms/clustered-grouping/build.xml)
===================================================================
--- trunk/examples/jms/clustered-grouping/build.xml (rev 0)
+++ trunk/examples/jms/clustered-grouping/build.xml 2009-10-28 18:32:19 UTC (rev 8158)
@@ -0,0 +1,35 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!DOCTYPE project [
+ <!ENTITY libraries SYSTEM "../../../thirdparty/libraries.ent">
+ ]>
+<!--
+ ~ Copyright 2009 Red Hat, Inc.
+ ~ Red Hat licenses this file to you under the Apache License, version
+ ~ 2.0 (the "License"); you may not use this file except in compliance
+ ~ with the License. You may obtain a copy of the License at
+ ~
http://www.apache.org/licenses/LICENSE-2.0
+ ~ Unless required by applicable law or agreed to in writing, software
+ ~ distributed under the License is distributed on an "AS IS" BASIS,
+ ~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+ ~ implied. See the License for the specific language governing
+ ~ permissions and limitations under the License.
+ -->
+<project default="run" name="HornetQ JMS Clustered Grouping
Example">
+
+ <import file="../../common/build.xml"/>
+
+ <target name="run">
+ <antcall target="runExample">
+ <param name="example.classname"
value="org.hornetq.jms.example.ClusteredGroupingExample"/>
+ <param name="hornetq.example.beans.file" value="server0
server1 server2"/>
+ </antcall>
+ </target>
+
+ <target name="runRemote">
+ <antcall target="runExample">
+ <param name="example.classname"
value="org.hornetq.jms.example.ClusteredGroupingExample"/>
+ <param name="hornetq.example.runServer"
value="false"/>
+ </antcall>
+ </target>
+
+</project>
Deleted: trunk/examples/jms/clustered-grouping/readme.html
===================================================================
--- branches/hornetq_grouping/examples/jms/clustered-grouping/readme.html 2009-10-22
17:42:57 UTC (rev 8139)
+++ trunk/examples/jms/clustered-grouping/readme.html 2009-10-28 18:32:19 UTC (rev 8158)
@@ -1,174 +0,0 @@
-<html>
- <head>
- <title>HornetQ JMS Load Balanced Queue Example</title>
- <link rel="stylesheet" type="text/css"
href="../../common/common.css">
- </head>
- <body>
- <h1>HornetQ JMS Load Balanced Clustered Queue Example</h1>
- <br>
- <p>This example demonstrates a JMS queue deployed on two different nodes. The
two nodes are configured to form a cluster.</p>
- <p>We then create a consumer on the queue on each node, and we create a
producer on only one of the nodes.</p>
- <p>We then send some messages via the producer, and we verify that
<b>both</b> consumers receive the sent messages
- in a round-robin fashion.</p>
- <p>In other words, HornetQ <b>load balances</b> the sent messages
across all consumers on the cluster</p>
- <p>This example uses JNDI to lookup the JMS Queue and ConnectionFactory
objects. If you prefer not to use
- JNDI, these could be instantiated directly.</p>
- <p>Here's the relevant snippet from the server configuration, which tells
the server to form a cluster between the two nodes
- and to load balance the messages between the nodes.</p>
- <pre>
- <code><cluster-connection name="my-cluster">
- <address>jms</address>
- <retry-interval>500</retry-interval>
-
<use-duplicate-detection>true</use-duplicate-detection>
-
<forward-when-no-consumers>true</forward-when-no-consumers>
- <max-hops>1</max-hops>
- <discovery-group-ref
discovery-group-name="my-discovery-group"/>
- </cluster-connection>
- </code>
- </pre>
- <p>For more information on HornetQ load balancing, and clustering in general,
please see the clustering
- section of the user manual.</p>
- <h2>Example step-by-step</h2>
- <p><i>To run the example, simply type <code>ant</code> from
this directory</i></p>
- <br>
- <ol>
- <li> Get an initial context for looking up JNDI from server 0.</li>
- <pre>
- <code>
- ic0 = getContext(0);
- </code>
- </pre>
-
- <li>Look-up the JMS Queue object from JNDI</li>
- <pre>
- <code>Queue queue =
(Queue)ic0.lookup("/queue/exampleQueue");</code>
- </pre>
-
- <li>Look-up a JMS Connection Factory object from JNDI on server
0</li>
- <pre>
- <code>ConnectionFactory cf0 =
(ConnectionFactory)ic0.lookup("/ConnectionFactory");</code>
- </pre>
-
- <li>Get an initial context for looking up JNDI from server 1.</li>
- <pre>
- <code>ic1 = getContext(1);</code>
- </pre>
-
- <li>Look-up a JMS Connection Factory object from JNDI on server
1</li>
- <pre>
- <code>ConnectionFactory cf1 =
(ConnectionFactory)ic1.lookup("/ConnectionFactory");
- </code>
- </pre>
-
- <li>We create a JMS Connection connection0 which is a connection to server
0</li>
- <pre>
- <code>
- connection0 = cf0.createConnection();
- </code>
- </pre>
-
- <li>We create a JMS Connection connection1 which is a connection to server
1</li>
- <pre>
- <code>
- connection1 = cf1.createConnection();
- </code>
- </pre>
-
- <li>We create a JMS Session on server 0</li>
- <pre>
- <code>
- Session session0 = connection0.createSession(false, Session.AUTO_ACKNOWLEDGE);
- </code>
- </pre>
-
- <li>We create a JMS Session on server 1</li>
- <pre>
- <code>
- Session session1 = connection1.createSession(false, Session.AUTO_ACKNOWLEDGE);
- </code>
- </pre>
-
- <li>We start the connections to ensure delivery occurs on them</li>
- <pre>
- <code>
- connection0.start();
-
- connection1.start();
- </code>
- </pre>
-
- <li>We create JMS MessageConsumer objects on server 0 and server
1</li>
- <pre>
- <code>
- MessageConsumer consumer0 = session0.createConsumer(queue);
-
- MessageConsumer consumer1 = session1.createConsumer(queue);
- </code>
- </pre>
-
- <li>We create a JMS MessageProducer object on server 0.</li>
- <pre>
- <code>
- MessageProducer producer = session0.createProducer(queue);</code>
- </pre>
-
- <li>We send some messages to server 0.</li>
- <pre>
- <code>
- final int numMessages = 10;
-
- for (int i = 0; i < numMessages; i++)
- {
- TextMessage message = session0.createTextMessage("This is text message " +
i);
-
- producer.send(message);
-
- System.out.println("Sent message: " + message.getText());
- }
- </code>
- </pre>
-
- <li>We now consume those messages on *both* server 0 and server 1.
- We note the messages have been distributed between servers in a round robin
fashion.
- HornetQ has <b>load balanced</b> the messages between the available
consumers on the different nodes.
- HornetQ can be configured to always load balance messages to all nodes, or to
only balance messages
- to nodes which have consumers with no or matching selectors. See the user manual
for more details.</li>
- JMS Queues implement point-to-point message where each message is only ever
consumed by a
- maximum of one consumer.
- <pre>
- <code>
- for (int i = 0; i < numMessages; i += 2)
- {
- TextMessage message0 = (TextMessage)consumer0.receive(5000);
-
- System.out.println("Got message: " + message0.getText() + " from node
0");
-
- TextMessage message1 = (TextMessage)consumer1.receive(5000);
-
- System.out.println("Got message: " + message1.getText() + " from node
1");
- }
- </code>
- </pre>
-
- <li>And finally (no pun intended), <b>always</b> remember to
close your JMS resources after use, in a <code>finally</code> block. Closing a
JMS connection will automatically close all of its sessions, consumers, producer and
browser objects</li>
-
- <pre>
- <code>
- finally
- {
- if (connection0 != null)
- {
- connection0.close();
- }
-
- if (connection1 != null)
- {
- connection1.close();
- }
- }
- </code>
- </pre>
-
- </ol>
- </body>
-</html>
\ No newline at end of file
Copied: trunk/examples/jms/clustered-grouping/readme.html (from rev 8139,
branches/hornetq_grouping/examples/jms/clustered-grouping/readme.html)
===================================================================
--- trunk/examples/jms/clustered-grouping/readme.html (rev 0)
+++ trunk/examples/jms/clustered-grouping/readme.html 2009-10-28 18:32:19 UTC (rev 8158)
@@ -0,0 +1,238 @@
+<html>
+ <head>
+ <title>HornetQ JMS Clustered Grouping Example</title>
+ <link rel="stylesheet" type="text/css"
href="../../common/common.css">
+ </head>
+ <body>
+ <h1>HornetQ JMS Clustered Grouping Example</h1>
+ <br>
+ <p>This example demonstrates how to ensure strict ordering across a cluster
using clustered message grouping</p>
+ <p>We create 3 nodes each with a grouping message handler, one with a Local
handler and 2 with a Remote handler.</p>
+ <p>The local handler acts as an arbitrator for the 2 remote handlers, holding
the information on routes and communicating that with the remote handlers</p>
+ <p>We then send some messages to each node with the same group id set and
ensure the same consumer receives all of them</p>
+ <p>Here's the relevant snippet from the server configuration that has the
local handler</p>
+ <pre>
+ <code>
+ <cluster-connections>
+ <cluster-connection name="my-cluster">
+ <address>jms</address>
+ <retry-interval>500</retry-interval>
+
<use-duplicate-detection>true</use-duplicate-detection>
+
<forward-when-no-consumers>true</forward-when-no-consumers>
+ <max-hops>1</max-hops>
+ <discovery-group-ref
discovery-group-name="my-discovery-group"/>
+ </cluster-connection>
+ </cluster-connections>
+
+ <grouping-handler name="my-grouping-handler">
+ <type>LOCAL</type>
+ <address>jms</address>
+ <timeout>5000</timeout>
+ </grouping-handler>
+ </code>
+ </pre>
+
+ <p>Here's the relevant snippet from the server configuration that has the
remote handlers</p>
+ <pre>
+ <code><cluster-connections>
+ <cluster-connection name="my-cluster">
+ <address>jms</address>
+ <retry-interval>500</retry-interval>
+
<use-duplicate-detection>true</use-duplicate-detection>
+
<forward-when-no-consumers>true</forward-when-no-consumers>
+ <max-hops>1</max-hops>
+ <discovery-group-ref
discovery-group-name="my-discovery-group"/>
+ </cluster-connection>
+ </cluster-connections>
+
+ <grouping-handler name="my-grouping-handler">
+ <type>REMOTE</type>
+ <address>jms</address>
+ <timeout>5000</timeout>
+ </grouping-handler>
+ </code>
+ </pre>
+
+ <p>For more information on HornetQ clustering and grouping see the clustering
and grouping
+ section of the user manual.</p>
+ <h2>Example step-by-step</h2>
+ <p><i>To run the example, simply type
<code>./build.sh</code> (or <code>build.bat</code> on windows)
from this directory</i></p>
+ <br>
+ <ol>
+ <li> Get an initial context for looking up JNDI from server 0.</li>
+ <pre class="prettyprint">
+ <code>ic0 = getContext(0);</code>
+ </pre>
+
+ <li>Look-up the JMS Queue object from JNDI</li>
+ <pre class="prettyprint">
+ <code>Queue queue =
(Queue)ic0.lookup("/queue/exampleQueue");</code>
+ </pre>
+
+ <li>Look-up a JMS Connection Factory object from JNDI on server
0</li>
+ <pre class="prettyprint">
+ <code>ConnectionFactory cf0 =
(ConnectionFactory)ic0.lookup("/ConnectionFactory");</code>
+ </pre>
+
+ <li>Get an initial context for looking up JNDI from server 1.</li>
+ <pre class="prettyprint">
+ <code>ic1 = getContext(1);</code>
+ </pre>
+
+ <li>Look-up a JMS Connection Factory object from JNDI on server
1</li>
+ <pre class="prettyprint">
+ <code>ConnectionFactory cf1 =
(ConnectionFactory)ic1.lookup("/ConnectionFactory");
+ </code>
+ </pre>
+
+ <li>Get an initial context for looking up JNDI from server 2.</li>
+ <pre class="prettyprint">
+ <code>ic2 = getContext(2);</code>
+ </pre>
+
+ <li>Look-up a JMS Connection Factory object from JNDI on server
2</li>
+ <pre class="prettyprint">
+ <code>ConnectionFactory cf2 =
(ConnectionFactory)ic2.lookup("/ConnectionFactory");
+ </code>
+ </pre>
+
+ <li>We create a JMS Connection connection0 which is a connection to server
0</li>
+ <pre class="prettyprint">
+ <code>connection0 = cf0.createConnection();</code>
+ </pre>
+
+ <li>We create a JMS Connection connection0 which is a connection to server
1</li>
+ <pre class="prettyprint">
+ <code>connection1 = cf1.createConnection();</code>
+ </pre>
+
+ <li>We create a JMS Connection connection0 which is a connection to server
2</li>
+ <pre class="prettyprint">
+ <code>connection2 = cf2.createConnection();</code>
+ </pre>
+
+ <li>We create a JMS Session on server 0</li>
+ <pre class="prettyprint">
+ <code>Session session0 = connection0.createSession(false,
Session.AUTO_ACKNOWLEDGE);</code>
+ </pre>
+
+ <li>We create a JMS Session on server 1</li>
+ <pre class="prettyprint">
+ <code>Session session1 = connection1.createSession(false,
Session.AUTO_ACKNOWLEDGE);</code>
+ </pre>
+
+ <li>We create a JMS Session on server 2</li>
+ <pre class="prettyprint">
+ <code>Session session2 = connection2.createSession(false,
Session.AUTO_ACKNOWLEDGE);</code>
+ </pre>
+
+ <li>We start the connections to ensure delivery occurs on them</li>
+ <pre class="prettyprint">
+ <code>connection0.start();
+
+ connection1.start();
+
+ connection2.start();</code>
+ </pre>
+
+ <li>We create JMS MessageConsumer objects on server 0</li>
+ <pre class="prettyprint">
+ <code>MessageConsumer consumer =
session0.createConsumer(queue);</code>
+ </pre>
+
+ <li>We create a JMS MessageProducer object on server 0, 1 and
2</li>
+ <pre class="prettyprint">
+ <code>MessageProducer producer0 = session0.createProducer(queue);
+
+ MessageProducer producer1 = session1.createProducer(queue);
+
+ MessageProducer producer2 = session2.createProducer(queue);</code>
+ </pre>
+
+ <li>We send some messages to server 0, 1 and 2 with the same groupid
set</li>
+ <pre class="prettyprint">
+ <code>final int numMessages = 10;
+
+ for (int i = 0; i < numMessages; i++)
+ {
+ TextMessage message = session0.createTextMessage("This is text message
" + i);
+
+ message.setStringProperty(HornetQMessage.JMSXGROUPID, "Group-0");
+
+ producer0.send(message);
+
+ System.out.println("Sent messages: " + message.getText() + "
to node 0");
+ }
+
+ for (int i = 0; i < numMessages; i++)
+ {
+ TextMessage message = session1.createTextMessage("This is text message
" + (i + 10));
+
+ message.setStringProperty(HornetQMessage.JMSXGROUPID, "Group-0");
+
+ producer1.send(message);
+
+ System.out.println("Sent messages: " + message.getText() + "
to node 1");
+
+ }
+
+ for (int i = 0; i < numMessages; i++)
+ {
+ TextMessage message = session2.createTextMessage("This is text message
" + (i + 20));
+
+ message.setStringProperty(HornetQMessage.JMSXGROUPID, "Group-0");
+
+ producer2.send(message);
+
+ System.out.println("Sent messages: " + message.getText() + "
to node 2");
+ }
+ </code>
+ </pre>
+
+ <li>We now consume those messages from server 0. We note the messages have
all been sent to the same consumer on the same node</li>
+ <pre class="prettyprint">
+ <code>for (int i = 0; i < numMessages * 3; i++)
+ {
+ TextMessage message0 = (TextMessage)consumer.receive(5000);
+
+ System.out.println("Got message: " + message0.getText() + "
from node 0");
+
+ }
+ </code>
+ </pre>
+ <li>Finally, Be sure to close our resources!</li>
+ <pre class="prettyprint">
+ <code>if (connection0 != null)
+ {
+ connection0.close();
+ }
+
+ if (connection1 != null)
+ {
+ connection1.close();
+ }
+
+ if (connection2 != null)
+ {
+ connection2.close();
+ }
+
+ if (ic0 != null)
+ {
+ ic0.close();
+ }
+
+ if (ic1 != null)
+ {
+ ic1.close();
+ }
+
+ if (ic2 != null)
+ {
+ ic2.close();
+ }</code>
+ </pre>
+
+ </ol>
+ </body>
+</html>
\ No newline at end of file
Copied: trunk/examples/jms/clustered-grouping/server0 (from rev 8139,
branches/hornetq_grouping/examples/jms/clustered-grouping/server0)
Deleted: trunk/examples/jms/clustered-grouping/server0/client-jndi.properties
===================================================================
---
branches/hornetq_grouping/examples/jms/clustered-grouping/server0/client-jndi.properties 2009-10-22
17:42:57 UTC (rev 8139)
+++ trunk/examples/jms/clustered-grouping/server0/client-jndi.properties 2009-10-28
18:32:19 UTC (rev 8158)
@@ -1,3 +0,0 @@
-java.naming.factory.initial=org.jnp.interfaces.NamingContextFactory
-java.naming.provider.url=jnp://localhost:1099
-java.naming.factory.url.pkgs=org.jboss.naming:org.jnp.interfaces
Copied: trunk/examples/jms/clustered-grouping/server0/client-jndi.properties (from rev
8139,
branches/hornetq_grouping/examples/jms/clustered-grouping/server0/client-jndi.properties)
===================================================================
--- trunk/examples/jms/clustered-grouping/server0/client-jndi.properties
(rev 0)
+++ trunk/examples/jms/clustered-grouping/server0/client-jndi.properties 2009-10-28
18:32:19 UTC (rev 8158)
@@ -0,0 +1,3 @@
+java.naming.factory.initial=org.jnp.interfaces.NamingContextFactory
+java.naming.provider.url=jnp://localhost:1099
+java.naming.factory.url.pkgs=org.jboss.naming:org.jnp.interfaces
Deleted: trunk/examples/jms/clustered-grouping/server0/hornetq-beans.xml
===================================================================
---
branches/hornetq_grouping/examples/jms/clustered-grouping/server0/hornetq-beans.xml 2009-10-22
17:42:57 UTC (rev 8139)
+++ trunk/examples/jms/clustered-grouping/server0/hornetq-beans.xml 2009-10-28 18:32:19
UTC (rev 8158)
@@ -1,60 +0,0 @@
-<?xml version="1.0" encoding="UTF-8"?>
-
-<deployment xmlns="urn:jboss:bean-deployer:2.0">
-
- <bean name="Naming" class="org.jnp.server.NamingBeanImpl"/>
-
- <!-- JNDI server. Disable this if you don't want JNDI -->
- <bean name="JNDIServer" class="org.jnp.server.Main">
- <property name="namingInfo">
- <inject bean="Naming"/>
- </property>
- <property name="port">1099</property>
- <property name="bindAddress">localhost</property>
- <property name="rmiPort">1098</property>
- <property name="rmiBindAddress">localhost</property>
- </bean>
-
- <!-- MBean server -->
- <bean name="MBeanServer"
class="javax.management.MBeanServer">
- <constructor factoryClass="java.lang.management.ManagementFactory"
- factoryMethod="getPlatformMBeanServer"/>
- </bean>
-
- <!-- The core configuration -->
- <bean name="Configuration"
class="org.hornetq.core.config.impl.FileConfiguration"/>
-
- <!-- The security manager -->
- <bean name="HornetQSecurityManager"
class="org.hornetq.core.security.impl.HornetQSecurityManagerImpl">
- <start ignored="true"/>
- <stop ignored="true"/>
- </bean>
-
- <!-- The core server -->
- <bean name="HornetQServer"
class="org.hornetq.core.server.impl.HornetQServerImpl">
- <constructor>
- <parameter>
- <inject bean="Configuration"/>
- </parameter>
- <parameter>
- <inject bean="MBeanServer"/>
- </parameter>
- <parameter>
- <inject bean="HornetQSecurityManager"/>
- </parameter>
- </constructor>
- <start ignored="true"/>
- <stop ignored="true"/>
- </bean>
-
- <!-- The JMS server -->
- <bean name="JMSServerManager"
class="org.hornetq.jms.server.impl.JMSServerManagerImpl">
- <constructor>
- <parameter>
- <inject bean="HornetQServer"/>
- </parameter>
- </constructor>
- </bean>
-
-
-</deployment>
Copied: trunk/examples/jms/clustered-grouping/server0/hornetq-beans.xml (from rev 8139,
branches/hornetq_grouping/examples/jms/clustered-grouping/server0/hornetq-beans.xml)
===================================================================
--- trunk/examples/jms/clustered-grouping/server0/hornetq-beans.xml
(rev 0)
+++ trunk/examples/jms/clustered-grouping/server0/hornetq-beans.xml 2009-10-28 18:32:19
UTC (rev 8158)
@@ -0,0 +1,60 @@
+<?xml version="1.0" encoding="UTF-8"?>
+
+<deployment xmlns="urn:jboss:bean-deployer:2.0">
+
+ <bean name="Naming" class="org.jnp.server.NamingBeanImpl"/>
+
+ <!-- JNDI server. Disable this if you don't want JNDI -->
+ <bean name="JNDIServer" class="org.jnp.server.Main">
+ <property name="namingInfo">
+ <inject bean="Naming"/>
+ </property>
+ <property name="port">1099</property>
+ <property name="bindAddress">localhost</property>
+ <property name="rmiPort">1098</property>
+ <property name="rmiBindAddress">localhost</property>
+ </bean>
+
+ <!-- MBean server -->
+ <bean name="MBeanServer"
class="javax.management.MBeanServer">
+ <constructor factoryClass="java.lang.management.ManagementFactory"
+ factoryMethod="getPlatformMBeanServer"/>
+ </bean>
+
+ <!-- The core configuration -->
+ <bean name="Configuration"
class="org.hornetq.core.config.impl.FileConfiguration"/>
+
+ <!-- The security manager -->
+ <bean name="HornetQSecurityManager"
class="org.hornetq.core.security.impl.HornetQSecurityManagerImpl">
+ <start ignored="true"/>
+ <stop ignored="true"/>
+ </bean>
+
+ <!-- The core server -->
+ <bean name="HornetQServer"
class="org.hornetq.core.server.impl.HornetQServerImpl">
+ <constructor>
+ <parameter>
+ <inject bean="Configuration"/>
+ </parameter>
+ <parameter>
+ <inject bean="MBeanServer"/>
+ </parameter>
+ <parameter>
+ <inject bean="HornetQSecurityManager"/>
+ </parameter>
+ </constructor>
+ <start ignored="true"/>
+ <stop ignored="true"/>
+ </bean>
+
+ <!-- The JMS server -->
+ <bean name="JMSServerManager"
class="org.hornetq.jms.server.impl.JMSServerManagerImpl">
+ <constructor>
+ <parameter>
+ <inject bean="HornetQServer"/>
+ </parameter>
+ </constructor>
+ </bean>
+
+
+</deployment>
Deleted: trunk/examples/jms/clustered-grouping/server0/hornetq-configuration.xml
===================================================================
---
branches/hornetq_grouping/examples/jms/clustered-grouping/server0/hornetq-configuration.xml 2009-10-22
17:42:57 UTC (rev 8139)
+++ trunk/examples/jms/clustered-grouping/server0/hornetq-configuration.xml 2009-10-28
18:32:19 UTC (rev 8158)
@@ -1,71 +0,0 @@
-<configuration xmlns="urn:hornetq"
-
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
- xsi:schemaLocation="urn:hornetq
/schema/hornetq-configuration.xsd">
- <clustered>true</clustered>
-
- <!-- Connectors -->
-
- <connectors>
- <connector name="netty-connector">
-
<factory-class>org.hornetq.integration.transports.netty.NettyConnectorFactory</factory-class>
- <param key="hornetq.remoting.netty.port" value="5445"
type="Integer"/>
- </connector>
- </connectors>
-
- <!-- Acceptors -->
- <acceptors>
- <acceptor name="netty-acceptor">
-
<factory-class>org.hornetq.integration.transports.netty.NettyAcceptorFactory</factory-class>
- <param key="hornetq.remoting.netty.port" value="5445"
type="Integer"/>
- </acceptor>
- </acceptors>
-
- <!-- Clustering configuration -->
- <broadcast-groups>
- <broadcast-group name="my-broadcast-group">
- <group-address>231.7.7.7</group-address>
- <group-port>9876</group-port>
- <broadcast-period>100</broadcast-period>
- <connector-ref connector-name="netty-connector"/>
- </broadcast-group>
- </broadcast-groups>
-
- <discovery-groups>
- <discovery-group name="my-discovery-group">
- <group-address>231.7.7.7</group-address>
- <group-port>9876</group-port>
- <refresh-timeout>10000</refresh-timeout>
- </discovery-group>
- </discovery-groups>
-
- <cluster-connections>
- <cluster-connection name="my-cluster">
- <address>jms</address>
- <retry-interval>500</retry-interval>
- <use-duplicate-detection>true</use-duplicate-detection>
- <forward-when-no-consumers>true</forward-when-no-consumers>
- <max-hops>1</max-hops>
- <discovery-group-ref
discovery-group-name="my-discovery-group"/>
- </cluster-connection>
- </cluster-connections>
-
- <grouping-handler name="my-grouping-handler">
- <type>LOCAL</type>
- <address>jms</address>
- </grouping-handler>
-
- <!-- Other config -->
-
- <security-settings>
- <!--security for example queue-->
- <security-setting match="jms.queue.exampleQueue">
- <permission type="createDurableQueue" roles="guest"/>
- <permission type="deleteDurableQueue" roles="guest"/>
- <permission type="createTempQueue" roles="guest"/>
- <permission type="deleteTempQueue" roles="guest"/>
- <permission type="consume" roles="guest"/>
- <permission type="send" roles="guest"/>
- </security-setting>
- </security-settings>
-
-</configuration>
\ No newline at end of file
Copied: trunk/examples/jms/clustered-grouping/server0/hornetq-configuration.xml (from rev
8139,
branches/hornetq_grouping/examples/jms/clustered-grouping/server0/hornetq-configuration.xml)
===================================================================
--- trunk/examples/jms/clustered-grouping/server0/hornetq-configuration.xml
(rev 0)
+++ trunk/examples/jms/clustered-grouping/server0/hornetq-configuration.xml 2009-10-28
18:32:19 UTC (rev 8158)
@@ -0,0 +1,72 @@
+<configuration xmlns="urn:hornetq"
+
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+ xsi:schemaLocation="urn:hornetq
/schema/hornetq-configuration.xsd">
+ <clustered>true</clustered>
+
+ <!-- Connectors -->
+
+ <connectors>
+ <connector name="netty-connector">
+
<factory-class>org.hornetq.integration.transports.netty.NettyConnectorFactory</factory-class>
+ <param key="hornetq.remoting.netty.port" value="5445"
type="Integer"/>
+ </connector>
+ </connectors>
+
+ <!-- Acceptors -->
+ <acceptors>
+ <acceptor name="netty-acceptor">
+
<factory-class>org.hornetq.integration.transports.netty.NettyAcceptorFactory</factory-class>
+ <param key="hornetq.remoting.netty.port" value="5445"
type="Integer"/>
+ </acceptor>
+ </acceptors>
+
+ <!-- Clustering configuration -->
+ <broadcast-groups>
+ <broadcast-group name="my-broadcast-group">
+ <group-address>231.7.7.7</group-address>
+ <group-port>9876</group-port>
+ <broadcast-period>100</broadcast-period>
+ <connector-ref connector-name="netty-connector"/>
+ </broadcast-group>
+ </broadcast-groups>
+
+ <discovery-groups>
+ <discovery-group name="my-discovery-group">
+ <group-address>231.7.7.7</group-address>
+ <group-port>9876</group-port>
+ <refresh-timeout>10000</refresh-timeout>
+ </discovery-group>
+ </discovery-groups>
+
+ <cluster-connections>
+ <cluster-connection name="my-cluster">
+ <address>jms</address>
+ <retry-interval>500</retry-interval>
+ <use-duplicate-detection>true</use-duplicate-detection>
+ <forward-when-no-consumers>true</forward-when-no-consumers>
+ <max-hops>1</max-hops>
+ <discovery-group-ref
discovery-group-name="my-discovery-group"/>
+ </cluster-connection>
+ </cluster-connections>
+
+ <grouping-handler name="my-grouping-handler">
+ <type>LOCAL</type>
+ <address>jms</address>
+ <timeout>5000</timeout>
+ </grouping-handler>
+
+ <!-- Other config -->
+
+ <security-settings>
+ <!--security for example queue-->
+ <security-setting match="jms.queue.exampleQueue">
+ <permission type="createDurableQueue" roles="guest"/>
+ <permission type="deleteDurableQueue" roles="guest"/>
+ <permission type="createTempQueue" roles="guest"/>
+ <permission type="deleteTempQueue" roles="guest"/>
+ <permission type="consume" roles="guest"/>
+ <permission type="send" roles="guest"/>
+ </security-setting>
+ </security-settings>
+
+</configuration>
\ No newline at end of file
Deleted: trunk/examples/jms/clustered-grouping/server0/hornetq-jms.xml
===================================================================
---
branches/hornetq_grouping/examples/jms/clustered-grouping/server0/hornetq-jms.xml 2009-10-22
17:42:57 UTC (rev 8139)
+++ trunk/examples/jms/clustered-grouping/server0/hornetq-jms.xml 2009-10-28 18:32:19 UTC
(rev 8158)
@@ -1,17 +0,0 @@
-<configuration xmlns="urn:hornetq"
-
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
- xsi:schemaLocation="urn:hornetq /schema/hornetq-jms.xsd">
- <!--the connection factory used by the example-->
- <connection-factory name="ConnectionFactory">
- <connector-ref connector-name="netty-connector"/>
- <entries>
- <entry name="ConnectionFactory"/>
- </entries>
- </connection-factory>
-
- <!--the queue used by the example-->
- <queue name="exampleQueue">
- <entry name="/queue/exampleQueue"/>
- </queue>
-
-</configuration>
\ No newline at end of file
Copied: trunk/examples/jms/clustered-grouping/server0/hornetq-jms.xml (from rev 8139,
branches/hornetq_grouping/examples/jms/clustered-grouping/server0/hornetq-jms.xml)
===================================================================
--- trunk/examples/jms/clustered-grouping/server0/hornetq-jms.xml
(rev 0)
+++ trunk/examples/jms/clustered-grouping/server0/hornetq-jms.xml 2009-10-28 18:32:19 UTC
(rev 8158)
@@ -0,0 +1,17 @@
+<configuration xmlns="urn:hornetq"
+
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+ xsi:schemaLocation="urn:hornetq /schema/hornetq-jms.xsd">
+ <!--the connection factory used by the example-->
+ <connection-factory name="ConnectionFactory">
+ <connector-ref connector-name="netty-connector"/>
+ <entries>
+ <entry name="ConnectionFactory"/>
+ </entries>
+ </connection-factory>
+
+ <!--the queue used by the example-->
+ <queue name="exampleQueue">
+ <entry name="/queue/exampleQueue"/>
+ </queue>
+
+</configuration>
\ No newline at end of file
Deleted: trunk/examples/jms/clustered-grouping/server0/hornetq-users.xml
===================================================================
---
branches/hornetq_grouping/examples/jms/clustered-grouping/server0/hornetq-users.xml 2009-10-22
17:42:57 UTC (rev 8139)
+++ trunk/examples/jms/clustered-grouping/server0/hornetq-users.xml 2009-10-28 18:32:19
UTC (rev 8158)
@@ -1,7 +0,0 @@
-<configuration xmlns="urn:hornetq"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
- xsi:schemaLocation="urn:hornetq /schema/hornetq-users.xsd">
- <!-- the default user. this is used where username is null-->
- <defaultuser name="guest" password="guest">
- <role name="guest"/>
- </defaultuser>
-</configuration>
\ No newline at end of file
Copied: trunk/examples/jms/clustered-grouping/server0/hornetq-users.xml (from rev 8139,
branches/hornetq_grouping/examples/jms/clustered-grouping/server0/hornetq-users.xml)
===================================================================
--- trunk/examples/jms/clustered-grouping/server0/hornetq-users.xml
(rev 0)
+++ trunk/examples/jms/clustered-grouping/server0/hornetq-users.xml 2009-10-28 18:32:19
UTC (rev 8158)
@@ -0,0 +1,7 @@
+<configuration xmlns="urn:hornetq"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+ xsi:schemaLocation="urn:hornetq /schema/hornetq-users.xsd">
+ <!-- the default user. this is used where username is null-->
+ <defaultuser name="guest" password="guest">
+ <role name="guest"/>
+ </defaultuser>
+</configuration>
\ No newline at end of file
Copied: trunk/examples/jms/clustered-grouping/server1 (from rev 8139,
branches/hornetq_grouping/examples/jms/clustered-grouping/server1)
Deleted: trunk/examples/jms/clustered-grouping/server1/client-jndi.properties
===================================================================
---
branches/hornetq_grouping/examples/jms/clustered-grouping/server1/client-jndi.properties 2009-10-22
17:42:57 UTC (rev 8139)
+++ trunk/examples/jms/clustered-grouping/server1/client-jndi.properties 2009-10-28
18:32:19 UTC (rev 8158)
@@ -1,3 +0,0 @@
-java.naming.factory.initial=org.jnp.interfaces.NamingContextFactory
-java.naming.provider.url=jnp://localhost:2099
-java.naming.factory.url.pkgs=org.jboss.naming:org.jnp.interfaces
Copied: trunk/examples/jms/clustered-grouping/server1/client-jndi.properties (from rev
8139,
branches/hornetq_grouping/examples/jms/clustered-grouping/server1/client-jndi.properties)
===================================================================
--- trunk/examples/jms/clustered-grouping/server1/client-jndi.properties
(rev 0)
+++ trunk/examples/jms/clustered-grouping/server1/client-jndi.properties 2009-10-28
18:32:19 UTC (rev 8158)
@@ -0,0 +1,3 @@
+java.naming.factory.initial=org.jnp.interfaces.NamingContextFactory
+java.naming.provider.url=jnp://localhost:2099
+java.naming.factory.url.pkgs=org.jboss.naming:org.jnp.interfaces
Deleted: trunk/examples/jms/clustered-grouping/server1/hornetq-beans.xml
===================================================================
---
branches/hornetq_grouping/examples/jms/clustered-grouping/server1/hornetq-beans.xml 2009-10-22
17:42:57 UTC (rev 8139)
+++ trunk/examples/jms/clustered-grouping/server1/hornetq-beans.xml 2009-10-28 18:32:19
UTC (rev 8158)
@@ -1,59 +0,0 @@
-<?xml version="1.0" encoding="UTF-8"?>
-
-<deployment xmlns="urn:jboss:bean-deployer:2.0">
-
- <bean name="Naming" class="org.jnp.server.NamingBeanImpl"/>
-
- <!-- JNDI server. Disable this if you don't want JNDI -->
- <bean name="JNDIServer" class="org.jnp.server.Main">
- <property name="namingInfo">
- <inject bean="Naming"/>
- </property>
- <property name="port">2099</property>
- <property name="bindAddress">localhost</property>
- <property name="rmiPort">2098</property>
- <property name="rmiBindAddress">localhost</property>
- </bean>
-
- <!-- MBean server -->
- <bean name="MBeanServer"
class="javax.management.MBeanServer">
- <constructor factoryClass="java.lang.management.ManagementFactory"
- factoryMethod="getPlatformMBeanServer"/>
- </bean>
-
- <!-- The core configuration -->
- <bean name="Configuration"
class="org.hornetq.core.config.impl.FileConfiguration"/>
-
- <!-- The security manager -->
- <bean name="HornetQSecurityManager"
class="org.hornetq.core.security.impl.HornetQSecurityManagerImpl">
- <start ignored="true"/>
- <stop ignored="true"/>
- </bean>
-
- <!-- The core server -->
- <bean name="HornetQServer"
class="org.hornetq.core.server.impl.HornetQServerImpl">
- <constructor>
- <parameter>
- <inject bean="Configuration"/>
- </parameter>
- <parameter>
- <inject bean="MBeanServer"/>
- </parameter>
- <parameter>
- <inject bean="HornetQSecurityManager"/>
- </parameter>
- </constructor>
- <start ignored="true"/>
- <stop ignored="true"/>
- </bean>
-
- <!-- The JMS server -->
- <bean name="JMSServerManager"
class="org.hornetq.jms.server.impl.JMSServerManagerImpl">
- <constructor>
- <parameter>
- <inject bean="HornetQServer"/>
- </parameter>
- </constructor>
- </bean>
-
-</deployment>
Copied: trunk/examples/jms/clustered-grouping/server1/hornetq-beans.xml (from rev 8139,
branches/hornetq_grouping/examples/jms/clustered-grouping/server1/hornetq-beans.xml)
===================================================================
--- trunk/examples/jms/clustered-grouping/server1/hornetq-beans.xml
(rev 0)
+++ trunk/examples/jms/clustered-grouping/server1/hornetq-beans.xml 2009-10-28 18:32:19
UTC (rev 8158)
@@ -0,0 +1,59 @@
+<?xml version="1.0" encoding="UTF-8"?>
+
+<deployment xmlns="urn:jboss:bean-deployer:2.0">
+
+ <bean name="Naming" class="org.jnp.server.NamingBeanImpl"/>
+
+ <!-- JNDI server. Disable this if you don't want JNDI -->
+ <bean name="JNDIServer" class="org.jnp.server.Main">
+ <property name="namingInfo">
+ <inject bean="Naming"/>
+ </property>
+ <property name="port">2099</property>
+ <property name="bindAddress">localhost</property>
+ <property name="rmiPort">2098</property>
+ <property name="rmiBindAddress">localhost</property>
+ </bean>
+
+ <!-- MBean server -->
+ <bean name="MBeanServer"
class="javax.management.MBeanServer">
+ <constructor factoryClass="java.lang.management.ManagementFactory"
+ factoryMethod="getPlatformMBeanServer"/>
+ </bean>
+
+ <!-- The core configuration -->
+ <bean name="Configuration"
class="org.hornetq.core.config.impl.FileConfiguration"/>
+
+ <!-- The security manager -->
+ <bean name="HornetQSecurityManager"
class="org.hornetq.core.security.impl.HornetQSecurityManagerImpl">
+ <start ignored="true"/>
+ <stop ignored="true"/>
+ </bean>
+
+ <!-- The core server -->
+ <bean name="HornetQServer"
class="org.hornetq.core.server.impl.HornetQServerImpl">
+ <constructor>
+ <parameter>
+ <inject bean="Configuration"/>
+ </parameter>
+ <parameter>
+ <inject bean="MBeanServer"/>
+ </parameter>
+ <parameter>
+ <inject bean="HornetQSecurityManager"/>
+ </parameter>
+ </constructor>
+ <start ignored="true"/>
+ <stop ignored="true"/>
+ </bean>
+
+ <!-- The JMS server -->
+ <bean name="JMSServerManager"
class="org.hornetq.jms.server.impl.JMSServerManagerImpl">
+ <constructor>
+ <parameter>
+ <inject bean="HornetQServer"/>
+ </parameter>
+ </constructor>
+ </bean>
+
+</deployment>
Deleted: trunk/examples/jms/clustered-grouping/server1/hornetq-configuration.xml
===================================================================
---
branches/hornetq_grouping/examples/jms/clustered-grouping/server1/hornetq-configuration.xml 2009-10-22
17:42:57 UTC (rev 8139)
+++ trunk/examples/jms/clustered-grouping/server1/hornetq-configuration.xml 2009-10-28
18:32:19 UTC (rev 8158)
@@ -1,71 +0,0 @@
-<configuration xmlns="urn:hornetq"
-
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
- xsi:schemaLocation="urn:hornetq
/schema/hornetq-configuration.xsd">
- <clustered>true</clustered>
-
- <!-- Connectors -->
- <connectors>
- <connector name="netty-connector">
-
<factory-class>org.hornetq.integration.transports.netty.NettyConnectorFactory</factory-class>
- <param key="hornetq.remoting.netty.port" value="5446"
type="Integer"/>
- </connector>
- </connectors>
-
- <!-- Acceptors -->
- <acceptors>
- <acceptor name="netty-acceptor">
-
<factory-class>org.hornetq.integration.transports.netty.NettyAcceptorFactory</factory-class>
- <param key="hornetq.remoting.netty.port" value="5446"
type="Integer"/>
- </acceptor>
- </acceptors>
-
- <!-- Clustering configuration -->
- <broadcast-groups>
- <broadcast-group name="my-broadcast-group">
- <group-address>231.7.7.7</group-address>
- <group-port>9876</group-port>
- <broadcast-period>100</broadcast-period>
- <connector-ref connector-name="netty-connector"/>
- </broadcast-group>
- </broadcast-groups>
-
- <discovery-groups>
- <discovery-group name="my-discovery-group">
- <group-address>231.7.7.7</group-address>
- <group-port>9876</group-port>
- <refresh-timeout>10000</refresh-timeout>
- </discovery-group>
- </discovery-groups>
-
- <cluster-connections>
- <cluster-connection name="my-cluster">
- <address>jms</address>
- <retry-interval>500</retry-interval>
- <use-duplicate-detection>true</use-duplicate-detection>
- <forward-when-no-consumers>true</forward-when-no-consumers>
- <max-hops>1</max-hops>
- <discovery-group-ref
discovery-group-name="my-discovery-group"/>
- </cluster-connection>
- </cluster-connections>
-
- <grouping-handler name="my-grouping-handler">
- <type>REMOTE</type>
- <address>jms</address>
- <timeout>5000</timeout>
- </grouping-handler>
-
- <!-- Other config -->
-
- <security-settings>
- <!--security for example queue-->
- <security-setting match="jms.queue.exampleQueue">
- <permission type="createDurableQueue" roles="guest"/>
- <permission type="deleteDurableQueue" roles="guest"/>
- <permission type="createTempQueue" roles="guest"/>
- <permission type="deleteTempQueue" roles="guest"/>
- <permission type="consume" roles="guest"/>
- <permission type="send" roles="guest"/>
- </security-setting>
- </security-settings>
-
-</configuration>
Copied: trunk/examples/jms/clustered-grouping/server1/hornetq-configuration.xml (from rev
8139,
branches/hornetq_grouping/examples/jms/clustered-grouping/server1/hornetq-configuration.xml)
===================================================================
--- trunk/examples/jms/clustered-grouping/server1/hornetq-configuration.xml
(rev 0)
+++ trunk/examples/jms/clustered-grouping/server1/hornetq-configuration.xml 2009-10-28
18:32:19 UTC (rev 8158)
@@ -0,0 +1,71 @@
+<configuration xmlns="urn:hornetq"
+
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+ xsi:schemaLocation="urn:hornetq
/schema/hornetq-configuration.xsd">
+ <clustered>true</clustered>
+
+ <!-- Connectors -->
+ <connectors>
+ <connector name="netty-connector">
+
<factory-class>org.hornetq.integration.transports.netty.NettyConnectorFactory</factory-class>
+ <param key="hornetq.remoting.netty.port" value="5446"
type="Integer"/>
+ </connector>
+ </connectors>
+
+ <!-- Acceptors -->
+ <acceptors>
+ <acceptor name="netty-acceptor">
+
<factory-class>org.hornetq.integration.transports.netty.NettyAcceptorFactory</factory-class>
+ <param key="hornetq.remoting.netty.port" value="5446"
type="Integer"/>
+ </acceptor>
+ </acceptors>
+
+ <!-- Clustering configuration -->
+ <broadcast-groups>
+ <broadcast-group name="my-broadcast-group">
+ <group-address>231.7.7.7</group-address>
+ <group-port>9876</group-port>
+ <broadcast-period>100</broadcast-period>
+ <connector-ref connector-name="netty-connector"/>
+ </broadcast-group>
+ </broadcast-groups>
+
+ <discovery-groups>
+ <discovery-group name="my-discovery-group">
+ <group-address>231.7.7.7</group-address>
+ <group-port>9876</group-port>
+ <refresh-timeout>10000</refresh-timeout>
+ </discovery-group>
+ </discovery-groups>
+
+ <cluster-connections>
+ <cluster-connection name="my-cluster">
+ <address>jms</address>
+ <retry-interval>500</retry-interval>
+ <use-duplicate-detection>true</use-duplicate-detection>
+ <forward-when-no-consumers>true</forward-when-no-consumers>
+ <max-hops>1</max-hops>
+ <discovery-group-ref
discovery-group-name="my-discovery-group"/>
+ </cluster-connection>
+ </cluster-connections>
+
+ <grouping-handler name="my-grouping-handler">
+ <type>REMOTE</type>
+ <address>jms</address>
+ <timeout>5000</timeout>
+ </grouping-handler>
+
+ <!-- Other config -->
+
+ <security-settings>
+ <!--security for example queue-->
+ <security-setting match="jms.queue.exampleQueue">
+ <permission type="createDurableQueue" roles="guest"/>
+ <permission type="deleteDurableQueue" roles="guest"/>
+ <permission type="createTempQueue" roles="guest"/>
+ <permission type="deleteTempQueue" roles="guest"/>
+ <permission type="consume" roles="guest"/>
+ <permission type="send" roles="guest"/>
+ </security-setting>
+ </security-settings>
+
+</configuration>
Deleted: trunk/examples/jms/clustered-grouping/server1/hornetq-jms.xml
===================================================================
---
branches/hornetq_grouping/examples/jms/clustered-grouping/server1/hornetq-jms.xml 2009-10-22
17:42:57 UTC (rev 8139)
+++ trunk/examples/jms/clustered-grouping/server1/hornetq-jms.xml 2009-10-28 18:32:19 UTC
(rev 8158)
@@ -1,17 +0,0 @@
-<configuration xmlns="urn:hornetq"
-
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
- xsi:schemaLocation="urn:hornetq /schema/hornetq-jms.xsd">
- <!--the connection factory used by the example-->
- <connection-factory name="ConnectionFactory">
- <connector-ref connector-name="netty-connector"/>
- <entries>
- <entry name="ConnectionFactory"/>
- </entries>
- </connection-factory>
-
- <!--the queue used by the example-->
- <queue name="exampleQueue">
- <entry name="/queue/exampleQueue"/>
- </queue>
-
-</configuration>
\ No newline at end of file
Copied: trunk/examples/jms/clustered-grouping/server1/hornetq-jms.xml (from rev 8139,
branches/hornetq_grouping/examples/jms/clustered-grouping/server1/hornetq-jms.xml)
===================================================================
--- trunk/examples/jms/clustered-grouping/server1/hornetq-jms.xml
(rev 0)
+++ trunk/examples/jms/clustered-grouping/server1/hornetq-jms.xml 2009-10-28 18:32:19 UTC
(rev 8158)
@@ -0,0 +1,17 @@
+<configuration xmlns="urn:hornetq"
+
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+ xsi:schemaLocation="urn:hornetq /schema/hornetq-jms.xsd">
+ <!--the connection factory used by the example-->
+ <connection-factory name="ConnectionFactory">
+ <connector-ref connector-name="netty-connector"/>
+ <entries>
+ <entry name="ConnectionFactory"/>
+ </entries>
+ </connection-factory>
+
+ <!--the queue used by the example-->
+ <queue name="exampleQueue">
+ <entry name="/queue/exampleQueue"/>
+ </queue>
+
+</configuration>
\ No newline at end of file
Deleted: trunk/examples/jms/clustered-grouping/server1/hornetq-users.xml
===================================================================
---
branches/hornetq_grouping/examples/jms/clustered-grouping/server1/hornetq-users.xml 2009-10-22
17:42:57 UTC (rev 8139)
+++ trunk/examples/jms/clustered-grouping/server1/hornetq-users.xml 2009-10-28 18:32:19
UTC (rev 8158)
@@ -1,7 +0,0 @@
-<configuration xmlns="urn:hornetq"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
- xsi:schemaLocation="urn:hornetq /schema/hornetq-users.xsd">
- <!-- the default user. this is used where username is null-->
- <defaultuser name="guest" password="guest">
- <role name="guest"/>
- </defaultuser>
-</configuration>
\ No newline at end of file
Copied: trunk/examples/jms/clustered-grouping/server1/hornetq-users.xml (from rev 8139,
branches/hornetq_grouping/examples/jms/clustered-grouping/server1/hornetq-users.xml)
===================================================================
--- trunk/examples/jms/clustered-grouping/server1/hornetq-users.xml
(rev 0)
+++ trunk/examples/jms/clustered-grouping/server1/hornetq-users.xml 2009-10-28 18:32:19
UTC (rev 8158)
@@ -0,0 +1,7 @@
+<configuration xmlns="urn:hornetq"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+ xsi:schemaLocation="urn:hornetq /schema/hornetq-users.xsd">
+ <!-- the default user. this is used where username is null-->
+ <defaultuser name="guest" password="guest">
+ <role name="guest"/>
+ </defaultuser>
+</configuration>
\ No newline at end of file
Copied: trunk/examples/jms/clustered-grouping/server2 (from rev 8139,
branches/hornetq_grouping/examples/jms/clustered-grouping/server2)
Deleted: trunk/examples/jms/clustered-grouping/server2/client-jndi.properties
===================================================================
---
branches/hornetq_grouping/examples/jms/clustered-grouping/server2/client-jndi.properties 2009-10-22
17:42:57 UTC (rev 8139)
+++ trunk/examples/jms/clustered-grouping/server2/client-jndi.properties 2009-10-28
18:32:19 UTC (rev 8158)
@@ -1,3 +0,0 @@
-java.naming.factory.initial=org.jnp.interfaces.NamingContextFactory
-java.naming.provider.url=jnp://localhost:3099
-java.naming.factory.url.pkgs=org.jboss.naming:org.jnp.interfaces
Copied: trunk/examples/jms/clustered-grouping/server2/client-jndi.properties (from rev
8139,
branches/hornetq_grouping/examples/jms/clustered-grouping/server2/client-jndi.properties)
===================================================================
--- trunk/examples/jms/clustered-grouping/server2/client-jndi.properties
(rev 0)
+++ trunk/examples/jms/clustered-grouping/server2/client-jndi.properties 2009-10-28
18:32:19 UTC (rev 8158)
@@ -0,0 +1,3 @@
+java.naming.factory.initial=org.jnp.interfaces.NamingContextFactory
+java.naming.provider.url=jnp://localhost:3099
+java.naming.factory.url.pkgs=org.jboss.naming:org.jnp.interfaces
Deleted: trunk/examples/jms/clustered-grouping/server2/hornetq-beans.xml
===================================================================
---
branches/hornetq_grouping/examples/jms/clustered-grouping/server2/hornetq-beans.xml 2009-10-22
17:42:57 UTC (rev 8139)
+++ trunk/examples/jms/clustered-grouping/server2/hornetq-beans.xml 2009-10-28 18:32:19
UTC (rev 8158)
@@ -1,59 +0,0 @@
-<?xml version="1.0" encoding="UTF-8"?>
-
-<deployment xmlns="urn:jboss:bean-deployer:2.0">
-
- <bean name="Naming" class="org.jnp.server.NamingBeanImpl"/>
-
- <!-- JNDI server. Disable this if you don't want JNDI -->
- <bean name="JNDIServer" class="org.jnp.server.Main">
- <property name="namingInfo">
- <inject bean="Naming"/>
- </property>
- <property name="port">3099</property>
- <property name="bindAddress">localhost</property>
- <property name="rmiPort">3098</property>
- <property name="rmiBindAddress">localhost</property>
- </bean>
-
- <!-- MBean server -->
- <bean name="MBeanServer"
class="javax.management.MBeanServer">
- <constructor factoryClass="java.lang.management.ManagementFactory"
- factoryMethod="getPlatformMBeanServer"/>
- </bean>
-
- <!-- The core configuration -->
- <bean name="Configuration"
class="org.hornetq.core.config.impl.FileConfiguration"/>
-
- <!-- The security manager -->
- <bean name="HornetQSecurityManager"
class="org.hornetq.core.security.impl.HornetQSecurityManagerImpl">
- <start ignored="true"/>
- <stop ignored="true"/>
- </bean>
-
- <!-- The core server -->
- <bean name="HornetQServer"
class="org.hornetq.core.server.impl.HornetQServerImpl">
- <constructor>
- <parameter>
- <inject bean="Configuration"/>
- </parameter>
- <parameter>
- <inject bean="MBeanServer"/>
- </parameter>
- <parameter>
- <inject bean="HornetQSecurityManager"/>
- </parameter>
- </constructor>
- <start ignored="true"/>
- <stop ignored="true"/>
- </bean>
-
- <!-- The JMS server -->
- <bean name="JMSServerManager"
class="org.hornetq.jms.server.impl.JMSServerManagerImpl">
- <constructor>
- <parameter>
- <inject bean="HornetQServer"/>
- </parameter>
- </constructor>
- </bean>
-
-</deployment>
Copied: trunk/examples/jms/clustered-grouping/server2/hornetq-beans.xml (from rev 8139,
branches/hornetq_grouping/examples/jms/clustered-grouping/server2/hornetq-beans.xml)
===================================================================
--- trunk/examples/jms/clustered-grouping/server2/hornetq-beans.xml
(rev 0)
+++ trunk/examples/jms/clustered-grouping/server2/hornetq-beans.xml 2009-10-28 18:32:19
UTC (rev 8158)
@@ -0,0 +1,59 @@
+<?xml version="1.0" encoding="UTF-8"?>
+
+<deployment xmlns="urn:jboss:bean-deployer:2.0">
+
+ <bean name="Naming" class="org.jnp.server.NamingBeanImpl"/>
+
+ <!-- JNDI server. Disable this if you don't want JNDI -->
+ <bean name="JNDIServer" class="org.jnp.server.Main">
+ <property name="namingInfo">
+ <inject bean="Naming"/>
+ </property>
+ <property name="port">3099</property>
+ <property name="bindAddress">localhost</property>
+ <property name="rmiPort">3098</property>
+ <property name="rmiBindAddress">localhost</property>
+ </bean>
+
+ <!-- MBean server -->
+ <bean name="MBeanServer"
class="javax.management.MBeanServer">
+ <constructor factoryClass="java.lang.management.ManagementFactory"
+ factoryMethod="getPlatformMBeanServer"/>
+ </bean>
+
+ <!-- The core configuration -->
+ <bean name="Configuration"
class="org.hornetq.core.config.impl.FileConfiguration"/>
+
+ <!-- The security manager -->
+ <bean name="HornetQSecurityManager"
class="org.hornetq.core.security.impl.HornetQSecurityManagerImpl">
+ <start ignored="true"/>
+ <stop ignored="true"/>
+ </bean>
+
+ <!-- The core server -->
+ <bean name="HornetQServer"
class="org.hornetq.core.server.impl.HornetQServerImpl">
+ <constructor>
+ <parameter>
+ <inject bean="Configuration"/>
+ </parameter>
+ <parameter>
+ <inject bean="MBeanServer"/>
+ </parameter>
+ <parameter>
+ <inject bean="HornetQSecurityManager"/>
+ </parameter>
+ </constructor>
+ <start ignored="true"/>
+ <stop ignored="true"/>
+ </bean>
+
+ <!-- The JMS server -->
+ <bean name="JMSServerManager"
class="org.hornetq.jms.server.impl.JMSServerManagerImpl">
+ <constructor>
+ <parameter>
+ <inject bean="HornetQServer"/>
+ </parameter>
+ </constructor>
+ </bean>
+
+</deployment>
Deleted: trunk/examples/jms/clustered-grouping/server2/hornetq-configuration.xml
===================================================================
---
branches/hornetq_grouping/examples/jms/clustered-grouping/server2/hornetq-configuration.xml 2009-10-22
17:42:57 UTC (rev 8139)
+++ trunk/examples/jms/clustered-grouping/server2/hornetq-configuration.xml 2009-10-28
18:32:19 UTC (rev 8158)
@@ -1,71 +0,0 @@
-<configuration xmlns="urn:hornetq"
-
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
- xsi:schemaLocation="urn:hornetq
/schema/hornetq-configuration.xsd">
- <clustered>true</clustered>
-
- <!-- Connectors -->
- <connectors>
- <connector name="netty-connector">
-
<factory-class>org.hornetq.integration.transports.netty.NettyConnectorFactory</factory-class>
- <param key="hornetq.remoting.netty.port" value="5447"
type="Integer"/>
- </connector>
- </connectors>
-
- <!-- Acceptors -->
- <acceptors>
- <acceptor name="netty-acceptor">
-
<factory-class>org.hornetq.integration.transports.netty.NettyAcceptorFactory</factory-class>
- <param key="hornetq.remoting.netty.port" value="5447"
type="Integer"/>
- </acceptor>
- </acceptors>
-
- <!-- Clustering configuration -->
- <broadcast-groups>
- <broadcast-group name="my-broadcast-group">
- <group-address>231.7.7.7</group-address>
- <group-port>9876</group-port>
- <broadcast-period>100</broadcast-period>
- <connector-ref connector-name="netty-connector"/>
- </broadcast-group>
- </broadcast-groups>
-
- <discovery-groups>
- <discovery-group name="my-discovery-group">
- <group-address>231.7.7.7</group-address>
- <group-port>9876</group-port>
- <refresh-timeout>10000</refresh-timeout>
- </discovery-group>
- </discovery-groups>
-
- <cluster-connections>
- <cluster-connection name="my-cluster">
- <address>jms</address>
- <retry-interval>500</retry-interval>
- <use-duplicate-detection>true</use-duplicate-detection>
- <forward-when-no-consumers>true</forward-when-no-consumers>
- <max-hops>1</max-hops>
- <discovery-group-ref
discovery-group-name="my-discovery-group"/>
- </cluster-connection>
- </cluster-connections>
-
- <grouping-handler name="my-grouping-handler">
- <type>REMOTE</type>
- <address>jms</address>
- <timeout>5000</timeout>
- </grouping-handler>
-
- <!-- Other config -->
-
- <security-settings>
- <!--security for example queue-->
- <security-setting match="jms.queue.exampleQueue">
- <permission type="createDurableQueue" roles="guest"/>
- <permission type="deleteDurableQueue" roles="guest"/>
- <permission type="createTempQueue" roles="guest"/>
- <permission type="deleteTempQueue" roles="guest"/>
- <permission type="consume" roles="guest"/>
- <permission type="send" roles="guest"/>
- </security-setting>
- </security-settings>
-
-</configuration>
Copied: trunk/examples/jms/clustered-grouping/server2/hornetq-configuration.xml (from rev
8139,
branches/hornetq_grouping/examples/jms/clustered-grouping/server2/hornetq-configuration.xml)
===================================================================
--- trunk/examples/jms/clustered-grouping/server2/hornetq-configuration.xml
(rev 0)
+++ trunk/examples/jms/clustered-grouping/server2/hornetq-configuration.xml 2009-10-28
18:32:19 UTC (rev 8158)
@@ -0,0 +1,71 @@
+<configuration xmlns="urn:hornetq"
+
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+ xsi:schemaLocation="urn:hornetq
/schema/hornetq-configuration.xsd">
+ <clustered>true</clustered>
+
+ <!-- Connectors -->
+ <connectors>
+ <connector name="netty-connector">
+
<factory-class>org.hornetq.integration.transports.netty.NettyConnectorFactory</factory-class>
+ <param key="hornetq.remoting.netty.port" value="5447"
type="Integer"/>
+ </connector>
+ </connectors>
+
+ <!-- Acceptors -->
+ <acceptors>
+ <acceptor name="netty-acceptor">
+
<factory-class>org.hornetq.integration.transports.netty.NettyAcceptorFactory</factory-class>
+ <param key="hornetq.remoting.netty.port" value="5447"
type="Integer"/>
+ </acceptor>
+ </acceptors>
+
+ <!-- Clustering configuration -->
+ <broadcast-groups>
+ <broadcast-group name="my-broadcast-group">
+ <group-address>231.7.7.7</group-address>
+ <group-port>9876</group-port>
+ <broadcast-period>100</broadcast-period>
+ <connector-ref connector-name="netty-connector"/>
+ </broadcast-group>
+ </broadcast-groups>
+
+ <discovery-groups>
+ <discovery-group name="my-discovery-group">
+ <group-address>231.7.7.7</group-address>
+ <group-port>9876</group-port>
+ <refresh-timeout>10000</refresh-timeout>
+ </discovery-group>
+ </discovery-groups>
+
+ <cluster-connections>
+ <cluster-connection name="my-cluster">
+ <address>jms</address>
+ <retry-interval>500</retry-interval>
+ <use-duplicate-detection>true</use-duplicate-detection>
+ <forward-when-no-consumers>true</forward-when-no-consumers>
+ <max-hops>1</max-hops>
+ <discovery-group-ref
discovery-group-name="my-discovery-group"/>
+ </cluster-connection>
+ </cluster-connections>
+
+ <grouping-handler name="my-grouping-handler">
+ <type>REMOTE</type>
+ <address>jms</address>
+ <timeout>5000</timeout>
+ </grouping-handler>
+
+ <!-- Other config -->
+
+ <security-settings>
+ <!--security for example queue-->
+ <security-setting match="jms.queue.exampleQueue">
+ <permission type="createDurableQueue" roles="guest"/>
+ <permission type="deleteDurableQueue" roles="guest"/>
+ <permission type="createTempQueue" roles="guest"/>
+ <permission type="deleteTempQueue" roles="guest"/>
+ <permission type="consume" roles="guest"/>
+ <permission type="send" roles="guest"/>
+ </security-setting>
+ </security-settings>
+
+</configuration>
Deleted: trunk/examples/jms/clustered-grouping/server2/hornetq-jms.xml
===================================================================
---
branches/hornetq_grouping/examples/jms/clustered-grouping/server2/hornetq-jms.xml 2009-10-22
17:42:57 UTC (rev 8139)
+++ trunk/examples/jms/clustered-grouping/server2/hornetq-jms.xml 2009-10-28 18:32:19 UTC
(rev 8158)
@@ -1,17 +0,0 @@
-<configuration xmlns="urn:hornetq"
-
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
- xsi:schemaLocation="urn:hornetq /schema/hornetq-jms.xsd">
- <!--the connection factory used by the example-->
- <connection-factory name="ConnectionFactory">
- <connector-ref connector-name="netty-connector"/>
- <entries>
- <entry name="ConnectionFactory"/>
- </entries>
- </connection-factory>
-
- <!--the queue used by the example-->
- <queue name="exampleQueue">
- <entry name="/queue/exampleQueue"/>
- </queue>
-
-</configuration>
\ No newline at end of file
Copied: trunk/examples/jms/clustered-grouping/server2/hornetq-jms.xml (from rev 8139,
branches/hornetq_grouping/examples/jms/clustered-grouping/server2/hornetq-jms.xml)
===================================================================
--- trunk/examples/jms/clustered-grouping/server2/hornetq-jms.xml
(rev 0)
+++ trunk/examples/jms/clustered-grouping/server2/hornetq-jms.xml 2009-10-28 18:32:19 UTC
(rev 8158)
@@ -0,0 +1,17 @@
+<configuration xmlns="urn:hornetq"
+
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+ xsi:schemaLocation="urn:hornetq /schema/hornetq-jms.xsd">
+ <!--the connection factory used by the example-->
+ <connection-factory name="ConnectionFactory">
+ <connector-ref connector-name="netty-connector"/>
+ <entries>
+ <entry name="ConnectionFactory"/>
+ </entries>
+ </connection-factory>
+
+ <!--the queue used by the example-->
+ <queue name="exampleQueue">
+ <entry name="/queue/exampleQueue"/>
+ </queue>
+
+</configuration>
\ No newline at end of file
Deleted: trunk/examples/jms/clustered-grouping/server2/hornetq-users.xml
===================================================================
---
branches/hornetq_grouping/examples/jms/clustered-grouping/server2/hornetq-users.xml 2009-10-22
17:42:57 UTC (rev 8139)
+++ trunk/examples/jms/clustered-grouping/server2/hornetq-users.xml 2009-10-28 18:32:19
UTC (rev 8158)
@@ -1,7 +0,0 @@
-<configuration xmlns="urn:hornetq"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
- xsi:schemaLocation="urn:hornetq /schema/hornetq-users.xsd">
- <!-- the default user. this is used where username is null-->
- <defaultuser name="guest" password="guest">
- <role name="guest"/>
- </defaultuser>
-</configuration>
\ No newline at end of file
Copied: trunk/examples/jms/clustered-grouping/server2/hornetq-users.xml (from rev 8139,
branches/hornetq_grouping/examples/jms/clustered-grouping/server2/hornetq-users.xml)
===================================================================
--- trunk/examples/jms/clustered-grouping/server2/hornetq-users.xml
(rev 0)
+++ trunk/examples/jms/clustered-grouping/server2/hornetq-users.xml 2009-10-28 18:32:19
UTC (rev 8158)
@@ -0,0 +1,7 @@
+<configuration xmlns="urn:hornetq"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+ xsi:schemaLocation="urn:hornetq /schema/hornetq-users.xsd">
+ <!-- the default user. this is used where username is null-->
+ <defaultuser name="guest" password="guest">
+ <role name="guest"/>
+ </defaultuser>
+</configuration>
\ No newline at end of file
Copied: trunk/examples/jms/clustered-grouping/src (from rev 8139,
branches/hornetq_grouping/examples/jms/clustered-grouping/src)
Copied: trunk/examples/jms/clustered-grouping/src/org (from rev 8139,
branches/hornetq_grouping/examples/jms/clustered-grouping/src/org)
Copied: trunk/examples/jms/clustered-grouping/src/org/hornetq (from rev 8139,
branches/hornetq_grouping/examples/jms/clustered-grouping/src/org/hornetq)
Copied: trunk/examples/jms/clustered-grouping/src/org/hornetq/jms (from rev 8139,
branches/hornetq_grouping/examples/jms/clustered-grouping/src/org/hornetq/jms)
Copied: trunk/examples/jms/clustered-grouping/src/org/hornetq/jms/example (from rev 8139,
branches/hornetq_grouping/examples/jms/clustered-grouping/src/org/hornetq/jms/example)
Deleted:
trunk/examples/jms/clustered-grouping/src/org/hornetq/jms/example/ClusteredGroupingExample.java
===================================================================
---
branches/hornetq_grouping/examples/jms/clustered-grouping/src/org/hornetq/jms/example/ClusteredGroupingExample.java 2009-10-22
17:42:57 UTC (rev 8139)
+++
trunk/examples/jms/clustered-grouping/src/org/hornetq/jms/example/ClusteredGroupingExample.java 2009-10-28
18:32:19 UTC (rev 8158)
@@ -1,191 +0,0 @@
-/*
- * Copyright 2009 Red Hat, Inc.
- * Red Hat licenses this file to you under the Apache License, version
- * 2.0 (the "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
http://www.apache.org/licenses/LICENSE-2.0
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
- * implied. See the License for the specific language governing
- * permissions and limitations under the License.
- */
-package org.hornetq.jms.example;
-
-import javax.jms.Connection;
-import javax.jms.ConnectionFactory;
-import javax.jms.MessageConsumer;
-import javax.jms.MessageProducer;
-import javax.jms.Queue;
-import javax.jms.Session;
-import javax.jms.TextMessage;
-import javax.naming.InitialContext;
-
-import org.hornetq.common.example.HornetQExample;
-import org.hornetq.jms.client.HornetQMessage;
-
-/**
- * A simple example that demonstrates server side load-balancing of messages between the
queue instances on different
- * nodes of the cluster.
- *
- * @author <a href="tim.fox(a)jboss.com>Tim Fox</a>
- */
-public class ClusteredGroupingExample extends HornetQExample
-{
- public static void main(String[] args)
- {
- new ClusteredGroupingExample().run(args);
- }
-
- public boolean runExample() throws Exception
- {
- Connection connection0 = null;
-
- Connection connection1 = null;
-
- Connection connection2 = null;
-
- InitialContext ic0 = null;
-
- InitialContext ic1 = null;
-
- InitialContext ic2 = null;
-
- try
- {
- // Step 1. Get an initial context for looking up JNDI from server 0
- ic0 = getContext(0);
-
- // Step 2. Look-up the JMS Queue object from JNDI
- Queue queue = (Queue)ic0.lookup("/queue/exampleQueue");
-
- // Step 3. Look-up a JMS Connection Factory object from JNDI on server 0
- ConnectionFactory cf0 =
(ConnectionFactory)ic0.lookup("/ConnectionFactory");
-
- // Step 4. Get an initial context for looking up JNDI from server 1
- ic1 = getContext(1);
-
- // Step 5. Look-up a JMS Connection Factory object from JNDI on server 1
- ConnectionFactory cf1 =
(ConnectionFactory)ic1.lookup("/ConnectionFactory");
-
- // Step 4. Get an initial context for looking up JNDI from server 1
- ic2 = getContext(2);
-
- // Step 5. Look-up a JMS Connection Factory object from JNDI on server 1
- ConnectionFactory cf2 =
(ConnectionFactory)ic2.lookup("/ConnectionFactory");
-
- // Step 6. We create a JMS Connection connection0 which is a connection to
server 0
- connection0 = cf0.createConnection();
-
- // Step 7. We create a JMS Connection connection1 which is a connection to
server 1
- connection1 = cf1.createConnection();
-
- // Step 7. We create a JMS Connection connection1 which is a connection to
server 1
- connection2 = cf2.createConnection();
-
- // Step 8. We create a JMS Session on server 0
- Session session0 = connection0.createSession(false, Session.AUTO_ACKNOWLEDGE);
-
- // Step 9. We create a JMS Session on server 1
- Session session1 = connection1.createSession(false, Session.AUTO_ACKNOWLEDGE);
-
- Session session2 = connection1.createSession(false, Session.AUTO_ACKNOWLEDGE);
-
- // Step 10. We start the connections to ensure delivery occurs on them
- connection0.start();
-
- connection1.start();
-
- connection2.start();
-
- // Step 11. We create JMS MessageConsumer objects on server 0 and server 1
- MessageConsumer consumer = session0.createConsumer(queue);
-
-
- // Step 12. We create a JMS MessageProducer object on server 0
- MessageProducer producer0 = session0.createProducer(queue);
-
- MessageProducer producer1 = session1.createProducer(queue);
-
- MessageProducer producer2 = session2.createProducer(queue);
-
- // Step 13. We send some messages to server 0
-
- final int numMessages = 10;
-
- for (int i = 0; i < numMessages; i++)
- {
- TextMessage message = session0.createTextMessage("This is text message
" + i);
-
- message.setStringProperty(HornetQMessage.JMSXGROUPID, "Group-0");
-
- producer0.send(message);
-
- System.out.println("Sent messages: " + message.getText() + "
to node 0");
- }
-
- for (int i = 0; i < numMessages; i++)
- {
- TextMessage message = session1.createTextMessage("This is text message
" + (i + 10));
-
- message.setStringProperty(HornetQMessage.JMSXGROUPID, "Group-0");
-
- producer1.send(message);
-
- System.out.println("Sent messages: " + message.getText() + "
to node 1");
-
- }
-
- for (int i = 0; i < numMessages; i++)
- {
- TextMessage message = session2.createTextMessage("This is text message
" + (i + 20));
-
- message.setStringProperty(HornetQMessage.JMSXGROUPID, "Group-0");
-
- producer2.send(message);
-
- System.out.println("Sent messages: " + message.getText() + "
to node 2");
- }
-
- // Step 14. We now consume those messages on *both* server 0 and server 1.
- // We note the messages have been distributed between servers in a round robin
fashion
- // JMS Queues implement point-to-point message where each message is only ever
consumed by a
- // maximum of one consumer
-
- for (int i = 0; i < numMessages * 3; i++)
- {
- TextMessage message0 = (TextMessage)consumer.receive(5000);
-
- System.out.println("Got message: " + message0.getText() + "
from node 0");
-
- }
-
- return true;
- }
- finally
- {
- // Step 15. Be sure to close our resources!
-
- if (connection0 != null)
- {
- connection0.close();
- }
-
- if (connection1 != null)
- {
- connection1.close();
- }
-
- if (ic0 != null)
- {
- ic0.close();
- }
-
- if (ic1 != null)
- {
- ic1.close();
- }
- }
- }
-
-}
\ No newline at end of file
Copied:
trunk/examples/jms/clustered-grouping/src/org/hornetq/jms/example/ClusteredGroupingExample.java
(from rev 8139,
branches/hornetq_grouping/examples/jms/clustered-grouping/src/org/hornetq/jms/example/ClusteredGroupingExample.java)
===================================================================
---
trunk/examples/jms/clustered-grouping/src/org/hornetq/jms/example/ClusteredGroupingExample.java
(rev 0)
+++
trunk/examples/jms/clustered-grouping/src/org/hornetq/jms/example/ClusteredGroupingExample.java 2009-10-28
18:32:19 UTC (rev 8158)
@@ -0,0 +1,200 @@
+/*
+ * Copyright 2009 Red Hat, Inc.
+ * Red Hat licenses this file to you under the Apache License, version
+ * 2.0 (the "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
http://www.apache.org/licenses/LICENSE-2.0
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+ * implied. See the License for the specific language governing
+ * permissions and limitations under the License.
+ */
+package org.hornetq.jms.example;
+
+import javax.jms.Connection;
+import javax.jms.ConnectionFactory;
+import javax.jms.MessageConsumer;
+import javax.jms.MessageProducer;
+import javax.jms.Queue;
+import javax.jms.Session;
+import javax.jms.TextMessage;
+import javax.naming.InitialContext;
+
+import org.hornetq.common.example.HornetQExample;
+import org.hornetq.jms.client.HornetQMessage;
+
+/**
+ * A simple example that demonstrates server side load-balancing of messages between the
queue instances on different
+ * nodes of the cluster.
+ *
+ * @author <a href="tim.fox(a)jboss.com>Tim Fox</a>
+ */
+public class ClusteredGroupingExample extends HornetQExample
+{
+ public static void main(String[] args)
+ {
+ new ClusteredGroupingExample().run(args);
+ }
+
+ public boolean runExample() throws Exception
+ {
+ Connection connection0 = null;
+
+ Connection connection1 = null;
+
+ Connection connection2 = null;
+
+ InitialContext ic0 = null;
+
+ InitialContext ic1 = null;
+
+ InitialContext ic2 = null;
+
+ try
+ {
+ // Step 1. Get an initial context for looking up JNDI from server 0
+ ic0 = getContext(0);
+
+ // Step 2. Look-up the JMS Queue object from JNDI
+ Queue queue = (Queue)ic0.lookup("/queue/exampleQueue");
+
+ // Step 3. Look-up a JMS Connection Factory object from JNDI on server 0
+ ConnectionFactory cf0 =
(ConnectionFactory)ic0.lookup("/ConnectionFactory");
+
+ // Step 4. Get an initial context for looking up JNDI from server 1
+ ic1 = getContext(1);
+
+ // Step 5. Look-up a JMS Connection Factory object from JNDI on server 1
+ ConnectionFactory cf1 =
(ConnectionFactory)ic1.lookup("/ConnectionFactory");
+
+ // Step 4. Get an initial context for looking up JNDI from server 1
+ ic2 = getContext(2);
+
+ // Step 5. Look-up a JMS Connection Factory object from JNDI on server 1
+ ConnectionFactory cf2 =
(ConnectionFactory)ic2.lookup("/ConnectionFactory");
+
+ // Step 6. We create a JMS Connection connection0 which is a connection to
server 0
+ connection0 = cf0.createConnection();
+
+ // Step 7. We create a JMS Connection connection1 which is a connection to
server 1
+ connection1 = cf1.createConnection();
+
+ // Step 7. We create a JMS Connection connection1 which is a connection to
server 1
+ connection2 = cf2.createConnection();
+
+ // Step 8. We create a JMS Session on server 0
+ Session session0 = connection0.createSession(false, Session.AUTO_ACKNOWLEDGE);
+
+ // Step 9. We create a JMS Session on server 1
+ Session session1 = connection1.createSession(false, Session.AUTO_ACKNOWLEDGE);
+
+ // Step 10. We create a JMS Session on server 1
+ Session session2 = connection1.createSession(false, Session.AUTO_ACKNOWLEDGE);
+
+ // Step 11. We start the connections to ensure delivery occurs on them
+ connection0.start();
+
+ connection1.start();
+
+ connection2.start();
+
+ // Step 12. We create JMS MessageConsumer objects on server 0
+ MessageConsumer consumer = session0.createConsumer(queue);
+
+
+ // Step 13. We create a JMS MessageProducer object on server 0, 1 and 2
+ MessageProducer producer0 = session0.createProducer(queue);
+
+ MessageProducer producer1 = session1.createProducer(queue);
+
+ MessageProducer producer2 = session2.createProducer(queue);
+
+ // Step 14. We send some messages to server 0, 1 and 2 with the same groupid
set
+
+ final int numMessages = 10;
+
+ for (int i = 0; i < numMessages; i++)
+ {
+ TextMessage message = session0.createTextMessage("This is text message
" + i);
+
+ message.setStringProperty(HornetQMessage.JMSXGROUPID, "Group-0");
+
+ producer0.send(message);
+
+ System.out.println("Sent messages: " + message.getText() + "
to node 0");
+ }
+
+ for (int i = 0; i < numMessages; i++)
+ {
+ TextMessage message = session1.createTextMessage("This is text message
" + (i + 10));
+
+ message.setStringProperty(HornetQMessage.JMSXGROUPID, "Group-0");
+
+ producer1.send(message);
+
+ System.out.println("Sent messages: " + message.getText() + "
to node 1");
+
+ }
+
+ for (int i = 0; i < numMessages; i++)
+ {
+ TextMessage message = session2.createTextMessage("This is text message
" + (i + 20));
+
+ message.setStringProperty(HornetQMessage.JMSXGROUPID, "Group-0");
+
+ producer2.send(message);
+
+ System.out.println("Sent messages: " + message.getText() + "
to node 2");
+ }
+
+ // Step 15. We now consume those messages from server 0
+ // We note the messages have all been sent to the same consumer on the same
node
+
+ for (int i = 0; i < numMessages * 3; i++)
+ {
+ TextMessage message0 = (TextMessage)consumer.receive(5000);
+
+ System.out.println("Got message: " + message0.getText() + "
from node 0");
+
+ }
+
+ return true;
+ }
+ finally
+ {
+ // Step 16. Be sure to close our resources!
+
+ if (connection0 != null)
+ {
+ connection0.close();
+ }
+
+ if (connection1 != null)
+ {
+ connection1.close();
+ }
+
+ if (connection2 != null)
+ {
+ connection2.close();
+ }
+
+ if (ic0 != null)
+ {
+ ic0.close();
+ }
+
+ if (ic1 != null)
+ {
+ ic1.close();
+ }
+
+ if (ic2 != null)
+ {
+ ic2.close();
+ }
+ }
+ }
+
+}
\ No newline at end of file
Modified: trunk/examples/jms/hornetq-jms-examples.iml
===================================================================
--- trunk/examples/jms/hornetq-jms-examples.iml 2009-10-28 16:13:10 UTC (rev 8157)
+++ trunk/examples/jms/hornetq-jms-examples.iml 2009-10-28 18:32:19 UTC (rev 8158)
@@ -1,8 +1,6 @@
<?xml version="1.0" encoding="UTF-8"?>
<module relativePaths="true" type="JAVA_MODULE"
version="4">
- <component name="NewModuleRootManager"
inherit-compiler-output="false">
- <output url="file://$MODULE_DIR$/output/classes" />
- <output-test url="file://$MODULE_DIR$/output" />
+ <component name="NewModuleRootManager"
inherit-compiler-output="true">
<exclude-output />
<content url="file://$MODULE_DIR$/../common">
<sourceFolder url="file://$MODULE_DIR$/../common/src"
isTestSource="false" />
Modified: trunk/hornetq.iml
===================================================================
--- trunk/hornetq.iml 2009-10-28 16:13:10 UTC (rev 8157)
+++ trunk/hornetq.iml 2009-10-28 18:32:19 UTC (rev 8158)
@@ -1,8 +1,6 @@
<?xml version="1.0" encoding="UTF-8"?>
<module relativePaths="true" type="JAVA_MODULE"
version="4">
- <component name="NewModuleRootManager" LANGUAGE_LEVEL="JDK_1_5"
inherit-compiler-output="false">
- <output url="file://$MODULE_DIR$/output/classes" />
- <exclude-output />
+ <component name="NewModuleRootManager" LANGUAGE_LEVEL="JDK_1_5"
inherit-compiler-output="true">
<content url="file://$MODULE_DIR$">
<sourceFolder url="file://$MODULE_DIR$/build/src"
isTestSource="false" />
<sourceFolder
url="file://$MODULE_DIR$/examples/jms/static-selector-jms/src"
isTestSource="true" />
@@ -38,6 +36,7 @@
<excludeFolder url="file://$MODULE_DIR$/tests/lib/jdbc-drivers" />
<excludeFolder url="file://$MODULE_DIR$/tests/output" />
</content>
+ <orderEntry type="library" name="etc"
level="project" />
<orderEntry type="inheritedJdk" />
<orderEntry type="sourceFolder" forTests="false" />
<orderEntry type="library" name="messaging"
level="project" />
Modified: trunk/hornetq.ipr
===================================================================
--- trunk/hornetq.ipr 2009-10-28 16:13:10 UTC (rev 8157)
+++ trunk/hornetq.ipr 2009-10-28 18:32:19 UTC (rev 8158)
@@ -1,5 +1,5 @@
<?xml version="1.0" encoding="UTF-8"?>
-<project relativePaths="false" version="4">
+<project relativePaths="true" version="4">
<component name="AntConfiguration">
<defaultAnt bundledAnt="true" />
<buildFile url="file://$PROJECT_DIR$/build-hornetq.xml">
@@ -129,7 +129,7 @@
<maximumHeapSize value="128" />
<maximumStackSize value="32" />
<properties>
- <property name="ENV.JBOSS_HOME"
value="/home/andy/projects/jbossas5.1/build/output/jboss-5.1.0.CR1" />
+ <property name="ENV.JBOSS_HOME"
value="$PROJECT_DIR$/../jbossas5.1/build/output/jboss-5.1.0.CR1" />
</properties>
</buildFile>
<buildFile
url="file://$PROJECT_DIR$/examples/javaee/ejb-jms-transaction/build.xml">
@@ -139,7 +139,7 @@
<maximumHeapSize value="128" />
<maximumStackSize value="32" />
<properties>
- <property name="ENV.JBOSS_HOME"
value="/home/andy/projects/jbossas5.1/build/output/jboss-5.1.0.CR1" />
+ <property name="ENV.JBOSS_HOME"
value="$PROJECT_DIR$/../jbossas5.1/build/output/jboss-5.1.0.CR1" />
</properties>
</buildFile>
<buildFile
url="file://$PROJECT_DIR$/examples/jms/static-selector-jms/build.xml">
@@ -602,7 +602,9 @@
<module fileurl="file://$PROJECT_DIR$/tests/hornetq-tests.iml"
filepath="$PROJECT_DIR$/tests/hornetq-tests.iml" />
</modules>
</component>
- <component name="ProjectRootManager" version="2"
languageLevel="JDK_1_5" assert-keyword="true" jdk-15="true"
project-jdk-name="1.6" project-jdk-type="JavaSDK" />
+ <component name="ProjectRootManager" version="2"
languageLevel="JDK_1_5" assert-keyword="true" jdk-15="true"
project-jdk-name="1.6" project-jdk-type="JavaSDK">
+ <output url="file://$PROJECT_DIR$/classes" />
+ </component>
<component name="ResourceManagerContainer">
<option name="myResourceBundles">
<value>
@@ -697,7 +699,7 @@
</CLASSES>
<JAVADOC />
<SOURCES>
- <root
url="file:///home/andy/projects/JBOSSTS_4_6_1_GA/common/classes" />
+ <root url="file://$PROJECT_DIR$/../JBOSSTS_4_6_1_GA/common/classes"
/>
</SOURCES>
</library>
<library name="messaging-tests">
@@ -738,43 +740,50 @@
</library>
<library name="ant 1.7.1">
<CLASSES>
- <root
url="jar:///home/andy/devtools/apache-ant-1.7.1/lib/ant-contrib/lib/commons-logging-1.0.4.jar!/"
/>
- <root
url="jar:///home/andy/devtools/apache-ant-1.7.1/lib/ant-commons-logging.jar!/"
/>
- <root
url="jar:///home/andy/devtools/apache-ant-1.7.1/lib/ant-contrib/ant-contrib-1.0b3.jar!/"
/>
- <root
url="jar:///home/andy/devtools/apache-ant-1.7.1/lib/ant-jai.jar!/" />
- <root
url="jar:///home/andy/devtools/apache-ant-1.7.1/lib/ant-apache-log4j.jar!/"
/>
- <root
url="jar:///home/andy/devtools/apache-ant-1.7.1/lib/ant-javamail.jar!/" />
- <root
url="jar:///home/andy/devtools/apache-ant-1.7.1/lib/ant-trax.jar!/" />
- <root
url="jar:///home/andy/devtools/apache-ant-1.7.1/lib/ant-antlr.jar!/" />
- <root
url="jar:///home/andy/devtools/apache-ant-1.7.1/lib/ant-jmf.jar!/" />
- <root
url="jar:///home/andy/devtools/apache-ant-1.7.1/lib/ant-apache-resolver.jar!/"
/>
- <root
url="jar:///home/andy/devtools/apache-ant-1.7.1/lib/ant-jdepend.jar!/" />
- <root
url="jar:///home/andy/devtools/apache-ant-1.7.1/lib/ant-nodeps.jar!/" />
- <root
url="jar:///home/andy/devtools/apache-ant-1.7.1/lib/ant-contrib.jar!/" />
- <root
url="jar:///home/andy/devtools/apache-ant-1.7.1/lib/ant-starteam.jar!/" />
- <root
url="jar:///home/andy/devtools/apache-ant-1.7.1/lib/ant-apache-bsf.jar!/" />
- <root
url="jar:///home/andy/devtools/apache-ant-1.7.1/lib/ant-stylebook.jar!/" />
- <root
url="jar:///home/andy/devtools/apache-ant-1.7.1/lib/ant-apache-oro.jar!/" />
- <root
url="jar:///home/andy/devtools/apache-ant-1.7.1/lib/ant-contrib/lib/commons-httpclient-3.0.1.jar!/"
/>
- <root
url="jar:///home/andy/devtools/apache-ant-1.7.1/lib/ant-jsch.jar!/" />
- <root
url="jar:///home/andy/devtools/apache-ant-1.7.1/lib/ant-commons-net.jar!/"
/>
- <root
url="jar:///home/andy/devtools/apache-ant-1.7.1/lib/xml-apis.jar!/" />
- <root
url="jar:///home/andy/devtools/apache-ant-1.7.1/lib/ant-junit.jar!/" />
- <root
url="jar:///home/andy/devtools/apache-ant-1.7.1/lib/ant-contrib/lib/ivy-1.3.1.jar!/"
/>
- <root
url="jar:///home/andy/devtools/apache-ant-1.7.1/lib/ant-swing.jar!/" />
- <root
url="jar:///home/andy/devtools/apache-ant-1.7.1/lib/ant-launcher.jar!/" />
- <root url="jar:///home/andy/devtools/apache-ant-1.7.1/lib/ant.jar!/"
/>
- <root
url="jar:///home/andy/devtools/apache-ant-1.7.1/lib/ant-testutil.jar!/" />
- <root
url="jar:///home/andy/devtools/apache-ant-1.7.1/lib/ant-contrib/lib/bcel-5.1.jar!/"
/>
- <root
url="jar:///home/andy/devtools/apache-ant-1.7.1/lib/ant-apache-regexp.jar!/"
/>
- <root
url="jar:///home/andy/devtools/apache-ant-1.7.1/lib/ant-weblogic.jar!/" />
- <root
url="jar:///home/andy/devtools/apache-ant-1.7.1/lib/xercesImpl.jar!/" />
- <root
url="jar:///home/andy/devtools/apache-ant-1.7.1/lib/ant-apache-bcel.jar!/"
/>
- <root
url="jar:///home/andy/devtools/apache-ant-1.7.1/lib/ant-netrexx.jar!/" />
+ <root
url="jar://$PROJECT_DIR$/../../devtools/apache-ant-1.7.1/lib/ant-contrib/lib/commons-logging-1.0.4.jar!/"
/>
+ <root
url="jar://$PROJECT_DIR$/../../devtools/apache-ant-1.7.1/lib/ant-commons-logging.jar!/"
/>
+ <root
url="jar://$PROJECT_DIR$/../../devtools/apache-ant-1.7.1/lib/ant-contrib/ant-contrib-1.0b3.jar!/"
/>
+ <root
url="jar://$PROJECT_DIR$/../../devtools/apache-ant-1.7.1/lib/ant-jai.jar!/"
/>
+ <root
url="jar://$PROJECT_DIR$/../../devtools/apache-ant-1.7.1/lib/ant-apache-log4j.jar!/"
/>
+ <root
url="jar://$PROJECT_DIR$/../../devtools/apache-ant-1.7.1/lib/ant-javamail.jar!/"
/>
+ <root
url="jar://$PROJECT_DIR$/../../devtools/apache-ant-1.7.1/lib/ant-trax.jar!/"
/>
+ <root
url="jar://$PROJECT_DIR$/../../devtools/apache-ant-1.7.1/lib/ant-antlr.jar!/"
/>
+ <root
url="jar://$PROJECT_DIR$/../../devtools/apache-ant-1.7.1/lib/ant-jmf.jar!/"
/>
+ <root
url="jar://$PROJECT_DIR$/../../devtools/apache-ant-1.7.1/lib/ant-apache-resolver.jar!/"
/>
+ <root
url="jar://$PROJECT_DIR$/../../devtools/apache-ant-1.7.1/lib/ant-jdepend.jar!/"
/>
+ <root
url="jar://$PROJECT_DIR$/../../devtools/apache-ant-1.7.1/lib/ant-nodeps.jar!/"
/>
+ <root
url="jar://$PROJECT_DIR$/../../devtools/apache-ant-1.7.1/lib/ant-contrib.jar!/"
/>
+ <root
url="jar://$PROJECT_DIR$/../../devtools/apache-ant-1.7.1/lib/ant-starteam.jar!/"
/>
+ <root
url="jar://$PROJECT_DIR$/../../devtools/apache-ant-1.7.1/lib/ant-apache-bsf.jar!/"
/>
+ <root
url="jar://$PROJECT_DIR$/../../devtools/apache-ant-1.7.1/lib/ant-stylebook.jar!/"
/>
+ <root
url="jar://$PROJECT_DIR$/../../devtools/apache-ant-1.7.1/lib/ant-apache-oro.jar!/"
/>
+ <root
url="jar://$PROJECT_DIR$/../../devtools/apache-ant-1.7.1/lib/ant-contrib/lib/commons-httpclient-3.0.1.jar!/"
/>
+ <root
url="jar://$PROJECT_DIR$/../../devtools/apache-ant-1.7.1/lib/ant-jsch.jar!/"
/>
+ <root
url="jar://$PROJECT_DIR$/../../devtools/apache-ant-1.7.1/lib/ant-commons-net.jar!/"
/>
+ <root
url="jar://$PROJECT_DIR$/../../devtools/apache-ant-1.7.1/lib/xml-apis.jar!/"
/>
+ <root
url="jar://$PROJECT_DIR$/../../devtools/apache-ant-1.7.1/lib/ant-junit.jar!/"
/>
+ <root
url="jar://$PROJECT_DIR$/../../devtools/apache-ant-1.7.1/lib/ant-contrib/lib/ivy-1.3.1.jar!/"
/>
+ <root
url="jar://$PROJECT_DIR$/../../devtools/apache-ant-1.7.1/lib/ant-swing.jar!/"
/>
+ <root
url="jar://$PROJECT_DIR$/../../devtools/apache-ant-1.7.1/lib/ant-launcher.jar!/"
/>
+ <root
url="jar://$PROJECT_DIR$/../../devtools/apache-ant-1.7.1/lib/ant.jar!/" />
+ <root
url="jar://$PROJECT_DIR$/../../devtools/apache-ant-1.7.1/lib/ant-testutil.jar!/"
/>
+ <root
url="jar://$PROJECT_DIR$/../../devtools/apache-ant-1.7.1/lib/ant-contrib/lib/bcel-5.1.jar!/"
/>
+ <root
url="jar://$PROJECT_DIR$/../../devtools/apache-ant-1.7.1/lib/ant-apache-regexp.jar!/"
/>
+ <root
url="jar://$PROJECT_DIR$/../../devtools/apache-ant-1.7.1/lib/ant-weblogic.jar!/"
/>
+ <root
url="jar://$PROJECT_DIR$/../../devtools/apache-ant-1.7.1/lib/xercesImpl.jar!/"
/>
+ <root
url="jar://$PROJECT_DIR$/../../devtools/apache-ant-1.7.1/lib/ant-apache-bcel.jar!/"
/>
+ <root
url="jar://$PROJECT_DIR$/../../devtools/apache-ant-1.7.1/lib/ant-netrexx.jar!/"
/>
</CLASSES>
<JAVADOC />
<SOURCES />
</library>
+ <library name="etc">
+ <CLASSES>
+ <root url="file://$PROJECT_DIR$/../etc" />
+ </CLASSES>
+ <JAVADOC />
+ <SOURCES />
+ </library>
</component>
</project>
Modified: trunk/src/config/common/schema/hornetq-configuration.xsd
===================================================================
--- trunk/src/config/common/schema/hornetq-configuration.xsd 2009-10-28 16:13:10 UTC (rev
8157)
+++ trunk/src/config/common/schema/hornetq-configuration.xsd 2009-10-28 18:32:19 UTC (rev
8158)
@@ -132,6 +132,8 @@
</xsd:sequence>
</xsd:complexType>
</xsd:element>
+ <xsd:element maxOccurs="1" minOccurs="0"
name="grouping-handler" type="groupingHandlerType">
+ </xsd:element>
<xsd:element maxOccurs="1" minOccurs="0"
name="paging-directory" type="xsd:string">
</xsd:element>
<xsd:element maxOccurs="1" minOccurs="0"
name="bindings-directory" type="xsd:string">
@@ -393,6 +395,22 @@
</xsd:restriction>
</xsd:simpleType>
+ <xsd:complexType name="groupingHandlerType">
+ <xsd:sequence>
+ <xsd:element maxOccurs="1" minOccurs="1"
name="type" type="groupingHandlerTypeType"/>
+ <xsd:element maxOccurs="1" minOccurs="1"
name="address" type="xsd:string"/>
+ <xsd:element maxOccurs="1" minOccurs="0"
name="timeout" type="xsd:int"/>
+ </xsd:sequence>
+ <xsd:attribute name="name" type="xsd:string"
use="required"/>
+ </xsd:complexType>
+
+ <xsd:simpleType name="groupingHandlerTypeType">
+ <xsd:restriction base="xsd:string">
+ <xsd:enumeration value="LOCAL"/>
+ <xsd:enumeration value="REMOTE"/>
+ </xsd:restriction>
+ </xsd:simpleType>
+
<xsd:element name="security-settings">
<xsd:complexType>
<xsd:sequence>
Modified: trunk/src/main/org/hornetq/core/client/management/impl/ManagementHelper.java
===================================================================
---
trunk/src/main/org/hornetq/core/client/management/impl/ManagementHelper.java 2009-10-28
16:13:10 UTC (rev 8157)
+++
trunk/src/main/org/hornetq/core/client/management/impl/ManagementHelper.java 2009-10-28
18:32:19 UTC (rev 8158)
@@ -71,6 +71,12 @@
public static final SimpleString HDR_CHECK_TYPE = new
SimpleString("_HQ_CheckType");
+ public static final SimpleString HDR_PROPOSAL_GROUP_ID = new
SimpleString("_JBM_ProposalGroupId");
+
+ public static final SimpleString HDR_PROPOSAL_VALUE = new
SimpleString("_JBM_ProposalValue");
+
+ public static final SimpleString HDR_PROPOSAL_ALT_VALUE = new
SimpleString("_JBM_ProposalAltValue");
+
// Attributes ----------------------------------------------------
// Static --------------------------------------------------------
Modified: trunk/src/main/org/hornetq/core/config/Configuration.java
===================================================================
--- trunk/src/main/org/hornetq/core/config/Configuration.java 2009-10-28 16:13:10 UTC (rev
8157)
+++ trunk/src/main/org/hornetq/core/config/Configuration.java 2009-10-28 18:32:19 UTC (rev
8158)
@@ -25,6 +25,7 @@
import org.hornetq.core.config.cluster.DivertConfiguration;
import org.hornetq.core.config.cluster.QueueConfiguration;
import org.hornetq.core.server.JournalType;
+import org.hornetq.core.server.group.impl.GroupingHandlerConfiguration;
import org.hornetq.utils.SimpleString;
/**
@@ -129,6 +130,10 @@
void setDiscoveryGroupConfigurations(Map<String, DiscoveryGroupConfiguration>
configs);
+ GroupingHandlerConfiguration getGroupingHandlerConfiguration();
+
+ void setGroupingHandlerConfiguration(GroupingHandlerConfiguration
groupingHandlerConfiguration);
+
List<BridgeConfiguration> getBridgeConfigurations();
void setBridgeConfigurations(final List<BridgeConfiguration> configs);
@@ -242,9 +247,9 @@
boolean isLogJournalWriteRate();
void setLogJournalWriteRate(boolean rate);
-
+
//Undocumented attributes
-
+
int getJournalPerfBlastPages();
void setJournalPerfBlastPages(int pages);
@@ -260,11 +265,11 @@
long getMemoryMeasureInterval();
void setMemoryMeasureInterval(long memoryMeasureInterval);
-
+
boolean isRunSyncSpeedTest();
-
+
void setRunSyncSpeedTest(boolean run);
-
+
// Paging Properties
--------------------------------------------------------------------
String getPagingDirectory();
Modified: trunk/src/main/org/hornetq/core/config/impl/ConfigurationImpl.java
===================================================================
--- trunk/src/main/org/hornetq/core/config/impl/ConfigurationImpl.java 2009-10-28 16:13:10
UTC (rev 8157)
+++ trunk/src/main/org/hornetq/core/config/impl/ConfigurationImpl.java 2009-10-28 18:32:19
UTC (rev 8158)
@@ -31,6 +31,7 @@
import org.hornetq.core.config.cluster.QueueConfiguration;
import org.hornetq.core.logging.impl.JULLogDelegateFactory;
import org.hornetq.core.server.JournalType;
+import org.hornetq.core.server.group.impl.GroupingHandlerConfiguration;
import org.hornetq.utils.SimpleString;
/**
@@ -110,7 +111,7 @@
public static final boolean DEFAULT_JOURNAL_LOG_WRITE_RATE = false;
public static final int DEFAULT_JOURNAL_PERF_BLAST_PAGES = -1;
-
+
public static final boolean DEFAULT_RUN_SYNC_SPEED_TEST = false;
public static final boolean DEFAULT_WILDCARD_ROUTING_ENABLED = true;
@@ -272,7 +273,7 @@
protected boolean logJournalWriteRate = DEFAULT_JOURNAL_LOG_WRITE_RATE;
protected int journalPerfBlastPages = DEFAULT_JOURNAL_PERF_BLAST_PAGES;
-
+
protected boolean runSyncSpeedTest = DEFAULT_RUN_SYNC_SPEED_TEST;
protected boolean wildcardRoutingEnabled = DEFAULT_WILDCARD_ROUTING_ENABLED;
@@ -304,6 +305,8 @@
protected long memoryMeasureInterval = DEFAULT_MEMORY_MEASURE_INTERVAL;
+ protected GroupingHandlerConfiguration groupingHandlerConfiguration;
+
// Public -------------------------------------------------------------------------
public void start() throws Exception
@@ -485,6 +488,17 @@
this.backupConnectorName = backupConnectorName;
}
+ public GroupingHandlerConfiguration getGroupingHandlerConfiguration()
+ {
+ return groupingHandlerConfiguration;
+ }
+
+ public void setGroupingHandlerConfiguration(GroupingHandlerConfiguration
groupingHandlerConfiguration)
+ {
+ this.groupingHandlerConfiguration = groupingHandlerConfiguration;
+ }
+
+
public List<BridgeConfiguration> getBridgeConfigurations()
{
return bridgeConfigurations;
@@ -674,7 +688,7 @@
{
this.journalPerfBlastPages = journalPerfBlastPages;
}
-
+
public boolean isRunSyncSpeedTest()
{
return runSyncSpeedTest;
@@ -1123,5 +1137,5 @@
{
this.logDelegateFactoryClassName = className;
}
-
+
}
Modified: trunk/src/main/org/hornetq/core/config/impl/FileConfiguration.java
===================================================================
--- trunk/src/main/org/hornetq/core/config/impl/FileConfiguration.java 2009-10-28 16:13:10
UTC (rev 8157)
+++ trunk/src/main/org/hornetq/core/config/impl/FileConfiguration.java 2009-10-28 18:32:19
UTC (rev 8158)
@@ -46,6 +46,7 @@
import org.hornetq.core.config.cluster.DivertConfiguration;
import org.hornetq.core.logging.Logger;
import org.hornetq.core.server.JournalType;
+import org.hornetq.core.server.group.impl.GroupingHandlerConfiguration;
import org.hornetq.utils.Pair;
import org.hornetq.utils.SimpleString;
import org.hornetq.utils.XMLUtil;
@@ -253,6 +254,15 @@
parseBridgeConfiguration(mfNode);
}
+ NodeList gaNodes = e.getElementsByTagName("grouping-handler");
+
+ for (int i = 0; i < gaNodes.getLength(); i++)
+ {
+ Element gaNode = (Element) gaNodes.item(i);
+
+ parseGroupingHandlerConfiguration(gaNode);
+ }
+
NodeList ccNodes = e.getElementsByTagName("cluster-connection");
for (int i = 0; i < ccNodes.getLength(); i++)
@@ -319,7 +329,7 @@
logJournalWriteRate = getBoolean(e, "log-journal-write-rate",
DEFAULT_JOURNAL_LOG_WRITE_RATE);
journalPerfBlastPages = getInteger(e, "perf-blast-pages",
DEFAULT_JOURNAL_PERF_BLAST_PAGES, MINUS_ONE_OR_GT_ZERO);
-
+
runSyncSpeedTest = getBoolean(e, "run-sync-speed-test",
runSyncSpeedTest);
wildcardRoutingEnabled = getBoolean(e, "wild-card-routing-enabled",
wildcardRoutingEnabled);
@@ -568,6 +578,19 @@
clusterConfigurations.add(config);
}
+ private void parseGroupingHandlerConfiguration(final Element node)
+ {
+ String name = node.getAttribute("name");
+ String type = getString(node, "type", null, NOT_NULL_OR_EMPTY);
+ String address = getString(node, "address",null, NOT_NULL_OR_EMPTY);
+ Integer timeout = getInteger(node, "timeout",
GroupingHandlerConfiguration.DEFAULT_TIMEOUT, GT_ZERO);
+ groupingHandlerConfiguration = new GroupingHandlerConfiguration(new
SimpleString(name),
+
type.equals(GroupingHandlerConfiguration.TYPE.LOCAL.getType())?
GroupingHandlerConfiguration.TYPE.LOCAL: GroupingHandlerConfiguration.TYPE.REMOTE,
+ new SimpleString(address),
+ timeout);
+ }
+
+
private void parseBridgeConfiguration(final Element brNode)
{
String name = brNode.getAttribute("name");
Modified: trunk/src/main/org/hornetq/core/management/NotificationType.java
===================================================================
--- trunk/src/main/org/hornetq/core/management/NotificationType.java 2009-10-28 16:13:10
UTC (rev 8157)
+++ trunk/src/main/org/hornetq/core/management/NotificationType.java 2009-10-28 18:32:19
UTC (rev 8158)
@@ -30,11 +30,13 @@
BROADCAST_GROUP_STARTED(10),
BROADCAST_GROUP_STOPPED(11),
BRIDGE_STARTED(12),
- BRIDGE_STOPPED(13),
+ BRIDGE_STOPPED(13),
CLUSTER_CONNECTION_STARTED(14),
CLUSTER_CONNECTION_STOPPED(15),
ACCEPTOR_STARTED(16),
- ACCEPTOR_STOPPED(17);
+ ACCEPTOR_STOPPED(17),
+ PROPOSAL(18),
+ PROPOSAL_RESPONSE(19);
private final int value;
Copied: trunk/src/main/org/hornetq/core/persistence/GroupingInfo.java (from rev 8139,
branches/hornetq_grouping/src/main/org/hornetq/core/persistence/GroupingInfo.java)
===================================================================
--- trunk/src/main/org/hornetq/core/persistence/GroupingInfo.java
(rev 0)
+++ trunk/src/main/org/hornetq/core/persistence/GroupingInfo.java 2009-10-28 18:32:19 UTC
(rev 8158)
@@ -0,0 +1,28 @@
+/*
+ * Copyright 2009 Red Hat, Inc.
+ * Red Hat licenses this file to you under the Apache License, version
+ * 2.0 (the "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
http://www.apache.org/licenses/LICENSE-2.0
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+ * implied. See the License for the specific language governing
+ * permissions and limitations under the License.
+ */
+package org.hornetq.core.persistence;
+
+import org.hornetq.utils.SimpleString;
+
+/**
+ * @author <a href="mailto:andy.taylor@jboss.org">Andy Taylor</a>
+ * Created Oct 18, 2009
+ */
+public interface GroupingInfo
+{
+ public SimpleString getClusterName();
+
+ public SimpleString getGroupId();
+
+ public long getId();
+}
Modified: trunk/src/main/org/hornetq/core/persistence/StorageManager.java
===================================================================
--- trunk/src/main/org/hornetq/core/persistence/StorageManager.java 2009-10-28 16:13:10
UTC (rev 8157)
+++ trunk/src/main/org/hornetq/core/persistence/StorageManager.java 2009-10-28 18:32:19
UTC (rev 8158)
@@ -28,6 +28,7 @@
import org.hornetq.core.server.MessageReference;
import org.hornetq.core.server.Queue;
import org.hornetq.core.server.ServerMessage;
+import org.hornetq.core.server.group.impl.GroupBinding;
import org.hornetq.core.transaction.ResourceManager;
import org.hornetq.utils.Pair;
import org.hornetq.utils.SimpleString;
@@ -45,19 +46,19 @@
public interface StorageManager extends HornetQComponent
{
// Message related operations
-
+
void pageClosed(SimpleString storeName, int pageNumber);
-
+
void pageDeleted(SimpleString storeName, int pageNumber);
-
+
void pageWrite(PagedMessage message, int pageNumber);
-
+
boolean isReplicated();
-
+
void afterReplicated(Runnable run);
-
+
void completeReplication();
-
+
UUID getPersistentID();
void setPersistentID(UUID id) throws Exception;
@@ -126,13 +127,19 @@
long storeHeuristicCompletion(Xid xid, boolean isCommit) throws Exception;
void deleteHeuristicCompletion(long id) throws Exception;
-
+
// Bindings related operations
void addQueueBinding(Binding binding) throws Exception;
void deleteQueueBinding(long queueBindingID) throws Exception;
- void loadBindingJournal(List<QueueBindingInfo> queueBindingInfos) throws
Exception;
+ void loadBindingJournal(List<QueueBindingInfo> queueBindingInfos,
List<GroupingInfo> groupingInfos) throws Exception;
+
+ //grouping relateed operations
+ void addGrouping(GroupBinding groupBinding) throws Exception;
+
+
+ void deleteGrouping(GroupBinding groupBinding) throws Exception;
}
Modified:
trunk/src/main/org/hornetq/core/persistence/impl/journal/JournalStorageManager.java
===================================================================
---
trunk/src/main/org/hornetq/core/persistence/impl/journal/JournalStorageManager.java 2009-10-28
16:13:10 UTC (rev 8157)
+++
trunk/src/main/org/hornetq/core/persistence/impl/journal/JournalStorageManager.java 2009-10-28
18:32:19 UTC (rev 8158)
@@ -54,6 +54,7 @@
import org.hornetq.core.paging.impl.PageTransactionInfoImpl;
import org.hornetq.core.persistence.QueueBindingInfo;
import org.hornetq.core.persistence.StorageManager;
+import org.hornetq.core.persistence.GroupingInfo;
import org.hornetq.core.postoffice.Binding;
import org.hornetq.core.postoffice.PostOffice;
import org.hornetq.core.remoting.impl.wireformat.XidCodecSupport;
@@ -65,6 +66,7 @@
import org.hornetq.core.server.MessageReference;
import org.hornetq.core.server.Queue;
import org.hornetq.core.server.ServerMessage;
+import org.hornetq.core.server.group.impl.GroupBinding;
import org.hornetq.core.server.impl.ServerMessageImpl;
import org.hornetq.core.transaction.ResourceManager;
import org.hornetq.core.transaction.Transaction;
@@ -92,6 +94,8 @@
private static final long CHECKPOINT_BATCH_SIZE = Integer.MAX_VALUE;
+ //grouping journal record type
+ public static final byte GROUP_RECORD = 41;
// Bindings journal record type
public static final byte QUEUE_BINDING_RECORD = 21;
@@ -185,13 +189,13 @@
SequentialFileFactory bindingsFF = new NIOSequentialFileFactory(bindingsDir);
Journal localBindings = new JournalImpl(1024 * 1024,
- 2,
- config.getJournalCompactMinFiles(),
- config.getJournalCompactPercentage(),
- bindingsFF,
- "hornetq-bindings",
- "bindings",
- 1);
+ 2,
+ config.getJournalCompactMinFiles(),
+ config.getJournalCompactPercentage(),
+ bindingsFF,
+ "hornetq-bindings",
+ "bindings",
+ 1);
if (replicator != null)
{
@@ -249,17 +253,17 @@
}
else
{
- this.idGenerator = new BatchingIDGenerator(0, CHECKPOINT_BATCH_SIZE,
bindingsJournal);
+ this.idGenerator = new BatchingIDGenerator(0, CHECKPOINT_BATCH_SIZE,
bindingsJournal);
}
Journal localMessage = new JournalImpl(config.getJournalFileSize(),
- config.getJournalMinFiles(),
- config.getJournalCompactMinFiles(),
- config.getJournalCompactPercentage(),
- journalFF,
- "hornetq-data",
- "hq",
- config.getJournalMaxAIO());
+ config.getJournalMinFiles(),
+ config.getJournalCompactMinFiles(),
+ config.getJournalCompactPercentage(),
+ journalFF,
+ "hornetq-data",
+ "hq",
+ config.getJournalMaxAIO());
if (replicator != null)
{
@@ -295,8 +299,8 @@
// TODO: shouldn't those page methods be on the PageManager?
- /*
- *
+ /*
+ *
* (non-Javadoc)
* @see
org.hornetq.core.persistence.StorageManager#pageClosed(org.hornetq.utils.SimpleString,
int)
*/
@@ -399,7 +403,7 @@
HornetQBuffer headerBuffer = ChannelBuffers.wrappedBuffer(header);
largeMessage.decodeProperties(headerBuffer);
-
+
largeMessage.setMessageID(id);
return largeMessage;
@@ -446,8 +450,8 @@
public void updateScheduledDeliveryTime(final MessageReference ref) throws Exception
{
- ScheduledDeliveryEncoding encoding = new
ScheduledDeliveryEncoding(ref.getScheduledDeliveryTime(), ref.getQueue()
-
.getID());
+ ScheduledDeliveryEncoding encoding = new
ScheduledDeliveryEncoding(ref.getScheduledDeliveryTime(),
+
ref.getQueue().getID());
messageJournal.appendUpdateRecord(ref.getMessage().getMessageID(),
SET_SCHEDULED_DELIVERY_TIME,
@@ -523,12 +527,12 @@
messageJournal.appendAddRecord(id, HEURISTIC_COMPLETION, new
HeuristicCompletionEncoding(xid, isCommit), true);
return id;
}
-
+
public void deleteHeuristicCompletion(long id) throws Exception
{
messageJournal.appendDeleteRecord(id, true);
}
-
+
public void deletePageTransactional(final long txID, final long recordID) throws
Exception
{
messageJournal.appendDeleteRecordTransactional(txID, recordID);
@@ -536,8 +540,8 @@
public void updateScheduledDeliveryTimeTransactional(final long txID, final
MessageReference ref) throws Exception
{
- ScheduledDeliveryEncoding encoding = new
ScheduledDeliveryEncoding(ref.getScheduledDeliveryTime(), ref.getQueue()
-
.getID());
+ ScheduledDeliveryEncoding encoding = new
ScheduledDeliveryEncoding(ref.getScheduledDeliveryTime(),
+
ref.getQueue().getID());
messageJournal.appendUpdateRecordTransactional(txID,
ref.getMessage().getMessageID(),
@@ -604,7 +608,7 @@
}
/**
* @param journalLargeServerMessage
- * @throws Exception
+ * @throws Exception
*/
public void completeLargeMessage(JournalLargeServerMessage message) throws Exception
{
@@ -629,11 +633,11 @@
int deliveryCount;
}
-
+
private class LargeMessageTXFailureCallback implements TransactionFailureCallback
{
private final Map<Long, ServerMessage> messages;
-
+
public LargeMessageTXFailureCallback(Map<Long, ServerMessage> messages)
{
super();
@@ -662,7 +666,7 @@
}
}
}
-
+
}
public void loadMessageJournal(final PostOffice postOffice,
@@ -676,9 +680,9 @@
List<PreparedTransactionInfo> preparedTransactions = new
ArrayList<PreparedTransactionInfo>();
Map<Long, ServerMessage> messages = new HashMap<Long,
ServerMessage>();
-
+
messageJournal.load(records, preparedTransactions, new
LargeMessageTXFailureCallback(messages));
-
+
ArrayList<LargeServerMessage> largeMessages = new
ArrayList<LargeServerMessage>();
Map<Long, Map<Long, AddMessageRecord>> queueMap = new HashMap<Long,
Map<Long, AddMessageRecord>>();
@@ -901,7 +905,7 @@
msg.decrementRefCount();
}
}
-
+
if (perfBlastPages != -1)
{
messageJournal.perfBlast(perfBlastPages);
@@ -921,14 +925,14 @@
LargeMessageEncoding messageEncoding = new LargeMessageEncoding(largeMessage);
messageEncoding.decode(buff);
-
+
Long originalMessageID =
(Long)largeMessage.getProperties().getProperty(MessageImpl.HDR_ORIG_MESSAGE_ID);
-
+
// Using the linked file by the original file
if (originalMessageID != null)
{
LargeServerMessage originalMessage =
(LargeServerMessage)messages.get(originalMessageID);
-
+
if (originalMessage == null)
{
// this could happen if the message was deleted but the file still exists as
the file still being used
@@ -937,9 +941,9 @@
originalMessage.setStored();
messages.put(originalMessageID, originalMessage);
}
-
+
originalMessage.incrementRefCount();
-
+
largeMessage.setLinkedMessage(originalMessage);
}
return largeMessage;
@@ -982,7 +986,7 @@
case ADD_LARGE_MESSAGE:
{
messages.put(record.id, parseLargeMessage(messages, buff));
-
+
break;
}
case ADD_MESSAGE:
@@ -1138,7 +1142,20 @@
resourceManager.putTransaction(xid, tx);
}
}
+ //grouping handler operations
+ public void addGrouping(GroupBinding groupBinding) throws Exception
+ {
+ GroupingEncoding groupingEncoding = new GroupingEncoding(groupBinding.getId(),
groupBinding.getGroupId(), groupBinding.getClusterName());
+ System.out.println("groupingEncoding = " + groupingEncoding);
+ bindingsJournal.appendAddRecord(groupBinding.getId(), GROUP_RECORD,
groupingEncoding, true);
+ }
+ public void deleteGrouping(GroupBinding groupBinding) throws Exception
+ {
+ System.out.println("deleting groupBinding = " + groupBinding);
+ bindingsJournal.appendDeleteRecord(groupBinding.getId(), true);
+ }
+
// Bindings operations
public void addQueueBinding(final Binding binding) throws Exception
@@ -1161,7 +1178,7 @@
bindingsJournal.appendDeleteRecord(queueBindingID, true);
}
- public void loadBindingJournal(final List<QueueBindingInfo> queueBindingInfos)
throws Exception
+ public void loadBindingJournal(final List<QueueBindingInfo> queueBindingInfos,
final List<GroupingInfo> groupingInfos) throws Exception
{
List<RecordInfo> records = new ArrayList<RecordInfo>();
@@ -1185,7 +1202,7 @@
bindingEncoding.setId(id);
- queueBindingInfos.add(bindingEncoding);
+ queueBindingInfos.add(bindingEncoding);
}
else if (rec == PERSISTENT_ID_RECORD)
{
@@ -1199,6 +1216,13 @@
{
idGenerator.loadState(record.id, buffer);
}
+ else if(rec == GROUP_RECORD)
+ {
+ GroupingEncoding encoding = new GroupingEncoding();
+ encoding.decode(buffer);
+ encoding.setId(id);
+ groupingInfos.add(encoding);
+ }
else
{
throw new IllegalStateException("Invalid record type " + rec);
@@ -1241,7 +1265,7 @@
// Must call close to make sure last id is persisted
if (idGenerator != null)
{
- idGenerator.close();
+ idGenerator.close();
}
bindingsJournal.stop();
@@ -1327,7 +1351,7 @@
if (executor == null)
{
deleteAction.run();
- }
+ }
else
{
executor.execute(deleteAction);
@@ -1420,13 +1444,13 @@
return XidCodecSupport.getXidEncodeLength(xid);
}
}
-
+
private static class HeuristicCompletionEncoding implements EncodingSupport
{
Xid xid;
boolean isCommit;
-
+
HeuristicCompletionEncoding(final Xid xid, final boolean isCommit)
{
this.xid = xid;
@@ -1455,6 +1479,69 @@
}
}
+ private static class GroupingEncoding implements EncodingSupport, GroupingInfo
+ {
+ long id;
+
+ SimpleString groupId;
+
+ SimpleString clusterName;
+
+ public GroupingEncoding(long id, SimpleString groupId, SimpleString clusterName)
+ {
+ this.id = id;
+ this.groupId = groupId;
+ this.clusterName = clusterName;
+ }
+
+ public GroupingEncoding()
+ {
+ }
+
+ public int getEncodeSize()
+ {
+ return SimpleString.sizeofString(groupId) +
SimpleString.sizeofString(clusterName);
+ }
+
+ public void encode(HornetQBuffer buffer)
+ {
+ buffer.writeSimpleString(groupId);
+ buffer.writeSimpleString(clusterName);
+ }
+
+ public void decode(HornetQBuffer buffer)
+ {
+ groupId = buffer.readSimpleString();
+ clusterName = buffer.readSimpleString();
+ }
+
+ public long getId()
+ {
+ return id;
+ }
+
+ public void setId(long id)
+ {
+ this.id = id;
+ }
+
+ public SimpleString getGroupId()
+ {
+ return groupId;
+ }
+
+ public SimpleString getClusterName()
+ {
+ return clusterName;
+ }
+
+ @Override
+ public String toString()
+ {
+ return id + ":" + groupId + ":" + clusterName;
+ }
+ }
+
private static class PersistentQueueBindingEncoding implements EncodingSupport,
QueueBindingInfo
{
long id;
Modified: trunk/src/main/org/hornetq/core/persistence/impl/nullpm/NullStorageManager.java
===================================================================
---
trunk/src/main/org/hornetq/core/persistence/impl/nullpm/NullStorageManager.java 2009-10-28
16:13:10 UTC (rev 8157)
+++
trunk/src/main/org/hornetq/core/persistence/impl/nullpm/NullStorageManager.java 2009-10-28
18:32:19 UTC (rev 8158)
@@ -26,6 +26,7 @@
import org.hornetq.core.paging.PagingManager;
import org.hornetq.core.persistence.QueueBindingInfo;
import org.hornetq.core.persistence.StorageManager;
+import org.hornetq.core.persistence.GroupingInfo;
import org.hornetq.core.postoffice.Binding;
import org.hornetq.core.postoffice.PostOffice;
import org.hornetq.core.remoting.spi.HornetQBuffer;
@@ -33,6 +34,7 @@
import org.hornetq.core.server.MessageReference;
import org.hornetq.core.server.Queue;
import org.hornetq.core.server.ServerMessage;
+import org.hornetq.core.server.group.impl.GroupBinding;
import org.hornetq.core.transaction.ResourceManager;
import org.hornetq.utils.Pair;
import org.hornetq.utils.SimpleString;
@@ -78,7 +80,7 @@
{
}
- public void loadBindingJournal(final List<QueueBindingInfo> queueBindingInfos)
throws Exception
+ public void loadBindingJournal(List<QueueBindingInfo> queueBindingInfos,
List<GroupingInfo> groupingInfos) throws Exception
{
}
@@ -185,7 +187,7 @@
{
return new NullStorageLargeServerMessage();
}
-
+
public LargeServerMessage createLargeMessage(long id, byte[] header)
{
NullStorageLargeServerMessage largeMessage = new NullStorageLargeServerMessage();
@@ -193,13 +195,13 @@
HornetQBuffer headerBuffer = ChannelBuffers.wrappedBuffer(header);
largeMessage.decodeProperties(headerBuffer);
-
+
largeMessage.setMessageID(id);
-
- return largeMessage;
+
+ return largeMessage;
}
-
-
+
+
public long generateUniqueID()
{
long id = idSequence.getAndIncrement();
@@ -317,5 +319,14 @@
{
}
+ public void addGrouping(GroupBinding groupBinding) throws Exception
+ {
+ //To change body of implemented methods use File | Settings | File Templates.
+ }
+ public void deleteGrouping(GroupBinding groupBinding) throws Exception
+ {
+ //To change body of implemented methods use File | Settings | File Templates.
+ }
+
}
Copied: trunk/src/main/org/hornetq/core/postoffice/BindingsFactory.java (from rev 8139,
branches/hornetq_grouping/src/main/org/hornetq/core/postoffice/BindingsFactory.java)
===================================================================
--- trunk/src/main/org/hornetq/core/postoffice/BindingsFactory.java
(rev 0)
+++ trunk/src/main/org/hornetq/core/postoffice/BindingsFactory.java 2009-10-28 18:32:19
UTC (rev 8158)
@@ -0,0 +1,23 @@
+/*
+ * Copyright 2009 Red Hat, Inc.
+ * Red Hat licenses this file to you under the Apache License, version
+ * 2.0 (the "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
http://www.apache.org/licenses/LICENSE-2.0
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+ * implied. See the License for the specific language governing
+ * permissions and limitations under the License.
+ */
+package org.hornetq.core.postoffice;
+
+/**
+ * A factory for creating bindings
+ *
+ * @author <a href="mailto:andy.taylor@jboss.org">Andy Taylor</a>
+ */
+public interface BindingsFactory
+{
+ Bindings createBindings();
+}
Modified: trunk/src/main/org/hornetq/core/postoffice/impl/BindingsImpl.java
===================================================================
--- trunk/src/main/org/hornetq/core/postoffice/impl/BindingsImpl.java 2009-10-28 16:13:10
UTC (rev 8157)
+++ trunk/src/main/org/hornetq/core/postoffice/impl/BindingsImpl.java 2009-10-28 18:32:19
UTC (rev 8158)
@@ -15,8 +15,10 @@
import java.nio.ByteBuffer;
import java.util.Collection;
+import java.util.HashSet;
import java.util.List;
import java.util.Map;
+import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.CopyOnWriteArrayList;
@@ -26,19 +28,25 @@
import org.hornetq.core.message.impl.MessageImpl;
import org.hornetq.core.postoffice.Binding;
import org.hornetq.core.postoffice.Bindings;
+import org.hornetq.core.server.Bindable;
import org.hornetq.core.server.Queue;
import org.hornetq.core.server.RoutingContext;
import org.hornetq.core.server.ServerMessage;
+import org.hornetq.core.server.group.impl.Proposal;
+import org.hornetq.core.server.group.impl.Response;
+import org.hornetq.core.server.group.GroupingHandler;
+import org.hornetq.core.transaction.Transaction;
+import org.hornetq.core.exception.HornetQException;
import org.hornetq.utils.SimpleString;
/**
* A BindingsImpl
*
* @author <a href="mailto:tim.fox@jboss.com">Tim Fox</a>
- *
- * Created 11 Dec 2008 08:34:33
*
+ * Created 11 Dec 2008 08:34:33
*
+ *
*/
public class BindingsImpl implements Bindings
{
@@ -54,6 +62,13 @@
private volatile boolean routeWhenNoConsumers;
+ private final GroupingHandler groupingHandler;
+
+ public BindingsImpl(GroupingHandler groupingHandler)
+ {
+ this.groupingHandler = groupingHandler;
+ }
+
public void setRouteWhenNoConsumers(final boolean routeWhenNoConsumers)
{
this.routeWhenNoConsumers = routeWhenNoConsumers;
@@ -119,87 +134,87 @@
bindingsMap.remove(binding.getID());
}
-
+
public void redistribute(final ServerMessage message, final Queue originatingQueue,
final RoutingContext context) throws Exception
- {
- if (routeWhenNoConsumers)
{
- return;
- }
+ if (routeWhenNoConsumers)
+ {
+ return;
+ }
- SimpleString routingName = originatingQueue.getName();
+ SimpleString routingName = originatingQueue.getName();
- List<Binding> bindings = routingNameBindingMap.get(routingName);
+ List<Binding> bindings = routingNameBindingMap.get(routingName);
- if (bindings == null)
- {
- // The value can become null if it's concurrently removed while we're
iterating - this is expected
- // ConcurrentHashMap behaviour!
- return;
- }
+ if (bindings == null)
+ {
+ // The value can become null if it's concurrently removed while we're
iterating - this is expected
+ // ConcurrentHashMap behaviour!
+ return;
+ }
- Integer ipos = routingNamePositions.get(routingName);
+ Integer ipos = routingNamePositions.get(routingName);
- int pos = ipos != null ? ipos.intValue() : 0;
+ int pos = ipos != null ? ipos.intValue() : 0;
- int length = bindings.size();
+ int length = bindings.size();
- int startPos = pos;
+ int startPos = pos;
- Binding theBinding = null;
+ Binding theBinding = null;
- // TODO - combine this with similar logic in route()
- while (true)
- {
- Binding binding;
- try
+ // TODO - combine this with similar logic in route()
+ while (true)
{
- binding = bindings.get(pos);
- }
- catch (IndexOutOfBoundsException e)
- {
- // This can occur if binding is removed while in route
- if (!bindings.isEmpty())
+ Binding binding;
+ try
{
- pos = 0;
- startPos = 0;
- length = bindings.size();
-
- continue;
+ binding = bindings.get(pos);
}
- else
+ catch (IndexOutOfBoundsException e)
{
- break;
+ // This can occur if binding is removed while in route
+ if (!bindings.isEmpty())
+ {
+ pos = 0;
+ startPos = 0;
+ length = bindings.size();
+
+ continue;
+ }
+ else
+ {
+ break;
+ }
}
- }
- pos = incrementPos(pos, length);
+ pos = incrementPos(pos, length);
- Filter filter = binding.getFilter();
+ Filter filter = binding.getFilter();
- boolean highPrior = binding.isHighAcceptPriority(message);
+ boolean highPrior = binding.isHighAcceptPriority(message);
- if (highPrior && binding.getBindable() != originatingQueue &&
(filter == null || filter.match(message)))
- {
- theBinding = binding;
+ if (highPrior && binding.getBindable() != originatingQueue &&
(filter == null || filter.match(message)))
+ {
+ theBinding = binding;
- break;
+ break;
+ }
+
+ if (pos == startPos)
+ {
+ break;
+ }
}
- if (pos == startPos)
+ routingNamePositions.put(routingName, pos);
+
+ if (theBinding != null)
{
- break;
+ theBinding.route(message, context);
}
}
- routingNamePositions.put(routingName, pos);
-
- if (theBinding != null)
- {
- theBinding.route(message, context);
- }
- }
-
public void route(final ServerMessage message, final RoutingContext context) throws
Exception
{
boolean routed = false;
@@ -223,6 +238,10 @@
{
routeFromCluster(message, context);
}
+ else if (groupingHandler != null &&
message.getProperty(MessageImpl.HDR_GROUP_ID) != null)
+ {
+ routeUsingStrictOrdering(message, context, groupingHandler);
+ }
else
{
for (Map.Entry<SimpleString, List<Binding>> entry :
routingNameBindingMap.entrySet())
@@ -238,111 +257,200 @@
continue;
}
- Integer ipos = routingNamePositions.get(routingName);
+ Binding theBinding = getNextBinding(message, routingName, bindings);
- int pos = ipos != null ? ipos.intValue() : 0;
- int length = bindings.size();
+ if (theBinding != null)
+ {
+ theBinding.route(message, context);
+ }
+ }
+ }
+ }
+ }
- int startPos = pos;
+ private Binding getNextBinding(ServerMessage message, SimpleString routingName,
List<Binding> bindings)
+ {
+ Integer ipos = routingNamePositions.get(routingName);
- Binding theBinding = null;
+ int pos = ipos != null ? ipos : 0;
- int lastLowPriorityBinding = -1;
+ int length = bindings.size();
- while (true)
+ int startPos = pos;
+
+ Binding theBinding = null;
+
+ int lastLowPriorityBinding = -1;
+
+ while (true)
+ {
+ Binding binding;
+ try
+ {
+ binding = bindings.get(pos);
+ }
+ catch (IndexOutOfBoundsException e)
+ {
+ // This can occur if binding is removed while in route
+ if (!bindings.isEmpty())
+ {
+ pos = 0;
+ startPos = 0;
+ length = bindings.size();
+
+ continue;
+ }
+ else
+ {
+ break;
+ }
+ }
+
+ Filter filter = binding.getFilter();
+
+ if (filter == null || filter.match(message))
+ {
+ // bindings.length == 1 ==> only a local queue so we don't check for
matching consumers (it's an
+ // unnecessary overhead)
+ if (length == 1 || routeWhenNoConsumers ||
binding.isHighAcceptPriority(message))
+ {
+ theBinding = binding;
+
+ pos = incrementPos(pos, length);
+
+ break;
+ }
+ else
+ {
+ if (lastLowPriorityBinding == -1)
{
- Binding binding;
- try
+ lastLowPriorityBinding = pos;
+ }
+ }
+ }
+
+ pos = incrementPos(pos, length);
+
+ if (pos == startPos)
+ {
+ if (lastLowPriorityBinding != -1)
+ {
+ try
+ {
+ theBinding = bindings.get(pos);
+ }
+ catch (IndexOutOfBoundsException e)
+ {
+ // This can occur if binding is removed while in route
+ if (!bindings.isEmpty())
{
- binding = bindings.get(pos);
+ pos = 0;
+
+ lastLowPriorityBinding = -1;
+
+ continue;
}
- catch (IndexOutOfBoundsException e)
+ else
{
- // This can occur if binding is removed while in route
- if (!bindings.isEmpty())
- {
- pos = 0;
- startPos = 0;
- length = bindings.size();
-
- continue;
- }
- else
- {
- break;
- }
+ break;
}
+ }
- Filter filter = binding.getFilter();
+ pos = lastLowPriorityBinding;
- if (filter == null || filter.match(message))
- {
- // bindings.length == 1 ==> only a local queue so we don't
check for matching consumers (it's an
- // unnecessary overhead)
- if (length == 1 || routeWhenNoConsumers ||
binding.isHighAcceptPriority(message))
- {
- theBinding = binding;
+ pos = incrementPos(pos, length);
+ }
+ break;
+ }
+ }
+ routingNamePositions.put(routingName, pos);
+ return theBinding;
+ }
- pos = incrementPos(pos, length);
+ private void routeUsingStrictOrdering(ServerMessage message, RoutingContext context,
GroupingHandler groupingGroupingHandler)
+ throws Exception
+ {
+ SimpleString groupId = (SimpleString)
message.getProperty(MessageImpl.HDR_GROUP_ID);
- break;
- }
- else
- {
- if (lastLowPriorityBinding == -1)
- {
- lastLowPriorityBinding = pos;
- }
- }
- }
+ for (Map.Entry<SimpleString, List<Binding>> entry :
routingNameBindingMap.entrySet())
+ {
+ SimpleString routingName = entry.getKey();
- pos = incrementPos(pos, length);
+ List<Binding> bindings = entry.getValue();
- if (pos == startPos)
- {
- if (lastLowPriorityBinding != -1)
- {
- try
- {
- theBinding = bindings.get(pos);
- }
- catch (IndexOutOfBoundsException e)
- {
- // This can occur if binding is removed while in route
- if (!bindings.isEmpty())
- {
- pos = 0;
+ if (bindings == null)
+ {
+ // The value can become null if it's concurrently removed while we're
iterating - this is expected
+ // ConcurrentHashMap behaviour!
+ continue;
+ }
- lastLowPriorityBinding = -1;
+ //concat a full group id, this is for when a binding has multiple bindings
+ SimpleString fullID = groupId.concat(".").concat(routingName);
- continue;
- }
- else
- {
- break;
- }
- }
+ //see if there is already a response
+ Response resp = groupingGroupingHandler.getProposal(fullID);
- pos = lastLowPriorityBinding;
+ if (resp == null)
+ {
+ //ok lets find the next binding to propose
+ Binding theBinding = getNextBinding(message, routingName, bindings);
+ //TODO
https://jira.jboss.org/jira/browse/HORNETQ-191
+ resp = groupingGroupingHandler.propose(new Proposal(fullID,
theBinding.getClusterName()));
- pos = incrementPos(pos, length);
- }
+ //if our proposal was declined find the correct binding to use
+ if (resp.getAlternativeClusterName() != null)
+ {
+ theBinding = null;
+ for (Binding binding : bindings)
+ {
+ if (binding.getClusterName().equals(resp.getAlternativeClusterName()))
+ {
+ theBinding = binding;
break;
}
}
+ }
- if (theBinding != null)
+ //and lets route it
+ if (theBinding != null)
+ {
+ theBinding.route(message, context);
+ }
+ else
+ {
+ throw new HornetQException(HornetQException.QUEUE_DOES_NOT_EXIST,
"queue " + resp.getChosenClusterName() + " has been removed cannot deliver
message, queues should not be removed when grouping is used");
+ }
+ }
+ else
+ {
+ //ok, we need to find the binding and route it
+ Binding chosen = null;
+ for (Binding binding : bindings)
+ {
+ if (binding.getClusterName().equals(resp.getChosenClusterName()))
{
- theBinding.route(message, context);
+ chosen = binding;
+ break;
}
-
- routingNamePositions.put(routingName, pos);
}
+ if (chosen != null)
+ {
+ chosen.route(message, context);
+ }
+ else
+ {
+ throw new HornetQException(HornetQException.QUEUE_DOES_NOT_EXIST,
"queue " + resp.getChosenClusterName() + " has been removed cannot deliver
message, queues should not be removed when grouping is used");
+ }
}
+
+
}
+
+
}
-
+
private void routeFromCluster(final ServerMessage message, final RoutingContext
context) throws Exception
{
byte[] ids = (byte[])message.removeProperty(MessageImpl.HDR_ROUTE_TO_IDS);
Modified: trunk/src/main/org/hornetq/core/postoffice/impl/PostOfficeImpl.java
===================================================================
--- trunk/src/main/org/hornetq/core/postoffice/impl/PostOfficeImpl.java 2009-10-28
16:13:10 UTC (rev 8157)
+++ trunk/src/main/org/hornetq/core/postoffice/impl/PostOfficeImpl.java 2009-10-28
18:32:19 UTC (rev 8158)
@@ -46,7 +46,9 @@
import org.hornetq.core.postoffice.DuplicateIDCache;
import org.hornetq.core.postoffice.PostOffice;
import org.hornetq.core.postoffice.QueueInfo;
+import org.hornetq.core.postoffice.BindingsFactory;
import org.hornetq.core.server.MessageReference;
+import org.hornetq.core.server.HornetQServer;
import org.hornetq.core.server.Queue;
import org.hornetq.core.server.QueueFactory;
import org.hornetq.core.server.RoutingContext;
@@ -72,7 +74,7 @@
* @author <a href="jmesnil(a)redhat.com">Jeff Mesnil</a>
* @author <a href="csuconic(a)redhat.com">Clebert Suconic</a>
*/
-public class PostOfficeImpl implements PostOffice, NotificationListener
+public class PostOfficeImpl implements PostOffice, NotificationListener, BindingsFactory
{
private static final Logger log = Logger.getLogger(PostOfficeImpl.class);
@@ -112,7 +114,10 @@
private final HierarchicalRepository<AddressSettings>
addressSettingsRepository;
- public PostOfficeImpl(final StorageManager storageManager,
+ private final HornetQServer server;
+
+ public PostOfficeImpl(final HornetQServer server,
+ final StorageManager storageManager,
final PagingManager pagingManager,
final QueueFactory bindableFactory,
final ManagementService managementService,
@@ -122,7 +127,7 @@
final int idCacheSize,
final boolean persistIDCache,
final ExecutorFactory orderedExecutorFactory,
- HierarchicalRepository<AddressSettings>
addressSettingsRepository)
+ final HierarchicalRepository<AddressSettings>
addressSettingsRepository)
{
this.storageManager = storageManager;
@@ -139,11 +144,11 @@
if (enableWildCardRouting)
{
- addressManager = new WildcardAddressManager();
+ addressManager = new WildcardAddressManager(this);
}
else
{
- addressManager = new SimpleAddressManager();
+ addressManager = new SimpleAddressManager(this);
}
this.idCacheSize = idCacheSize;
@@ -153,6 +158,8 @@
this.redistributorExecutorFactory = orderedExecutorFactory;
this.addressSettingsRepository = addressSettingsRepository;
+
+ this.server = server;
}
// HornetQComponent implementation ---------------------------------------
@@ -427,7 +434,7 @@
public synchronized void addBinding(final Binding binding) throws Exception
{
addressManager.addBinding(binding);
-
+
TypedProperties props = new TypedProperties();
props.putIntProperty(ManagementHelper.HDR_BINDING_TYPE,
binding.getType().toInt());
@@ -503,7 +510,7 @@
if (bindings == null)
{
- bindings = new BindingsImpl();
+ bindings = createBindings();
}
return bindings;
@@ -518,26 +525,26 @@
{
return addressManager.getMatchingBindings(address);
}
-
+
public void route(final ServerMessage message) throws Exception
- {
+ {
route(message, new RoutingContextImpl(null));
}
public void route(final ServerMessage message, final RoutingContext context) throws
Exception
- {
+ {
SimpleString address = message.getDestination();
Object duplicateID = message.getProperty(MessageImpl.HDR_DUPLICATE_DETECTION_ID);
DuplicateIDCache cache = null;
-
+
byte[] duplicateIDBytes = null;
if (duplicateID != null)
{
cache = getDuplicateIDCache(message.getDestination());
-
+
if (duplicateID instanceof SimpleString)
{
duplicateIDBytes = ((SimpleString)duplicateID).getData();
@@ -551,11 +558,11 @@
{
if (context.getTransaction() == null)
{
- log.trace("Duplicate message detected - message will not be
routed");
+ log.trace("Duplicate message detected - message will not be
routed");
}
else
{
- log.trace("Duplicate message detected - transaction will be
rejected");
+ log.trace("Duplicate message detected - transaction will be
rejected");
context.getTransaction().markAsRollbackOnly(null);
}
@@ -573,7 +580,7 @@
// We need to store the duplicate id atomically with the message storage, so
we need to create a tx for this
Transaction tx = new TransactionImpl(storageManager);
-
+
context.setTransaction(tx);
startedTx = true;
@@ -587,7 +594,7 @@
if (pagingManager.page(message, true))
{
message.setStored();
-
+
return;
}
}
@@ -606,63 +613,62 @@
}
Bindings bindings = addressManager.getBindingsForRoutingAddress(address);
-
+
if (bindings != null)
{
context.incrementDepth();
-
+
bindings.route(message, context);
-
+
context.decrementDepth();
}
//The depth allows for recursion e.g. with diverts - we only want to process the
route after any recursed routes
//have been processed
-
+
if (context.getDepth() == 0)
{
if (context.getQueues().isEmpty())
{
// Send to DLA if appropriate
-
- AddressSettings addressSettings =
addressSettingsRepository.getMatch(address.toString());
-
- boolean sendToDLA = addressSettings.isSendToDLAOnNoRoute();
-
- if (sendToDLA)
+
+ AddressSettings addressSettings =
addressSettingsRepository.getMatch(address.toString());
+
+ boolean sendToDLA = addressSettings.isSendToDLAOnNoRoute();
+
+ if (sendToDLA)
+ {
+ //Send to the DLA for the address
+
+ SimpleString dlaAddress = addressSettings.getDeadLetterAddress();
+
+ if (dlaAddress == null)
{
- // Send to the DLA for the address
-
- SimpleString dlaAddress = addressSettings.getDeadLetterAddress();
-
- if (dlaAddress == null)
- {
- log.warn("Did not route to any bindings for address " +
address +
- " and sendToDLAOnNoRoute is true " +
- "but there is no DLA configured for the address, the
message will be ignored.");
- }
- else
- {
- message.setOriginalHeaders(message, false);
-
- message.setDestination(dlaAddress);
-
+ log.warn("Did not route to any bindings for address " + address
+ " and sendToDLAOnNoRoute is true " +
+ "but there is no DLA configured for the address, the message
will be ignored.");
+ }
+ else
+ {
+ message.setOriginalHeaders(message, false);
+
+ message.setDestination(dlaAddress);
+
route(message, context);
- }
}
}
+ }
else
{
processRoute(message, context);
}
-
- if (startedTx)
- {
+
+ if (startedTx)
+ {
context.getTransaction().commit();
- }
}
}
-
+ }
+
public MessageReference reroute(final ServerMessage message, final Queue queue, final
Transaction tx) throws Exception
{
MessageReference reference = message.createReference(queue);
@@ -672,18 +678,18 @@
if (scheduledDeliveryTime != null)
{
reference.setScheduledDeliveryTime(scheduledDeliveryTime);
- }
+ }
message.incrementDurableRefCount();
message.setStored();
-
+
int refCount = message.incrementRefCount();
PagingStore store = pagingManager.getPageStore(message.getDestination());
if (refCount == 1)
- {
+ {
store.addSize(message.getMemoryEstimate());
}
@@ -696,33 +702,33 @@
else
{
List<MessageReference> refs = new ArrayList<MessageReference>(1);
-
+
refs.add(reference);
-
+
tx.addOperation(new AddOperation(refs));
}
-
+
return reference;
}
-
+
public boolean redistribute(final ServerMessage message, final Queue originatingQueue,
final RoutingContext context) throws Exception
- {
+ {
Bindings bindings =
addressManager.getBindingsForRoutingAddress(message.getDestination());
boolean res = false;
-
+
if (bindings != null)
{
bindings.redistribute(message, originatingQueue, context);
-
+
if (!context.getQueues().isEmpty())
{
processRoute(message, context);
-
+
res = true;
- }
+ }
}
-
+
return res;
}
@@ -847,11 +853,11 @@
}
private void processRoute(final ServerMessage message, final RoutingContext context)
throws Exception
- {
+ {
final List<MessageReference> refs = new ArrayList<MessageReference>();
Transaction tx = context.getTransaction();
-
+
for (Queue queue : context.getQueues())
{
MessageReference reference = message.createReference(queue);
@@ -863,7 +869,7 @@
if (scheduledDeliveryTime != null)
{
reference.setScheduledDeliveryTime(scheduledDeliveryTime);
- }
+ }
if (message.isDurable() && queue.isDurable())
{
@@ -874,7 +880,7 @@
if (tx != null)
{
storageManager.storeMessageTransactional(tx.getID(), message);
- }
+ }
else
{
storageManager.storeMessage(message);
@@ -884,22 +890,22 @@
}
if (tx != null)
- {
+ {
storageManager.storeReferenceTransactional(tx.getID(), queue.getID(),
message.getMessageID());
tx.putProperty(TransactionPropertyIndexes.CONTAINS_PERSISTENT, true);
}
else
- {
+ {
storageManager.storeReference(queue.getID(), message.getMessageID());
- }
+ }
if (scheduledDeliveryTime != null)
{
if (tx != null)
{
storageManager.updateScheduledDeliveryTimeTransactional(tx.getID(),
reference);
- }
+ }
else
{
storageManager.updateScheduledDeliveryTime(reference);
@@ -918,7 +924,7 @@
store.addSize(reference.getMemoryEstimate());
}
-
+
if (tx != null)
{
tx.addOperation(new AddOperation(refs));
@@ -948,11 +954,11 @@
private void addReferences(final List<MessageReference> refs)
{
for (MessageReference ref : refs)
- {
+ {
ref.getQueue().addLast(ref);
}
}
-
+
private synchronized void startExpiryScanner()
{
if (reaperPeriod > 0)
@@ -1203,7 +1209,7 @@
}
}
}
-
+
private class AddOperation implements TransactionOperation
{
private final List<MessageReference> refs;
@@ -1211,14 +1217,14 @@
AddOperation(final List<MessageReference> refs)
{
this.refs = refs;
- }
+ }
public void afterCommit(Transaction tx)
- {
+ {
for (MessageReference ref : refs)
{
ref.getQueue().addLast(ref);
- }
+}
}
public void afterPrepare(Transaction tx) throws Exception
@@ -1263,4 +1269,9 @@
}
}
}
+
+ public Bindings createBindings()
+ {
+ return new BindingsImpl(server.getGroupingHandler());
+ }
}
Modified: trunk/src/main/org/hornetq/core/postoffice/impl/SimpleAddressManager.java
===================================================================
--- trunk/src/main/org/hornetq/core/postoffice/impl/SimpleAddressManager.java 2009-10-28
16:13:10 UTC (rev 8157)
+++ trunk/src/main/org/hornetq/core/postoffice/impl/SimpleAddressManager.java 2009-10-28
18:32:19 UTC (rev 8158)
@@ -21,6 +21,7 @@
import org.hornetq.core.postoffice.AddressManager;
import org.hornetq.core.postoffice.Binding;
import org.hornetq.core.postoffice.Bindings;
+import org.hornetq.core.postoffice.BindingsFactory;
import org.hornetq.utils.SimpleString;
/**
@@ -38,6 +39,13 @@
private final ConcurrentMap<SimpleString, Binding> nameMap = new
ConcurrentHashMap<SimpleString, Binding>();
+ private final BindingsFactory bindingsFactory;
+
+ public SimpleAddressManager(BindingsFactory bindingsFactory)
+ {
+ this.bindingsFactory = bindingsFactory;
+ }
+
public boolean addBinding(final Binding binding)
{
if (nameMap.putIfAbsent(binding.getUniqueName(), binding) != null)
@@ -81,7 +89,7 @@
{
Address add = new AddressImpl(address);
- Bindings bindings = new BindingsImpl();
+ Bindings bindings = bindingsFactory.createBindings();
for (Binding binding: nameMap.values())
{
@@ -152,7 +160,7 @@
if (bindings == null)
{
- bindings = new BindingsImpl();
+ bindings = bindingsFactory.createBindings();
prevBindings = mappings.putIfAbsent(address, bindings);
Modified: trunk/src/main/org/hornetq/core/postoffice/impl/WildcardAddressManager.java
===================================================================
--- trunk/src/main/org/hornetq/core/postoffice/impl/WildcardAddressManager.java 2009-10-28
16:13:10 UTC (rev 8157)
+++ trunk/src/main/org/hornetq/core/postoffice/impl/WildcardAddressManager.java 2009-10-28
18:32:19 UTC (rev 8158)
@@ -21,6 +21,7 @@
import org.hornetq.core.postoffice.Address;
import org.hornetq.core.postoffice.Binding;
import org.hornetq.core.postoffice.Bindings;
+import org.hornetq.core.postoffice.BindingsFactory;
import org.hornetq.utils.SimpleString;
/**
@@ -50,6 +51,11 @@
private final Map<SimpleString, Address> wildCardAddresses = new
HashMap<SimpleString, Address>();
+ public WildcardAddressManager(BindingsFactory bindingsFactory)
+ {
+ super(bindingsFactory);
+ }
+
public Bindings getBindingsForRoutingAddress(final SimpleString address)
{
Bindings bindings = super.getBindingsForRoutingAddress(address);
Modified: trunk/src/main/org/hornetq/core/server/HornetQServer.java
===================================================================
--- trunk/src/main/org/hornetq/core/server/HornetQServer.java 2009-10-28 16:13:10 UTC (rev
8157)
+++ trunk/src/main/org/hornetq/core/server/HornetQServer.java 2009-10-28 18:32:19 UTC (rev
8158)
@@ -33,6 +33,7 @@
import org.hornetq.core.security.HornetQSecurityManager;
import org.hornetq.core.security.Role;
import org.hornetq.core.server.cluster.ClusterManager;
+import org.hornetq.core.server.group.GroupingHandler;
import org.hornetq.core.settings.HierarchicalRepository;
import org.hornetq.core.settings.impl.AddressSettings;
import org.hornetq.core.transaction.ResourceManager;
@@ -71,7 +72,7 @@
void unregisterActivateCallback(ActivateCallback callback);
ReattachSessionResponseMessage reattachSession(RemotingConnection connection, String
name, int lastReceivedCommandID) throws Exception;
-
+
ReplicationEndpoint createReplicationEndpoint(Channel channel) throws Exception;
CreateSessionResponseMessage createSession(String name,
@@ -130,4 +131,8 @@
void destroyQueue(SimpleString queueName, ServerSession session) throws Exception;
ExecutorFactory getExecutorFactory();
+
+ void setGroupingHandler(GroupingHandler groupingHandler);
+
+ GroupingHandler getGroupingHandler();
}
Modified: trunk/src/main/org/hornetq/core/server/cluster/impl/BridgeImpl.java
===================================================================
--- trunk/src/main/org/hornetq/core/server/cluster/impl/BridgeImpl.java 2009-10-28
16:13:10 UTC (rev 8157)
+++ trunk/src/main/org/hornetq/core/server/cluster/impl/BridgeImpl.java 2009-10-28
18:32:19 UTC (rev 8158)
@@ -559,26 +559,30 @@
SimpleString notifQueueName = new SimpleString(qName);
SimpleString filter = new SimpleString(ManagementHelper.HDR_BINDING_TYPE +
"<>" +
- BindingType.DIVERT.toInt() +
- " AND " +
- ManagementHelper.HDR_NOTIFICATION_TYPE +
- " IN ('" +
- NotificationType.BINDING_ADDED +
- "','" +
- NotificationType.BINDING_REMOVED +
- "','" +
- NotificationType.CONSUMER_CREATED +
- "','" +
- NotificationType.CONSUMER_CLOSED +
- "') AND " +
- ManagementHelper.HDR_DISTANCE +
- "<" +
- flowRecord.getMaxHops() +
- " AND (" +
- ManagementHelper.HDR_ADDRESS +
- " LIKE '" +
- flowRecord.getAddress() +
- "%')");
+ BindingType.DIVERT.toInt() +
+ " AND " +
+ ManagementHelper.HDR_NOTIFICATION_TYPE
+
+ " IN ('" +
+ NotificationType.BINDING_ADDED +
+ "','" +
+ NotificationType.BINDING_REMOVED +
+ "','" +
+ NotificationType.CONSUMER_CREATED +
+ "','" +
+ NotificationType.CONSUMER_CLOSED +
+ "','" +
+ NotificationType.PROPOSAL +
+ "','" +
+ NotificationType.PROPOSAL_RESPONSE +
+ "') AND " +
+ ManagementHelper.HDR_DISTANCE +
+ "<" +
+ flowRecord.getMaxHops() +
+ " AND (" +
+ ManagementHelper.HDR_ADDRESS +
+ " LIKE '" +
+ flowRecord.getAddress() +
+ "%')");
// The queue can't be temporary, since if the node with the bridge crashes
then is restarted quickly
// it might get deleted on the target when it does connection cleanup
Modified: trunk/src/main/org/hornetq/core/server/cluster/impl/ClusterConnectionImpl.java
===================================================================
---
trunk/src/main/org/hornetq/core/server/cluster/impl/ClusterConnectionImpl.java 2009-10-28
16:13:10 UTC (rev 8157)
+++
trunk/src/main/org/hornetq/core/server/cluster/impl/ClusterConnectionImpl.java 2009-10-28
18:32:19 UTC (rev 8158)
@@ -40,6 +40,8 @@
import org.hornetq.core.postoffice.PostOffice;
import org.hornetq.core.server.HornetQServer;
import org.hornetq.core.server.Queue;
+import org.hornetq.core.server.group.impl.Response;
+import org.hornetq.core.server.group.impl.Proposal;
import org.hornetq.core.server.cluster.Bridge;
import org.hornetq.core.server.cluster.ClusterConnection;
import org.hornetq.core.server.cluster.MessageFlowRecord;
@@ -541,7 +543,12 @@
}
case SECURITY_AUTHENTICATION_VIOLATION:
case SECURITY_PERMISSION_VIOLATION:
+ case PROPOSAL:
+ doProposalReceived(message);
break;
+ case PROPOSAL_RESPONSE:
+ doProposalResponseReceived(message);
+ break;
default:
{
throw new IllegalArgumentException("Invalid type " + ntype);
@@ -554,6 +561,49 @@
}
}
+ /*
+ * Inform the grouping handler of a proposal
+ * */
+ private synchronized void doProposalReceived(final ClientMessage message) throws
Exception
+ {
+ SimpleString type = (SimpleString)
message.getProperty(ManagementHelper.HDR_PROPOSAL_GROUP_ID);
+
+ if (type == null)
+ {
+ throw new IllegalStateException("proposal type is null");
+ }
+
+ SimpleString val = (SimpleString)
message.getProperty(ManagementHelper.HDR_PROPOSAL_VALUE);
+
+ Integer hops = (Integer) message.getProperty(ManagementHelper.HDR_DISTANCE);
+
+ Response response = server.getGroupingHandler().receive(new Proposal(type, val),
hops + 1);
+
+ if(response != null)
+ {
+ server.getGroupingHandler().send(response, 0);
+ }
+ }
+
+ /*
+ * Inform the grouping handler of a response from a proposal
+ *
+ * */
+ private synchronized void doProposalResponseReceived(final ClientMessage message)
throws Exception
+ {
+ SimpleString type = (SimpleString)
message.getProperty(ManagementHelper.HDR_PROPOSAL_GROUP_ID);
+ if (type == null)
+ {
+ throw new IllegalStateException("proposal type is null");
+ }
+ SimpleString val = (SimpleString)
message.getProperty(ManagementHelper.HDR_PROPOSAL_VALUE);
+ SimpleString alt = (SimpleString)
message.getProperty(ManagementHelper.HDR_PROPOSAL_ALT_VALUE);
+ Integer hops = (Integer) message.getProperty(ManagementHelper.HDR_DISTANCE);
+ Response response = new Response(type, val, alt);
+ server.getGroupingHandler().proposed(response);
+ server.getGroupingHandler().send(response, hops + 1);
+ }
+
private synchronized void clearBindings() throws Exception
{
for (RemoteQueueBinding binding : new
HashSet<RemoteQueueBinding>(bindings.values()))
@@ -779,4 +829,9 @@
theBindings.setRouteWhenNoConsumers(routeWhenNoConsumers);
}
+ //for testing only
+ public Map<String, MessageFlowRecord> getRecords()
+ {
+ return records;
+ }
}
Copied: trunk/src/main/org/hornetq/core/server/group (from rev 8139,
branches/hornetq_grouping/src/main/org/hornetq/core/server/group)
Deleted: trunk/src/main/org/hornetq/core/server/group/GroupingHandler.java
===================================================================
---
branches/hornetq_grouping/src/main/org/hornetq/core/server/group/GroupingHandler.java 2009-10-22
17:42:57 UTC (rev 8139)
+++ trunk/src/main/org/hornetq/core/server/group/GroupingHandler.java 2009-10-28 18:32:19
UTC (rev 8158)
@@ -1,38 +0,0 @@
-/*
- * Copyright 2009 Red Hat, Inc.
- * Red Hat licenses this file to you under the Apache License, version
- * 2.0 (the "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
http://www.apache.org/licenses/LICENSE-2.0
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
- * implied. See the License for the specific language governing
- * permissions and limitations under the License.
- */
-package org.hornetq.core.server.group;
-
-import org.hornetq.utils.SimpleString;
-import org.hornetq.core.server.group.impl.Proposal;
-import org.hornetq.core.server.group.impl.Response;
-import org.hornetq.core.server.group.impl.GroupBinding;
-import org.hornetq.core.management.NotificationListener;
-import org.hornetq.core.management.Notification;
-
-/**
- * @author <a href="mailto:andy.taylor@jboss.org">Andy Taylor</a>
- */
-public interface GroupingHandler extends NotificationListener
-{
- SimpleString getName();
-
- Response propose(Proposal proposal) throws Exception;
-
- void proposed(Response response) throws Exception;
-
- void send(Response response, int distance) throws Exception;
-
- Response receive(Proposal proposal, int distance) throws Exception;
-
- void addGroupBinding(GroupBinding groupBinding);
-}
Copied: trunk/src/main/org/hornetq/core/server/group/GroupingHandler.java (from rev 8139,
branches/hornetq_grouping/src/main/org/hornetq/core/server/group/GroupingHandler.java)
===================================================================
--- trunk/src/main/org/hornetq/core/server/group/GroupingHandler.java
(rev 0)
+++ trunk/src/main/org/hornetq/core/server/group/GroupingHandler.java 2009-10-28 18:32:19
UTC (rev 8158)
@@ -0,0 +1,40 @@
+/*
+ * Copyright 2009 Red Hat, Inc.
+ * Red Hat licenses this file to you under the Apache License, version
+ * 2.0 (the "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
http://www.apache.org/licenses/LICENSE-2.0
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+ * implied. See the License for the specific language governing
+ * permissions and limitations under the License.
+ */
+package org.hornetq.core.server.group;
+
+import org.hornetq.utils.SimpleString;
+import org.hornetq.core.server.group.impl.Proposal;
+import org.hornetq.core.server.group.impl.Response;
+import org.hornetq.core.server.group.impl.GroupBinding;
+import org.hornetq.core.management.NotificationListener;
+import org.hornetq.core.management.Notification;
+
+/**
+ * @author <a href="mailto:andy.taylor@jboss.org">Andy Taylor</a>
+ */
+public interface GroupingHandler extends NotificationListener
+{
+ SimpleString getName();
+
+ Response propose(Proposal proposal) throws Exception;
+
+ void proposed(Response response) throws Exception;
+
+ void send(Response response, int distance) throws Exception;
+
+ Response receive(Proposal proposal, int distance) throws Exception;
+
+ void addGroupBinding(GroupBinding groupBinding);
+
+ Response getProposal(SimpleString fullID);
+}
Copied: trunk/src/main/org/hornetq/core/server/group/impl (from rev 8139,
branches/hornetq_grouping/src/main/org/hornetq/core/server/group/impl)
Deleted: trunk/src/main/org/hornetq/core/server/group/impl/GroupBinding.java
===================================================================
---
branches/hornetq_grouping/src/main/org/hornetq/core/server/group/impl/GroupBinding.java 2009-10-22
17:42:57 UTC (rev 8139)
+++ trunk/src/main/org/hornetq/core/server/group/impl/GroupBinding.java 2009-10-28
18:32:19 UTC (rev 8158)
@@ -1,61 +0,0 @@
-/*
- * Copyright 2009 Red Hat, Inc.
- * Red Hat licenses this file to you under the Apache License, version
- * 2.0 (the "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
http://www.apache.org/licenses/LICENSE-2.0
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
- * implied. See the License for the specific language governing
- * permissions and limitations under the License.
- */
-package org.hornetq.core.server.group.impl;
-
-import org.hornetq.utils.SimpleString;
-
-/**
- * @author <a href="mailto:andy.taylor@jboss.org">Andy Taylor</a>
- * Created Oct 19, 2009
- */
-public class GroupBinding
-{
- private long id;
-
- private final SimpleString groupId;
-
- private final SimpleString clusterName;
-
- public GroupBinding(SimpleString groupId, SimpleString clusterName)
- {
- this.groupId = groupId;
- this.clusterName = clusterName;
- }
-
- public GroupBinding(long id, SimpleString groupId, SimpleString clusterName)
- {
- this.id = id;
- this.groupId = groupId;
- this.clusterName = clusterName;
- }
-
- public long getId()
- {
- return id;
- }
-
- public void setId(long id)
- {
- this.id = id;
- }
-
- public SimpleString getGroupId()
- {
- return groupId;
- }
-
- public SimpleString getClusterName()
- {
- return clusterName;
- }
-}
Copied: trunk/src/main/org/hornetq/core/server/group/impl/GroupBinding.java (from rev
8139,
branches/hornetq_grouping/src/main/org/hornetq/core/server/group/impl/GroupBinding.java)
===================================================================
--- trunk/src/main/org/hornetq/core/server/group/impl/GroupBinding.java
(rev 0)
+++ trunk/src/main/org/hornetq/core/server/group/impl/GroupBinding.java 2009-10-28
18:32:19 UTC (rev 8158)
@@ -0,0 +1,69 @@
+/*
+ * Copyright 2009 Red Hat, Inc.
+ * Red Hat licenses this file to you under the Apache License, version
+ * 2.0 (the "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
http://www.apache.org/licenses/LICENSE-2.0
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+ * implied. See the License for the specific language governing
+ * permissions and limitations under the License.
+ */
+package org.hornetq.core.server.group.impl;
+
+import org.hornetq.utils.SimpleString;
+
+/**
+ * A group binding
+ *
+ * @author <a href="mailto:andy.taylor@jboss.org">Andy Taylor</a>
+ * Created Oct 19, 2009
+ */
+public class GroupBinding
+{
+ private long id;
+
+ private final SimpleString groupId;
+
+ private final SimpleString clusterName;
+
+ public GroupBinding(SimpleString groupId, SimpleString clusterName)
+ {
+ this.groupId = groupId;
+ this.clusterName = clusterName;
+ }
+
+ public GroupBinding(long id, SimpleString groupId, SimpleString clusterName)
+ {
+ this.id = id;
+ this.groupId = groupId;
+ this.clusterName = clusterName;
+ }
+
+ public long getId()
+ {
+ return id;
+ }
+
+ public void setId(long id)
+ {
+ this.id = id;
+ }
+
+ public SimpleString getGroupId()
+ {
+ return groupId;
+ }
+
+ public SimpleString getClusterName()
+ {
+ return clusterName;
+ }
+
+ @Override
+ public String toString()
+ {
+ return id + ":" + groupId + ":" + clusterName;
+ }
+}
Deleted:
trunk/src/main/org/hornetq/core/server/group/impl/GroupingHandlerConfiguration.java
===================================================================
---
branches/hornetq_grouping/src/main/org/hornetq/core/server/group/impl/GroupingHandlerConfiguration.java 2009-10-22
17:42:57 UTC (rev 8139)
+++
trunk/src/main/org/hornetq/core/server/group/impl/GroupingHandlerConfiguration.java 2009-10-28
18:32:19 UTC (rev 8158)
@@ -1,82 +0,0 @@
-/*
- * Copyright 2009 Red Hat, Inc.
- * Red Hat licenses this file to you under the Apache License, version
- * 2.0 (the "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
http://www.apache.org/licenses/LICENSE-2.0
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
- * implied. See the License for the specific language governing
- * permissions and limitations under the License.
- */
-package org.hornetq.core.server.group.impl;
-
-import org.hornetq.utils.SimpleString;
-
-/**
- * @author <a href="mailto:andy.taylor@jboss.org">Andy Taylor</a>
- */
-public class GroupingHandlerConfiguration
-{
- private final SimpleString name;
-
- private final TYPE type;
-
- private final SimpleString address;
-
- private final int timeout;
-
- public static final int DEFAULT_TIMEOUT = 5000;
-
- public GroupingHandlerConfiguration(final SimpleString name, final TYPE type,
SimpleString address)
- {
- this(name, type, address, DEFAULT_TIMEOUT);
- }
-
- public GroupingHandlerConfiguration(final SimpleString name, final TYPE type,
SimpleString address, int timeout)
- {
- this.type = type;
- this.name = name;
- this.address = address;
- this.timeout = timeout;
- }
-
- public SimpleString getName()
- {
- return name;
- }
-
- public TYPE getType()
- {
- return type;
- }
-
- public SimpleString getAddress()
- {
- return address;
- }
-
- public int getTimeout()
- {
- return timeout;
- }
-
- public enum TYPE
- {
- LOCAL("LOCAL"),
- REMOTE("REMOTE");
-
- private String type;
-
- TYPE(String type)
- {
- this.type = type;
- }
-
- public String getType()
- {
- return type;
- }
- }
-}
Copied:
trunk/src/main/org/hornetq/core/server/group/impl/GroupingHandlerConfiguration.java (from
rev 8139,
branches/hornetq_grouping/src/main/org/hornetq/core/server/group/impl/GroupingHandlerConfiguration.java)
===================================================================
--- trunk/src/main/org/hornetq/core/server/group/impl/GroupingHandlerConfiguration.java
(rev 0)
+++
trunk/src/main/org/hornetq/core/server/group/impl/GroupingHandlerConfiguration.java 2009-10-28
18:32:19 UTC (rev 8158)
@@ -0,0 +1,84 @@
+/*
+ * Copyright 2009 Red Hat, Inc.
+ * Red Hat licenses this file to you under the Apache License, version
+ * 2.0 (the "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
http://www.apache.org/licenses/LICENSE-2.0
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+ * implied. See the License for the specific language governing
+ * permissions and limitations under the License.
+ */
+package org.hornetq.core.server.group.impl;
+
+import org.hornetq.utils.SimpleString;
+
+/**
+ * A remote Grouping handler configuration
+ *
+ * @author <a href="mailto:andy.taylor@jboss.org">Andy Taylor</a>
+ */
+public class GroupingHandlerConfiguration
+{
+ private final SimpleString name;
+
+ private final TYPE type;
+
+ private final SimpleString address;
+
+ private final int timeout;
+
+ public static final int DEFAULT_TIMEOUT = 5000;
+
+ public GroupingHandlerConfiguration(final SimpleString name, final TYPE type,
SimpleString address)
+ {
+ this(name, type, address, DEFAULT_TIMEOUT);
+ }
+
+ public GroupingHandlerConfiguration(final SimpleString name, final TYPE type,
SimpleString address, int timeout)
+ {
+ this.type = type;
+ this.name = name;
+ this.address = address;
+ this.timeout = timeout;
+ }
+
+ public SimpleString getName()
+ {
+ return name;
+ }
+
+ public TYPE getType()
+ {
+ return type;
+ }
+
+ public SimpleString getAddress()
+ {
+ return address;
+ }
+
+ public int getTimeout()
+ {
+ return timeout;
+ }
+
+ public enum TYPE
+ {
+ LOCAL("LOCAL"),
+ REMOTE("REMOTE");
+
+ private String type;
+
+ TYPE(String type)
+ {
+ this.type = type;
+ }
+
+ public String getType()
+ {
+ return type;
+ }
+ }
+}
Deleted: trunk/src/main/org/hornetq/core/server/group/impl/LocalGroupingHandler.java
===================================================================
---
branches/hornetq_grouping/src/main/org/hornetq/core/server/group/impl/LocalGroupingHandler.java 2009-10-22
17:42:57 UTC (rev 8139)
+++ trunk/src/main/org/hornetq/core/server/group/impl/LocalGroupingHandler.java 2009-10-28
18:32:19 UTC (rev 8158)
@@ -1,133 +0,0 @@
-/*
- * Copyright 2009 Red Hat, Inc.
- * Red Hat licenses this file to you under the Apache License, version
- * 2.0 (the "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
http://www.apache.org/licenses/LICENSE-2.0
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
- * implied. See the License for the specific language governing
- * permissions and limitations under the License.
- */
-package org.hornetq.core.server.group.impl;
-
-import org.hornetq.core.management.NotificationType;
-import org.hornetq.core.management.Notification;
-import org.hornetq.core.management.ManagementService;
-import org.hornetq.core.client.management.impl.ManagementHelper;
-import org.hornetq.core.postoffice.BindingType;
-import org.hornetq.core.logging.Logger;
-import org.hornetq.core.server.group.GroupingHandler;
-import org.hornetq.core.persistence.StorageManager;
-import org.hornetq.utils.SimpleString;
-import org.hornetq.utils.TypedProperties;
-
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.HashMap;
-
-/**
- * @author <a href="mailto:andy.taylor@jboss.org">Andy Taylor</a>
- */
-public class LocalGroupingHandler implements GroupingHandler
-{
- private static Logger log = Logger.getLogger(LocalGroupingHandler.class);
-
- private ConcurrentHashMap<SimpleString, GroupBinding> map = new
ConcurrentHashMap<SimpleString, GroupBinding>();
-
- private HashMap<SimpleString, GroupBinding> groupMap = new
HashMap<SimpleString, GroupBinding>();
-
- private final SimpleString name;
-
- private final ManagementService managementService;
-
- private SimpleString address;
- private StorageManager storageManager;
-
- public LocalGroupingHandler(final ManagementService managementService, final
SimpleString name, final SimpleString address, StorageManager storageManager)
- {
- this.managementService = managementService;
- this.name = name;
- this.address = address;
- this.storageManager = storageManager;
- }
-
- public SimpleString getName()
- {
- return name;
- }
-
-
- public Response propose(Proposal proposal) throws Exception
- {
- if(proposal.getClusterName() == null)
- {
- GroupBinding original = map.get(proposal.getGroupId());
- return original == null?null:new Response(proposal.getGroupId(),
original.getClusterName());
- }
- GroupBinding groupBinding = new GroupBinding(proposal.getGroupId(),
proposal.getClusterName());
- if (map.putIfAbsent(groupBinding.getGroupId(), groupBinding) == null)
- {
- groupBinding.setId(storageManager.generateUniqueID());
- groupMap.put(groupBinding.getClusterName(), groupBinding);
- storageManager.addGrouping(groupBinding);
- return new Response(groupBinding.getGroupId(), groupBinding.getClusterName());
- }
- else
- {
- groupBinding = map.get(proposal.getGroupId());
- return new Response(groupBinding.getGroupId(), proposal.getClusterName(),
groupBinding.getClusterName());
- }
- }
-
- public void proposed(Response response) throws Exception
- {
- }
-
- public void send(Response response, int distance) throws Exception
- {
- TypedProperties props = new TypedProperties();
- props.putStringProperty(ManagementHelper.HDR_PROPOSAL_TYPE,
response.getGroupId());
- props.putStringProperty(ManagementHelper.HDR_PROPOSAL_VALUE,
response.getClusterName());
- props.putStringProperty(ManagementHelper.HDR_PROPOSAL_ALT_VALUE,
response.getAlternativeClusterName());
- props.putIntProperty(ManagementHelper.HDR_BINDING_TYPE,
BindingType.LOCAL_QUEUE_INDEX);
- props.putStringProperty(ManagementHelper.HDR_ADDRESS, address);
- props.putIntProperty(ManagementHelper.HDR_DISTANCE, distance);
- Notification notification = new Notification(null,
NotificationType.PROPOSAL_RESPONSE, props);
- managementService.sendNotification(notification);
- }
-
- public Response receive(Proposal proposal, int distance) throws Exception
- {
- return propose(proposal);
- }
-
- public void addGroupBinding(GroupBinding groupBinding)
- {
- map.put(groupBinding.getGroupId(), groupBinding);
- groupMap.put(groupBinding.getClusterName(), groupBinding);
- }
-
- public void onNotification(Notification notification)
- {
- if(notification.getType() == NotificationType.BINDING_REMOVED)
- {
- SimpleString clusterName = (SimpleString)
notification.getProperties().getProperty(ManagementHelper.HDR_CLUSTER_NAME);
- GroupBinding val = groupMap.get(clusterName);
- if(val != null)
- {
- groupMap.remove(clusterName);
- map.remove(val.getGroupId());
- try
- {
- storageManager.deleteGrouping(val);
- }
- catch (Exception e)
- {
- log.warn("Unable to delete group binding info " +
val.getGroupId(), e);
- }
- }
- }
- }
-}
-
Copied: trunk/src/main/org/hornetq/core/server/group/impl/LocalGroupingHandler.java (from
rev 8139,
branches/hornetq_grouping/src/main/org/hornetq/core/server/group/impl/LocalGroupingHandler.java)
===================================================================
--- trunk/src/main/org/hornetq/core/server/group/impl/LocalGroupingHandler.java
(rev 0)
+++ trunk/src/main/org/hornetq/core/server/group/impl/LocalGroupingHandler.java 2009-10-28
18:32:19 UTC (rev 8158)
@@ -0,0 +1,182 @@
+/*
+ * Copyright 2009 Red Hat, Inc.
+ * Red Hat licenses this file to you under the Apache License, version
+ * 2.0 (the "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
http://www.apache.org/licenses/LICENSE-2.0
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+ * implied. See the License for the specific language governing
+ * permissions and limitations under the License.
+ */
+package org.hornetq.core.server.group.impl;
+
+import org.hornetq.core.management.NotificationType;
+import org.hornetq.core.management.Notification;
+import org.hornetq.core.management.ManagementService;
+import org.hornetq.core.client.management.impl.ManagementHelper;
+import org.hornetq.core.postoffice.BindingType;
+import org.hornetq.core.logging.Logger;
+import org.hornetq.core.server.group.GroupingHandler;
+import org.hornetq.core.persistence.StorageManager;
+import org.hornetq.utils.SimpleString;
+import org.hornetq.utils.TypedProperties;
+
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+import java.util.HashMap;
+import java.util.List;
+import java.util.ArrayList;
+
+/**
+ * A Local Grouping handler. All the Remote handlers will talk with us
+ *
+ * @author <a href="mailto:andy.taylor@jboss.org">Andy Taylor</a>
+ */
+public class LocalGroupingHandler implements GroupingHandler
+{
+ private static Logger log = Logger.getLogger(LocalGroupingHandler.class);
+
+ private ConcurrentHashMap<SimpleString, GroupBinding> map = new
ConcurrentHashMap<SimpleString, GroupBinding>();
+
+ private ConcurrentHashMap<SimpleString, List<GroupBinding>> groupMap = new
ConcurrentHashMap<SimpleString, List<GroupBinding>>();
+
+ private final SimpleString name;
+
+ private final ManagementService managementService;
+
+ private SimpleString address;
+ private StorageManager storageManager;
+ private int timeout;
+
+ public LocalGroupingHandler(final ManagementService managementService, final
SimpleString name, final SimpleString address, StorageManager storageManager, int
timeout)
+ {
+ this.managementService = managementService;
+ this.name = name;
+ this.address = address;
+ this.storageManager = storageManager;
+ this.timeout = timeout;
+ }
+
+ public SimpleString getName()
+ {
+ return name;
+ }
+
+
+ public Response propose(Proposal proposal) throws Exception
+ {
+ log.info("proposing proposal " + proposal);
+ if (proposal.getClusterName() == null)
+ {
+ GroupBinding original = map.get(proposal.getGroupId());
+ return original == null ? null : new Response(proposal.getGroupId(),
original.getClusterName());
+ }
+ GroupBinding groupBinding = new GroupBinding(proposal.getGroupId(),
proposal.getClusterName());
+ if (map.putIfAbsent(groupBinding.getGroupId(), groupBinding) == null)
+ {
+ groupBinding.setId(storageManager.generateUniqueID());
+ List<GroupBinding> newList = new ArrayList<GroupBinding>();
+ List<GroupBinding> oldList =
groupMap.putIfAbsent(groupBinding.getClusterName(), newList);
+ if(oldList != null)
+ {
+ newList = oldList;
+ }
+ newList.add(groupBinding);
+ storageManager.addGrouping(groupBinding);
+ if (storageManager.isReplicated())
+ {
+ final CountDownLatch latch = new CountDownLatch(1);
+ storageManager.afterReplicated(new Runnable()
+ {
+ public void run()
+ {
+ latch.countDown();
+ storageManager.completeReplication();
+ }
+ });
+ if (!latch.await(timeout, TimeUnit.MILLISECONDS))
+ {
+ throw new IllegalStateException("no response received from group
handler for " + proposal.getGroupId());
+ }
+ }
+ return new Response(groupBinding.getGroupId(), groupBinding.getClusterName());
+ }
+ else
+ {
+ groupBinding = map.get(proposal.getGroupId());
+ return new Response(groupBinding.getGroupId(), proposal.getClusterName(),
groupBinding.getClusterName());
+ }
+ }
+
+ public void proposed(Response response) throws Exception
+ {
+ }
+
+ public void send(Response response, int distance) throws Exception
+ {
+ TypedProperties props = new TypedProperties();
+ props.putStringProperty(ManagementHelper.HDR_PROPOSAL_GROUP_ID,
response.getGroupId());
+ props.putStringProperty(ManagementHelper.HDR_PROPOSAL_VALUE,
response.getClusterName());
+ props.putStringProperty(ManagementHelper.HDR_PROPOSAL_ALT_VALUE,
response.getAlternativeClusterName());
+ props.putIntProperty(ManagementHelper.HDR_BINDING_TYPE,
BindingType.LOCAL_QUEUE_INDEX);
+ props.putStringProperty(ManagementHelper.HDR_ADDRESS, address);
+ props.putIntProperty(ManagementHelper.HDR_DISTANCE, distance);
+ Notification notification = new Notification(null,
NotificationType.PROPOSAL_RESPONSE, props);
+ managementService.sendNotification(notification);
+ }
+
+ public Response receive(Proposal proposal, int distance) throws Exception
+ {
+ log.trace("received proposal " + proposal);
+ return propose(proposal);
+ }
+
+ public void addGroupBinding(GroupBinding groupBinding)
+ {
+ map.put(groupBinding.getGroupId(), groupBinding);
+ List<GroupBinding> newList = new ArrayList<GroupBinding>();
+ List<GroupBinding> oldList =
groupMap.putIfAbsent(groupBinding.getClusterName(), newList);
+ if(oldList != null)
+ {
+ newList = oldList;
+ }
+ newList.add(groupBinding);
+ }
+
+ public Response getProposal(SimpleString fullID)
+ {
+ GroupBinding original = map.get(fullID);
+ return original == null ? null : new Response(fullID, original.getClusterName());
+ }
+
+ public void onNotification(Notification notification)
+ {
+ if (notification.getType() == NotificationType.BINDING_REMOVED)
+ {
+ SimpleString clusterName = (SimpleString)
notification.getProperties().getProperty(ManagementHelper.HDR_CLUSTER_NAME);
+ List<GroupBinding> list = groupMap.remove(clusterName);
+ if (list != null)
+ {
+ for (GroupBinding val : list)
+ {
+ if (val != null)
+ {
+ map.remove(val.getGroupId());
+ try
+ {
+ storageManager.deleteGrouping(val);
+ }
+ catch (Exception e)
+ {
+ log.warn("Unable to delete group binding info " +
val.getGroupId(), e);
+ }
+ }
+ }
+ }
+ }
+ }
+}
+
Deleted: trunk/src/main/org/hornetq/core/server/group/impl/Proposal.java
===================================================================
---
branches/hornetq_grouping/src/main/org/hornetq/core/server/group/impl/Proposal.java 2009-10-22
17:42:57 UTC (rev 8139)
+++ trunk/src/main/org/hornetq/core/server/group/impl/Proposal.java 2009-10-28 18:32:19
UTC (rev 8158)
@@ -1,44 +0,0 @@
-/*
- * Copyright 2009 Red Hat, Inc.
- * Red Hat licenses this file to you under the Apache License, version
- * 2.0 (the "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
http://www.apache.org/licenses/LICENSE-2.0
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
- * implied. See the License for the specific language governing
- * permissions and limitations under the License.
- */
-package org.hornetq.core.server.group.impl;
-
-import org.hornetq.utils.SimpleString;
-
-/**
- * @author <a href="mailto:andy.taylor@jboss.org">Andy Taylor</a>
- */
-public class Proposal
-{
- private final SimpleString groupId;
- private final SimpleString clusterName;
-
- public static final String PROPOSAL_TYPE_HEADER = "_JBM_PROPOSAL_TYPE";
- public static final String PROPOSAL_HEADER = "_JBM_PROPOSAL";
-
- public Proposal(SimpleString groupId, SimpleString clusterName)
- {
- this.clusterName = clusterName;
- this.groupId = groupId;
- }
-
- public SimpleString getGroupId()
- {
- return groupId;
- }
-
- public SimpleString getClusterName()
- {
- return clusterName;
- }
-}
-
Copied: trunk/src/main/org/hornetq/core/server/group/impl/Proposal.java (from rev 8139,
branches/hornetq_grouping/src/main/org/hornetq/core/server/group/impl/Proposal.java)
===================================================================
--- trunk/src/main/org/hornetq/core/server/group/impl/Proposal.java
(rev 0)
+++ trunk/src/main/org/hornetq/core/server/group/impl/Proposal.java 2009-10-28 18:32:19
UTC (rev 8158)
@@ -0,0 +1,50 @@
+/*
+ * Copyright 2009 Red Hat, Inc.
+ * Red Hat licenses this file to you under the Apache License, version
+ * 2.0 (the "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
http://www.apache.org/licenses/LICENSE-2.0
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+ * implied. See the License for the specific language governing
+ * permissions and limitations under the License.
+ */
+package org.hornetq.core.server.group.impl;
+
+import org.hornetq.utils.SimpleString;
+
+/**
+ * A proposal to select a group id
+ *
+ * @author <a href="mailto:andy.taylor@jboss.org">Andy Taylor</a>
+ */
+public class Proposal
+{
+ private final SimpleString groupId;
+
+ private final SimpleString clusterName;
+
+ public Proposal(SimpleString groupId, SimpleString clusterName)
+ {
+ this.clusterName = clusterName;
+ this.groupId = groupId;
+ }
+
+ public SimpleString getGroupId()
+ {
+ return groupId;
+ }
+
+ public SimpleString getClusterName()
+ {
+ return clusterName;
+ }
+
+ @Override
+ public String toString()
+ {
+ return getGroupId() + ":" + clusterName;
+ }
+}
+
Deleted: trunk/src/main/org/hornetq/core/server/group/impl/RemoteGroupingHandler.java
===================================================================
---
branches/hornetq_grouping/src/main/org/hornetq/core/server/group/impl/RemoteGroupingHandler.java 2009-10-22
17:42:57 UTC (rev 8139)
+++
trunk/src/main/org/hornetq/core/server/group/impl/RemoteGroupingHandler.java 2009-10-28
18:32:19 UTC (rev 8158)
@@ -1,155 +0,0 @@
-/*
- * Copyright 2009 Red Hat, Inc.
- * Red Hat licenses this file to you under the Apache License, version
- * 2.0 (the "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
http://www.apache.org/licenses/LICENSE-2.0
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
- * implied. See the License for the specific language governing
- * permissions and limitations under the License.
- */
-package org.hornetq.core.server.group.impl;
-
-import org.hornetq.core.management.NotificationType;
-import org.hornetq.core.management.Notification;
-import org.hornetq.core.management.ManagementService;
-import org.hornetq.core.client.management.impl.ManagementHelper;
-import org.hornetq.core.postoffice.BindingType;
-import org.hornetq.core.server.group.GroupingHandler;
-import org.hornetq.utils.SimpleString;
-import org.hornetq.utils.TypedProperties;
-
-import java.util.logging.Logger;
-import java.util.Map;
-import java.util.HashMap;
-import java.util.concurrent.locks.Lock;
-import java.util.concurrent.locks.ReentrantLock;
-import java.util.concurrent.locks.Condition;
-import java.util.concurrent.TimeUnit;
-
-/**
- * @author <a href="mailto:andy.taylor@jboss.org">Andy Taylor</a>
- */
-public class RemoteGroupingHandler implements GroupingHandler
-{
- private static Logger log = Logger.getLogger(RemoteGroupingHandler.class.getName());
-
- private final SimpleString name;
-
- private final ManagementService managementService;
-
- private final SimpleString address;
-
- private Map<SimpleString, Response> responses = new HashMap<SimpleString,
Response>();
-
- private final Lock lock = new ReentrantLock();
-
- private final Condition sendCondition = lock.newCondition();
-
- private final int timeout;
-
- private HashMap<SimpleString, SimpleString> groupMap = new
HashMap<SimpleString, SimpleString>();
-
- public RemoteGroupingHandler(final ManagementService managementService, final
SimpleString name, final SimpleString address, int timeout)
- {
- this.name = name;
- this.address = address;
- this.managementService = managementService;
- this.timeout = timeout;
- }
-
- public SimpleString getName()
- {
- return name;
- }
-
- public Response propose(final Proposal proposal) throws Exception
- {
- Response response = responses.get(proposal.getGroupId());
- if( response != null)
- {
- return response;
- }
- if (proposal.getClusterName() == null)
- {
- return null;
- }
- try
- {
- lock.lock();
- TypedProperties props = new TypedProperties();
- props.putStringProperty(ManagementHelper.HDR_PROPOSAL_TYPE,
proposal.getGroupId());
- props.putStringProperty(ManagementHelper.HDR_PROPOSAL_VALUE,
proposal.getClusterName());
- props.putIntProperty(ManagementHelper.HDR_BINDING_TYPE,
BindingType.LOCAL_QUEUE_INDEX);
- props.putStringProperty(ManagementHelper.HDR_ADDRESS, address);
- props.putIntProperty(ManagementHelper.HDR_DISTANCE, 0);
- Notification notification = new Notification(null, NotificationType.PROPOSAL,
props);
- managementService.sendNotification(notification);
- sendCondition.await(timeout, TimeUnit.MILLISECONDS);
- response = responses.get(proposal.getGroupId());
- }
- finally
- {
- lock.unlock();
- }
- if(response == null)
- {
- throw new IllegalStateException("no response received from group handler
for " + proposal.getGroupId());
- }
- return response;
- }
-
- public void proposed(Response response) throws Exception
- {
- try
- {
- lock.lock();
- responses.put(response.getGroupId(), response);
- groupMap.put(response.getChosenClusterName(), response.getGroupId());
- sendCondition.signal();
- }
- finally
- {
- lock.unlock();
- }
- }
-
- public Response receive(Proposal proposal, int distance) throws Exception
- {
- TypedProperties props = new TypedProperties();
- props.putStringProperty(ManagementHelper.HDR_PROPOSAL_TYPE,
proposal.getGroupId());
- props.putStringProperty(ManagementHelper.HDR_PROPOSAL_VALUE,
proposal.getClusterName());
- props.putIntProperty(ManagementHelper.HDR_BINDING_TYPE,
BindingType.LOCAL_QUEUE_INDEX);
- props.putStringProperty(ManagementHelper.HDR_ADDRESS, address);
- props.putIntProperty(ManagementHelper.HDR_DISTANCE, distance);
- Notification notification = new Notification(null, NotificationType.PROPOSAL,
props);
- managementService.sendNotification(notification);
- return null;
- }
-
- public void send(Response response, int distance) throws Exception
- {
- }
-
- public void addGroupBinding(GroupBinding groupBinding)
- {
-
- }
-
- public void onNotification(Notification notification)
- {
- if(notification.getType() == NotificationType.BINDING_REMOVED)
- {
- SimpleString clusterName = (SimpleString)
notification.getProperties().getProperty(ManagementHelper.HDR_CLUSTER_NAME);
- SimpleString val = groupMap.get(clusterName);
- if(val != null)
- {
- groupMap.remove(clusterName);
- responses.remove(val);
- }
- }
- }
-}
-
Copied: trunk/src/main/org/hornetq/core/server/group/impl/RemoteGroupingHandler.java (from
rev 8139,
branches/hornetq_grouping/src/main/org/hornetq/core/server/group/impl/RemoteGroupingHandler.java)
===================================================================
--- trunk/src/main/org/hornetq/core/server/group/impl/RemoteGroupingHandler.java
(rev 0)
+++
trunk/src/main/org/hornetq/core/server/group/impl/RemoteGroupingHandler.java 2009-10-28
18:32:19 UTC (rev 8158)
@@ -0,0 +1,192 @@
+/*
+ * Copyright 2009 Red Hat, Inc.
+ * Red Hat licenses this file to you under the Apache License, version
+ * 2.0 (the "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
http://www.apache.org/licenses/LICENSE-2.0
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+ * implied. See the License for the specific language governing
+ * permissions and limitations under the License.
+ */
+package org.hornetq.core.server.group.impl;
+
+import org.hornetq.core.management.NotificationType;
+import org.hornetq.core.management.Notification;
+import org.hornetq.core.management.ManagementService;
+import org.hornetq.core.client.management.impl.ManagementHelper;
+import org.hornetq.core.postoffice.BindingType;
+import org.hornetq.core.server.group.GroupingHandler;
+import org.hornetq.utils.SimpleString;
+import org.hornetq.utils.TypedProperties;
+
+import java.util.logging.Logger;
+import java.util.Map;
+import java.util.HashMap;
+import java.util.List;
+import java.util.ArrayList;
+import java.util.concurrent.locks.Lock;
+import java.util.concurrent.locks.ReentrantLock;
+import java.util.concurrent.locks.Condition;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.ConcurrentHashMap;
+
+/**
+ * A remote Grouping handler. This will use management notifications to communicate with
the node that has the Local
+ * Grouping handler to make proposals.
+ *
+ * @author <a href="mailto:andy.taylor@jboss.org">Andy Taylor</a>
+ */
+public class RemoteGroupingHandler implements GroupingHandler
+{
+ private static Logger log = Logger.getLogger(RemoteGroupingHandler.class.getName());
+
+ private final SimpleString name;
+
+ private final ManagementService managementService;
+
+ private final SimpleString address;
+
+ private Map<SimpleString, Response> responses = new HashMap<SimpleString,
Response>();
+
+ private final Lock lock = new ReentrantLock();
+
+ private final Condition sendCondition = lock.newCondition();
+
+ private final int timeout;
+
+ private ConcurrentHashMap<SimpleString, List<SimpleString>> groupMap = new
ConcurrentHashMap<SimpleString, List<SimpleString>>();
+
+ public RemoteGroupingHandler(final ManagementService managementService, final
SimpleString name, final SimpleString address, int timeout)
+ {
+ this.name = name;
+ this.address = address;
+ this.managementService = managementService;
+ this.timeout = timeout;
+ }
+
+ public SimpleString getName()
+ {
+ return name;
+ }
+
+ public Response propose(final Proposal proposal) throws Exception
+ {
+ //sanity check in case it is already selected
+ Response response = responses.get(proposal.getGroupId());
+ if( response != null)
+ {
+ return response;
+ }
+
+ try
+ {
+ lock.lock();
+
+ TypedProperties props = new TypedProperties();
+
+ props.putStringProperty(ManagementHelper.HDR_PROPOSAL_GROUP_ID,
proposal.getGroupId());
+
+ props.putStringProperty(ManagementHelper.HDR_PROPOSAL_VALUE,
proposal.getClusterName());
+
+ props.putIntProperty(ManagementHelper.HDR_BINDING_TYPE,
BindingType.LOCAL_QUEUE_INDEX);
+
+ props.putStringProperty(ManagementHelper.HDR_ADDRESS, address);
+
+ props.putIntProperty(ManagementHelper.HDR_DISTANCE, 0);
+
+ Notification notification = new Notification(null, NotificationType.PROPOSAL,
props);
+
+ log.info("sending proposal " + proposal);
+
+ managementService.sendNotification(notification);
+
+ sendCondition.await(timeout, TimeUnit.MILLISECONDS);
+
+ response = responses.get(proposal.getGroupId());
+
+ }
+ finally
+ {
+ lock.unlock();
+ }
+ if(response == null)
+ {
+ throw new IllegalStateException("no response received from group handler
for " + proposal.getGroupId());
+ }
+ return response;
+ }
+
+ public Response getProposal(SimpleString fullID)
+ {
+ return responses.get(fullID);
+ }
+
+ public void proposed(Response response) throws Exception
+ {
+ try
+ {
+ lock.lock();
+ responses.put(response.getGroupId(), response);
+ List<SimpleString> newList = new ArrayList<SimpleString>();
+ List<SimpleString> oldList =
groupMap.putIfAbsent(response.getChosenClusterName(), newList);
+ if(oldList != null)
+ {
+ newList = oldList;
+ }
+ newList.add(response.getGroupId());
+ sendCondition.signal();
+ }
+ finally
+ {
+ lock.unlock();
+ }
+ }
+
+ public Response receive(Proposal proposal, int distance) throws Exception
+ {
+ TypedProperties props = new TypedProperties();
+ props.putStringProperty(ManagementHelper.HDR_PROPOSAL_GROUP_ID,
proposal.getGroupId());
+ props.putStringProperty(ManagementHelper.HDR_PROPOSAL_VALUE,
proposal.getClusterName());
+ props.putIntProperty(ManagementHelper.HDR_BINDING_TYPE,
BindingType.LOCAL_QUEUE_INDEX);
+ props.putStringProperty(ManagementHelper.HDR_ADDRESS, address);
+ props.putIntProperty(ManagementHelper.HDR_DISTANCE, distance);
+ Notification notification = new Notification(null, NotificationType.PROPOSAL,
props);
+ managementService.sendNotification(notification);
+ return null;
+ }
+
+ public void send(Response response, int distance) throws Exception
+ {
+ //NO-OP
+ }
+
+ public void addGroupBinding(GroupBinding groupBinding)
+ {
+ //NO-OP
+ }
+
+ public void onNotification(Notification notification)
+ {
+ //removing the groupid if the binding has been removed
+ if(notification.getType() == NotificationType.BINDING_REMOVED)
+ {
+ SimpleString clusterName = (SimpleString)
notification.getProperties().getProperty(ManagementHelper.HDR_CLUSTER_NAME);
+ groupMap.remove(clusterName);
+ List<SimpleString> list = groupMap.remove(clusterName);
+ if (list != null)
+ {
+ for (SimpleString val : list)
+ {
+ if(val != null)
+ {
+ responses.remove(val);
+ }
+ }
+ }
+
+ }
+ }
+}
+
Deleted: trunk/src/main/org/hornetq/core/server/group/impl/Response.java
===================================================================
---
branches/hornetq_grouping/src/main/org/hornetq/core/server/group/impl/Response.java 2009-10-22
17:42:57 UTC (rev 8139)
+++ trunk/src/main/org/hornetq/core/server/group/impl/Response.java 2009-10-28 18:32:19
UTC (rev 8158)
@@ -1,73 +0,0 @@
-/*
- * Copyright 2009 Red Hat, Inc.
- * Red Hat licenses this file to you under the Apache License, version
- * 2.0 (the "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
http://www.apache.org/licenses/LICENSE-2.0
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
- * implied. See the License for the specific language governing
- * permissions and limitations under the License.
- */
-package org.hornetq.core.server.group.impl;
-
-import org.hornetq.utils.SimpleString;
-
-/**
- * @author <a href="mailto:andy.taylor@jboss.org">Andy Taylor</a>
- */
-public class Response
-{
- private final boolean accepted;
-
- private final SimpleString clusterName;
-
- private final SimpleString alternativeClusterName;
-
- private SimpleString groupId;
-
- public Response(SimpleString groupId, SimpleString clusterName)
- {
- this(groupId, clusterName, null);
- }
-
- public Response(SimpleString groupId, SimpleString clusterName, SimpleString
alternativeClusterName)
- {
- this.groupId = groupId;
- this.accepted = alternativeClusterName == null;
- this.clusterName = clusterName;
- this.alternativeClusterName = alternativeClusterName;
- }
-
- public boolean isAccepted()
- {
- return accepted;
- }
-
- public SimpleString getClusterName()
- {
- return clusterName;
- }
-
- public SimpleString getAlternativeClusterName()
- {
- return alternativeClusterName;
- }
-
- public SimpleString getChosenClusterName()
- {
- return alternativeClusterName != null? alternativeClusterName : clusterName;
- }
-
- @Override
- public String toString()
- {
- return "accepted = " + accepted + " clusterName = " +
clusterName + " alternativeClusterName = " + alternativeClusterName;
- }
-
- public SimpleString getGroupId()
- {
- return groupId;
- }
-}
Copied: trunk/src/main/org/hornetq/core/server/group/impl/Response.java (from rev 8139,
branches/hornetq_grouping/src/main/org/hornetq/core/server/group/impl/Response.java)
===================================================================
--- trunk/src/main/org/hornetq/core/server/group/impl/Response.java
(rev 0)
+++ trunk/src/main/org/hornetq/core/server/group/impl/Response.java 2009-10-28 18:32:19
UTC (rev 8158)
@@ -0,0 +1,75 @@
+/*
+ * Copyright 2009 Red Hat, Inc.
+ * Red Hat licenses this file to you under the Apache License, version
+ * 2.0 (the "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
http://www.apache.org/licenses/LICENSE-2.0
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+ * implied. See the License for the specific language governing
+ * permissions and limitations under the License.
+ */
+package org.hornetq.core.server.group.impl;
+
+import org.hornetq.utils.SimpleString;
+
+/**
+ * A response to a proposal
+ *
+ * @author <a href="mailto:andy.taylor@jboss.org">Andy Taylor</a>
+ */
+public class Response
+{
+ private final boolean accepted;
+
+ private final SimpleString clusterName;
+
+ private final SimpleString alternativeClusterName;
+
+ private SimpleString groupId;
+
+ public Response(SimpleString groupId, SimpleString clusterName)
+ {
+ this(groupId, clusterName, null);
+ }
+
+ public Response(SimpleString groupId, SimpleString clusterName, SimpleString
alternativeClusterName)
+ {
+ this.groupId = groupId;
+ this.accepted = alternativeClusterName == null;
+ this.clusterName = clusterName;
+ this.alternativeClusterName = alternativeClusterName;
+ }
+
+ public boolean isAccepted()
+ {
+ return accepted;
+ }
+
+ public SimpleString getClusterName()
+ {
+ return clusterName;
+ }
+
+ public SimpleString getAlternativeClusterName()
+ {
+ return alternativeClusterName;
+ }
+
+ public SimpleString getChosenClusterName()
+ {
+ return alternativeClusterName != null? alternativeClusterName : clusterName;
+ }
+
+ @Override
+ public String toString()
+ {
+ return "accepted = " + accepted + " clusterName = " +
clusterName + " alternativeClusterName = " + alternativeClusterName;
+ }
+
+ public SimpleString getGroupId()
+ {
+ return groupId;
+ }
+}
Modified: trunk/src/main/org/hornetq/core/server/impl/HornetQServerImpl.java
===================================================================
--- trunk/src/main/org/hornetq/core/server/impl/HornetQServerImpl.java 2009-10-28 16:13:10
UTC (rev 8157)
+++ trunk/src/main/org/hornetq/core/server/impl/HornetQServerImpl.java 2009-10-28 18:32:19
UTC (rev 8158)
@@ -61,6 +61,7 @@
import org.hornetq.core.paging.impl.PagingStoreFactoryNIO;
import org.hornetq.core.persistence.QueueBindingInfo;
import org.hornetq.core.persistence.StorageManager;
+import org.hornetq.core.persistence.GroupingInfo;
import org.hornetq.core.persistence.impl.journal.JournalStorageManager;
import org.hornetq.core.persistence.impl.nullpm.NullStorageManager;
import org.hornetq.core.postoffice.Binding;
@@ -91,6 +92,11 @@
import org.hornetq.core.server.Queue;
import org.hornetq.core.server.QueueFactory;
import org.hornetq.core.server.ServerSession;
+import org.hornetq.core.server.group.GroupingHandler;
+import org.hornetq.core.server.group.impl.GroupingHandlerConfiguration;
+import org.hornetq.core.server.group.impl.LocalGroupingHandler;
+import org.hornetq.core.server.group.impl.RemoteGroupingHandler;
+import org.hornetq.core.server.group.impl.GroupBinding;
import org.hornetq.core.server.cluster.ClusterManager;
import org.hornetq.core.server.cluster.Transformer;
import org.hornetq.core.server.cluster.impl.ClusterManagerImpl;
@@ -199,6 +205,8 @@
private final Set<ActivateCallback> activateCallbacks = new
HashSet<ActivateCallback>();
+ private GroupingHandler groupingHandler;
+
// Constructors
// ---------------------------------------------------------------------------------
@@ -267,11 +275,11 @@
{
return;
}
-
+
if (configuration.isRunSyncSpeedTest())
{
SyncSpeedTest test = new SyncSpeedTest();
-
+
test.run();
}
@@ -321,6 +329,11 @@
clusterManager.stop();
}
+ if(groupingHandler != null)
+ {
+ managementService.removeNotificationListener(groupingHandler);
+ groupingHandler = null;
+ }
// Need to flush all sessions to make sure all confirmations get sent back to
client
for (ServerSession session : sessions.values())
@@ -763,6 +776,16 @@
return executorFactory;
}
+ public void setGroupingHandler(GroupingHandler groupingHandler)
+ {
+ this.groupingHandler = groupingHandler;
+ }
+
+ public GroupingHandler getGroupingHandler()
+ {
+ return groupingHandler;
+ }
+
// Public
//
---------------------------------------------------------------------------------------
@@ -828,7 +851,7 @@
// Private
//
--------------------------------------------------------------------------------------
-
+
private boolean startReplication() throws Exception
{
String backupConnectorName = configuration.getBackupConnectorName();
@@ -964,7 +987,8 @@
resourceManager = new
ResourceManagerImpl((int)(configuration.getTransactionTimeout() / 1000),
configuration.getTransactionTimeoutScanPeriod(),
scheduledPool);
- postOffice = new PostOfficeImpl(storageManager,
+ postOffice = new PostOfficeImpl(this,
+ storageManager,
pagingManager,
queueFactory,
managementService,
@@ -1027,7 +1051,7 @@
securityDeployer.start();
}
}
-
+
deployGroupingHandlerConfiguration(configuration.getGroupingHandlerConfiguration());
// Load the journal and populate queues, transactions and caches in memory
loadJournal();
@@ -1106,8 +1130,10 @@
{
List<QueueBindingInfo> queueBindingInfos = new
ArrayList<QueueBindingInfo>();
- storageManager.loadBindingJournal(queueBindingInfos);
+ List<GroupingInfo> groupingInfos = new ArrayList<GroupingInfo>();
+ storageManager.loadBindingJournal(queueBindingInfos, groupingInfos);
+
// Set the node id - must be before we load the queues into the postoffice, but
after we load the journal
setNodeID();
@@ -1134,6 +1160,14 @@
managementService.registerQueue(queue, queueBindingInfo.getAddress(),
storageManager);
}
+ for (GroupingInfo groupingInfo : groupingInfos)
+ {
+ if(groupingHandler != null)
+ {
+ groupingHandler.addGroupBinding(new GroupBinding(groupingInfo.getId(),
groupingInfo.getGroupId(), groupingInfo.getClusterName()));
+ }
+ }
+
Map<SimpleString, List<Pair<byte[], Long>>> duplicateIDMap = new
HashMap<SimpleString, List<Pair<byte[], Long>>>();
storageManager.loadMessageJournal(postOffice, pagingManager, resourceManager,
queues, duplicateIDMap);
@@ -1290,6 +1324,27 @@
}
}
+ private synchronized void deployGroupingHandlerConfiguration(final
GroupingHandlerConfiguration config) throws Exception
+ {
+ if (config != null)
+ {
+ GroupingHandler groupingHandler;
+ if (config.getType() == GroupingHandlerConfiguration.TYPE.LOCAL)
+ {
+ groupingHandler = new LocalGroupingHandler(managementService,
config.getName(), config.getAddress(),
+ getStorageManager(),
+ config.getTimeout());
+ }
+ else
+ {
+ groupingHandler = new RemoteGroupingHandler(managementService,
config.getName(), config.getAddress(), config.getTimeout());
+ }
+ log.info("deploying grouping handler: " + groupingHandler);
+ this.groupingHandler = groupingHandler;
+ managementService.addNotificationListener(groupingHandler);
+ }
+ }
+
private Transformer instantiateTransformer(final String transformerClassName)
{
Transformer transformer = null;
Modified: trunk/tests/hornetq-tests.iml
===================================================================
--- trunk/tests/hornetq-tests.iml 2009-10-28 16:13:10 UTC (rev 8157)
+++ trunk/tests/hornetq-tests.iml 2009-10-28 18:32:19 UTC (rev 8158)
@@ -1,8 +1,6 @@
<?xml version="1.0" encoding="UTF-8"?>
<module relativePaths="true" type="JAVA_MODULE"
version="4">
- <component name="NewModuleRootManager"
inherit-compiler-output="false">
- <output url="file://$MODULE_DIR$/output/classes" />
- <output-test url="file://$MODULE_DIR$/output" />
+ <component name="NewModuleRootManager"
inherit-compiler-output="true">
<exclude-output />
<content url="file://$MODULE_DIR$">
<sourceFolder url="file://$MODULE_DIR$/src"
isTestSource="true" />
@@ -14,6 +12,7 @@
</content>
<orderEntry type="inheritedJdk" />
<orderEntry type="sourceFolder" forTests="false" />
+ <orderEntry type="library" name="etc"
level="project" />
<orderEntry type="module" module-name="hornetq" />
<orderEntry type="library" name="messaging_jars"
level="project" />
<orderEntry type="library" name="messaging-tests"
level="project" />
Modified: trunk/tests/jms-tests/hornetq-jms-tests.iml
===================================================================
--- trunk/tests/jms-tests/hornetq-jms-tests.iml 2009-10-28 16:13:10 UTC (rev 8157)
+++ trunk/tests/jms-tests/hornetq-jms-tests.iml 2009-10-28 18:32:19 UTC (rev 8158)
@@ -1,8 +1,6 @@
<?xml version="1.0" encoding="UTF-8"?>
<module relativePaths="true" type="JAVA_MODULE"
version="4">
- <component name="NewModuleRootManager"
inherit-compiler-output="false">
- <output url="file://$MODULE_DIR$/output/classes" />
- <output-test url="file://$MODULE_DIR$/output" />
+ <component name="NewModuleRootManager"
inherit-compiler-output="true">
<exclude-output />
<content url="file://$MODULE_DIR$">
<sourceFolder url="file://$MODULE_DIR$/src"
isTestSource="true" />
Modified: trunk/tests/joram-tests/hornetq-joram-tests.iml
===================================================================
--- trunk/tests/joram-tests/hornetq-joram-tests.iml 2009-10-28 16:13:10 UTC (rev 8157)
+++ trunk/tests/joram-tests/hornetq-joram-tests.iml 2009-10-28 18:32:19 UTC (rev 8158)
@@ -1,8 +1,6 @@
<?xml version="1.0" encoding="UTF-8"?>
<module relativePaths="true" type="JAVA_MODULE"
version="4">
- <component name="NewModuleRootManager"
inherit-compiler-output="false">
- <output url="file://$MODULE_DIR$/output/classes" />
- <output-test url="file://$MODULE_DIR$/output" />
+ <component name="NewModuleRootManager"
inherit-compiler-output="true">
<exclude-output />
<content url="file://$MODULE_DIR$">
<sourceFolder url="file://$MODULE_DIR$/src"
isTestSource="true" />
Modified:
trunk/tests/src/org/hornetq/tests/integration/cluster/distribution/ClusterTestBase.java
===================================================================
---
trunk/tests/src/org/hornetq/tests/integration/cluster/distribution/ClusterTestBase.java 2009-10-28
16:13:10 UTC (rev 8157)
+++
trunk/tests/src/org/hornetq/tests/integration/cluster/distribution/ClusterTestBase.java 2009-10-28
18:32:19 UTC (rev 8158)
@@ -45,8 +45,11 @@
import org.hornetq.core.server.HornetQ;
import org.hornetq.core.server.HornetQServer;
import org.hornetq.core.server.JournalType;
+import org.hornetq.core.server.group.impl.GroupingHandlerConfiguration;
+import org.hornetq.core.server.group.GroupingHandler;
import org.hornetq.core.server.cluster.ClusterConnection;
import org.hornetq.core.server.cluster.RemoteQueueBinding;
+import org.hornetq.core.message.impl.MessageImpl;
import org.hornetq.integration.transports.netty.TransportConstants;
import org.hornetq.tests.util.ServiceTestBase;
import org.hornetq.utils.Pair;
@@ -56,7 +59,7 @@
* A ClusterTestBase
*
* @author <a href="mailto:tim.fox@jboss.com">Tim Fox</a>
- *
+ *
* Created 30 Jan 2009 11:29:43
*
*
@@ -85,21 +88,21 @@
super.setUp();
checkFreePort(PORTS);
-
+
clearData();
}
-
+
@Override
protected void tearDown() throws Exception
{
checkFreePort(PORTS);
-
+
servers = null;
sfs = null;
-
+
consumers = null;
-
+
consumers = new ConsumerHolder[MAX_CONSUMERS];
super.tearDown();
@@ -131,7 +134,7 @@
private static final int MAX_SERVERS = 10;
- private HornetQServer[] servers = new HornetQServer[MAX_SERVERS];
+ protected HornetQServer[] servers = new HornetQServer[MAX_SERVERS];
private ClientSessionFactory[] sfs = new ClientSessionFactory[MAX_SERVERS];
@@ -169,10 +172,29 @@
//System.out.println(threadDump(" - fired by
ClusterTestBase::waitForBindings"));
throw new IllegalStateException("Timed out waiting for messages (messageCount
= " + messageCount +
- ", expecting = " +
- count);
+ ", expecting = " +
+ count);
}
+ protected void waitForServerRestart(int node) throws Exception
+ {
+ long start = System.currentTimeMillis();
+ do
+ {
+ if(servers[node].isInitialised())
+ {
+ return;
+ }
+ Thread.sleep(100);
+ }
+ while (System.currentTimeMillis() - start < WAIT_TIMEOUT);
+ String msg = "Timed out waiting for server starting = " + node;
+
+ log.error(msg);
+
+ throw new IllegalStateException(msg);
+ }
+
protected void waitForBindings(int node,
final String address,
final int count,
@@ -215,7 +237,7 @@
{
if ((binding instanceof LocalQueueBinding && local) || (binding
instanceof RemoteQueueBinding && !local))
{
- QueueBinding qBinding = (QueueBinding)binding;
+ QueueBinding qBinding = (QueueBinding) binding;
bindingCount++;
@@ -238,8 +260,8 @@
// System.out.println(threadDump(" - fired by
ClusterTestBase::waitForBindings"));
String msg = "Timed out waiting for bindings (bindingCount = " +
bindingCount +
- ", totConsumers = " +
- totConsumers;
+ ", totConsumers = " +
+ totConsumers;
log.error(msg);
@@ -416,6 +438,52 @@
session.close();
}
+ protected void sendWithProperty(int node, String address, int numMessages, boolean
durable, SimpleString key, SimpleString val) throws Exception
+ {
+ sendInRange(node, address, 0, numMessages, durable, key, val);
+ }
+
+ protected void sendInRange(int node, String address, int msgStart, int msgEnd, boolean
durable, SimpleString key, SimpleString val) throws Exception
+ {
+ ClientSessionFactory sf = this.sfs[node];
+
+ if (sf == null)
+ {
+ throw new IllegalArgumentException("No sf at " + node);
+ }
+
+ ClientSession session = sf.createSession(false, true, true);
+
+ ClientProducer producer = session.createProducer(address);
+
+ for (int i = msgStart; i < msgEnd; i++)
+ {
+ ClientMessage message = session.createClientMessage(durable);
+
+ message.putStringProperty(key, val);
+ message.putIntProperty(COUNT_PROP, i);
+ producer.send(message);
+ }
+
+ session.close();
+ }
+
+ protected void setUpGroupHandler(GroupingHandlerConfiguration.TYPE type, int node)
+ {
+ setUpGroupHandler(type, node, 5000);
+ }
+
+ protected void setUpGroupHandler(GroupingHandlerConfiguration.TYPE type, int node, int
timeout)
+ {
+ this.servers[node].getConfiguration().setGroupingHandlerConfiguration(
+ new GroupingHandlerConfiguration(new
SimpleString("grouparbitrator"), type, new SimpleString("queues"),
timeout));
+ }
+
+ protected void setUpGroupHandler(GroupingHandler groupingHandler, int node)
+ {
+ this.servers[node].setGroupingHandler(groupingHandler);
+ }
+
protected void send(int node, String address, int numMessages, boolean durable, String
filterVal) throws Exception
{
sendInRange(node, address, 0, numMessages, durable, filterVal);
@@ -431,6 +499,117 @@
verifyReceiveAllInRangeNotBefore(false, -1, msgStart, msgEnd, consumerIDs);
}
+ protected void verifyReceiveAllWithGroupIDRoundRobin(
+ int msgStart,
+ int msgEnd,
+ int... consumerIDs) throws Exception
+ {
+ verifyReceiveAllWithGroupIDRoundRobin(true, -1, msgStart, msgEnd, consumerIDs);
+ }
+
+ protected int verifyReceiveAllOnSingleConsumer(int msgStart,
+ int msgEnd,
+ int... consumerIDs) throws Exception
+ {
+ return verifyReceiveAllOnSingleConsumer(true, msgStart, msgEnd, consumerIDs);
+ }
+
+ protected void verifyReceiveAllWithGroupIDRoundRobin(boolean ack,
+ long firstReceiveTime,
+ int msgStart,
+ int msgEnd,
+ int... consumerIDs) throws
Exception
+ {
+ HashMap<SimpleString, Integer> groupIdsReceived = new
HashMap<SimpleString, Integer>();
+ for (int i = 0; i < consumerIDs.length; i++)
+ {
+ ConsumerHolder holder = consumers[consumerIDs[i]];
+
+ if (holder == null)
+ {
+ throw new IllegalArgumentException("No consumer at " +
consumerIDs[i]);
+ }
+
+ for (int j = msgStart; j < msgEnd; j++)
+ {
+ ClientMessage message = holder.consumer.receive(2000);
+
+ if (message == null)
+ {
+ log.info("*** dumping consumers:");
+
+ dumpConsumers();
+
+ assertNotNull("consumer " + consumerIDs[i] + " did not
receive message " + j, message);
+ }
+
+ if (ack)
+ {
+ message.acknowledge();
+ }
+
+ if (firstReceiveTime != -1)
+ {
+ assertTrue("Message received too soon",
System.currentTimeMillis() >= firstReceiveTime);
+ }
+
+ SimpleString id = (SimpleString)
message.getProperty(MessageImpl.HDR_GROUP_ID);
+ System.out.println("received " + id + " on consumer " +
consumerIDs[i]);
+ if (groupIdsReceived.get(id) == null)
+ {
+ groupIdsReceived.put(id, i);
+ }
+ else if (groupIdsReceived.get(id) != i)
+ {
+ fail("consumer " + groupIdsReceived.get(id) + " already
bound to groupid " + id + " received on consumer " + i);
+ }
+
+ }
+
+ }
+
+
+ }
+
+ protected int verifyReceiveAllOnSingleConsumer(boolean ack,
+ int msgStart,
+ int msgEnd,
+ int... consumerIDs) throws Exception
+ {
+ int groupIdsReceived = -1;
+ for (int i = 0; i < consumerIDs.length; i++)
+ {
+ ConsumerHolder holder = consumers[consumerIDs[i]];
+
+ if (holder == null)
+ {
+ throw new IllegalArgumentException("No consumer at " +
consumerIDs[i]);
+ }
+ ClientMessage message = holder.consumer.receive(2000);
+ if (message != null)
+ {
+ groupIdsReceived = i;
+ for (int j = msgStart + 1; j < msgEnd; j++)
+ {
+ message = holder.consumer.receive(2000);
+
+ if (message == null)
+ {
+ fail("consumer " + i + " did not receive all
messages");
+ }
+
+ if (ack)
+ {
+ message.acknowledge();
+ }
+ }
+ }
+
+ }
+ return groupIdsReceived;
+
+ }
+
protected void verifyReceiveAllInRangeNotBefore(boolean ack,
long firstReceiveTime,
int msgStart,
@@ -459,7 +638,7 @@
assertNotNull("consumer " + consumerIDs[i] + " did not
receive message " + j, message);
}
-
+
if (ack)
{
message.acknowledge();
@@ -470,7 +649,7 @@
assertTrue("Message received too soon",
System.currentTimeMillis() >= firstReceiveTime);
}
- if (j != (Integer)(message.getProperty(COUNT_PROP)))
+ if (j != (Integer) (message.getProperty(COUNT_PROP)))
{
outOfOrder = true;
System.out.println("Message j=" + j + " was received out of
order = " + message.getProperty(COUNT_PROP));
@@ -528,8 +707,8 @@
if (message != null)
{
log.info("check receive Consumer " + consumerIDs[i] +
- " received message " +
- message.getProperty(COUNT_PROP));
+ " received message " +
+ message.getProperty(COUNT_PROP));
}
else
{
@@ -600,7 +779,7 @@
if (message != null)
{
- int count = (Integer)message.getProperty(COUNT_PROP);
+ int count = (Integer) message.getProperty(COUNT_PROP);
Integer prevCount = countMap.get(i);
@@ -624,7 +803,7 @@
}
else
{
- // log.info("consumer " + consumerIDs[i] +" returns
null");
+ // log.info("consumer " + consumerIDs[i] +" returns
null");
}
}
while (message != null);
@@ -662,7 +841,7 @@
if (message != null)
{
- int count = (Integer)message.getProperty(COUNT_PROP);
+ int count = (Integer) message.getProperty(COUNT_PROP);
// log.info("consumer " + consumerIDs[i] + " received
message " + count);
@@ -710,7 +889,7 @@
assertNotNull(list);
- int elem = (Integer)list.poll();
+ int elem = (Integer) list.poll();
assertEquals(messageCounts[i], elem);
@@ -750,7 +929,7 @@
if (message != null)
{
- int count = (Integer)message.getProperty(COUNT_PROP);
+ int count = (Integer) message.getProperty(COUNT_PROP);
ints.add(count);
}
@@ -846,7 +1025,7 @@
}
ClientSessionFactory sf = new ClientSessionFactoryImpl(serverTotc,
serverBackuptc);
-
+
sf.setFailoverOnServerShutdown(false);
sf.setRetryInterval(100);
sf.setRetryIntervalMultiplier(1d);
@@ -958,6 +1137,7 @@
servers[node] = server;
}
+
protected void setupServerWithDiscovery(int node,
String groupAddress,
int port,
@@ -1054,21 +1234,21 @@
configuration.getConnectorConfigurations().put(nettytc_c.getName(), nettytc_c);
connectorPairs.add(new Pair<String, String>(nettytc_c.getName(),
- nettyBackuptc == null ? null :
nettyBackuptc.getName()));
+ nettyBackuptc == null ? null : nettyBackuptc.getName()));
}
else
{
connectorPairs.add(new Pair<String, String>(invmtc_c.getName(),
invmBackuptc == null ? null
-
: invmBackuptc.getName()));
+ : invmBackuptc.getName()));
}
BroadcastGroupConfiguration bcConfig = new
BroadcastGroupConfiguration("bg1",
- null,
- -1,
-
groupAddress,
- port,
- 250,
-
connectorPairs);
+ null,
+ -1,
+ groupAddress,
+ port,
+ 250,
+ connectorPairs);
configuration.getBroadcastGroupConfigurations().add(bcConfig);
@@ -1091,12 +1271,12 @@
protected Map<String, Object> generateParams(int node, boolean netty)
{
- Map<String, Object> params = new HashMap<String, Object>();
+ Map<String, Object> params = new HashMap<String, Object>();
if (netty)
{
params.put(org.hornetq.integration.transports.netty.TransportConstants.PORT_PROP_NAME,
-
org.hornetq.integration.transports.netty.TransportConstants.DEFAULT_PORT + node);
+ org.hornetq.integration.transports.netty.TransportConstants.DEFAULT_PORT +
node);
}
else
{
@@ -1167,12 +1347,12 @@
pairs.add(connectorPair);
ClusterConnectionConfiguration clusterConf = new
ClusterConnectionConfiguration(name,
-
address,
-
100,
-
true,
-
forwardWhenNoConsumers,
-
maxHops,
-
pairs);
+ address,
+ 100,
+ true,
+ forwardWhenNoConsumers,
+ maxHops,
+ pairs);
serverFrom.getConfiguration().getClusterConfigurations().add(clusterConf);
}
@@ -1218,12 +1398,12 @@
}
ClusterConnectionConfiguration clusterConf = new
ClusterConnectionConfiguration(name,
-
address,
-
250,
-
true,
-
forwardWhenNoConsumers,
-
maxHops,
-
pairs);
+ address,
+ 250,
+ true,
+ forwardWhenNoConsumers,
+ maxHops,
+ pairs);
serverFrom.getConfiguration().getClusterConfigurations().add(clusterConf);
}
@@ -1288,12 +1468,12 @@
}
ClusterConnectionConfiguration clusterConf = new
ClusterConnectionConfiguration(name,
-
address,
-
250,
-
true,
-
forwardWhenNoConsumers,
-
maxHops,
-
pairs);
+ address,
+ 250,
+ true,
+ forwardWhenNoConsumers,
+ maxHops,
+ pairs);
serverFrom.getConfiguration().getClusterConfigurations().add(clusterConf);
}
@@ -1314,12 +1494,12 @@
}
ClusterConnectionConfiguration clusterConf = new
ClusterConnectionConfiguration(name,
-
address,
-
100,
-
true,
-
forwardWhenNoConsumers,
-
maxHops,
-
discoveryGroupName);
+ address,
+ 100,
+ true,
+ forwardWhenNoConsumers,
+ maxHops,
+ discoveryGroupName);
List<ClusterConnectionConfiguration> clusterConfs =
server.getConfiguration().getClusterConfigurations();
clusterConfs.add(clusterConf);
Copied:
trunk/tests/src/org/hornetq/tests/integration/cluster/distribution/ClusteredGroupingTest.java
(from rev 8139,
branches/hornetq_grouping/tests/src/org/hornetq/tests/integration/cluster/distribution/ClusteredGroupingTest.java)
===================================================================
---
trunk/tests/src/org/hornetq/tests/integration/cluster/distribution/ClusteredGroupingTest.java
(rev 0)
+++
trunk/tests/src/org/hornetq/tests/integration/cluster/distribution/ClusteredGroupingTest.java 2009-10-28
18:32:19 UTC (rev 8158)
@@ -0,0 +1,1061 @@
+/*
+ * Copyright 2009 Red Hat, Inc.
+ * Red Hat licenses this file to you under the Apache License, version
+ * 2.0 (the "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
http://www.apache.org/licenses/LICENSE-2.0
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+ * implied. See the License for the specific language governing
+ * permissions and limitations under the License.
+ */
+package org.hornetq.tests.integration.cluster.distribution;
+
+import org.hornetq.core.message.impl.MessageImpl;
+import org.hornetq.core.server.group.impl.GroupingHandlerConfiguration;
+import org.hornetq.core.server.group.impl.Response;
+import org.hornetq.core.server.group.impl.Proposal;
+import org.hornetq.core.server.group.impl.GroupBinding;
+import org.hornetq.core.server.group.GroupingHandler;
+import org.hornetq.core.exception.HornetQException;
+import org.hornetq.core.management.Notification;
+import org.hornetq.utils.SimpleString;
+
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+
+/**
+ * @author <a href="mailto:andy.taylor@jboss.org">Andy Taylor</a>
+ */
+public class ClusteredGroupingTest extends ClusterTestBase
+{
+
+ public void testGroupingSimple() throws Exception
+ {
+ setupServer(0, isFileStorage(), isNetty());
+ setupServer(1, isFileStorage(), isNetty());
+ setupServer(2, isFileStorage(), isNetty());
+
+ setupClusterConnection("cluster0", "queues", false, 1,
isNetty(), 0, 1, 2);
+
+ setupClusterConnection("cluster1", "queues", false, 1,
isNetty(), 1, 0, 2);
+
+ setupClusterConnection("cluster2", "queues", false, 1,
isNetty(), 2, 0, 1);
+
+ setUpGroupHandler(GroupingHandlerConfiguration.TYPE.LOCAL, 0);
+ setUpGroupHandler(GroupingHandlerConfiguration.TYPE.REMOTE, 1);
+ setUpGroupHandler(GroupingHandlerConfiguration.TYPE.REMOTE, 2);
+
+ startServers(0, 1, 2);
+
+ try
+ {
+
+ setupSessionFactory(0, isNetty());
+ setupSessionFactory(1, isNetty());
+ setupSessionFactory(2, isNetty());
+
+ createQueue(0, "queues.testaddress", "queue0", null,
false);
+ createQueue(1, "queues.testaddress", "queue0", null,
false);
+ createQueue(2, "queues.testaddress", "queue0", null,
false);
+
+ addConsumer(0, 0, "queue0", null);
+ addConsumer(1, 1, "queue0", null);
+ addConsumer(2, 2, "queue0", null);
+
+ waitForBindings(0, "queues.testaddress", 1, 1, true);
+ waitForBindings(1, "queues.testaddress", 1, 1, true);
+ waitForBindings(2, "queues.testaddress", 1, 1, true);
+
+ /*waitForBindings(0, "queues.testaddress", 2, 2, false);
+ waitForBindings(1, "queues.testaddress", 2, 2, false);
+ waitForBindings(2, "queues.testaddress", 2, 2, false);*/
+
+ sendWithProperty(0, "queues.testaddress", 10, false,
MessageImpl.HDR_GROUP_ID, new SimpleString("id1"));
+
+ verifyReceiveAll(10, 0);
+
+
System.out.println("*****************************************************************************");
+ }
+ finally
+ {
+ closeAllConsumers();
+
+ closeAllSessionFactories();
+
+ stopServers(0, 1, 2);
+ }
+ }
+
+ public void testGroupingTimeout() throws Exception
+ {
+ setupServer(0, isFileStorage(), isNetty());
+ setupServer(1, isFileStorage(), isNetty());
+ setupServer(2, isFileStorage(), isNetty());
+
+ setupClusterConnection("cluster0", "queues", false, 1,
isNetty(), 0, 1, 2);
+
+ setupClusterConnection("cluster1", "queues", false, 1,
isNetty(), 1, 0, 2);
+
+ setupClusterConnection("cluster2", "queues", false, 1,
isNetty(), 2, 0, 1);
+
+ setUpGroupHandler(GroupingHandlerConfiguration.TYPE.REMOTE, 1, 1);
+
+ setUpGroupHandler(GroupingHandlerConfiguration.TYPE.REMOTE, 2, 1);
+
+ startServers(0, 1, 2);
+
+ try
+ {
+ setUpGroupHandler(new GroupingHandler()
+ {
+ public SimpleString getName()
+ {
+ return null;
+ }
+
+ public Response propose(Proposal proposal) throws Exception
+ {
+ return null;
+ }
+
+ public void proposed(Response response) throws Exception
+ {
+ System.out.println("ClusteredGroupingTest.proposed");
+ }
+
+ public void send(Response response, int distance) throws Exception
+ {
+ System.out.println("ClusteredGroupingTest.send");
+ }
+
+ public Response receive(Proposal proposal, int distance) throws Exception
+ {
+ return null;
+ }
+
+ public void onNotification(Notification notification)
+ {
+ System.out.println("ClusteredGroupingTest.onNotification");
+ }
+
+ public void addGroupBinding(GroupBinding groupBinding)
+ {
+ System.out.println("ClusteredGroupingTest.addGroupBinding");
+ }
+
+ public Response getProposal(SimpleString fullID)
+ {
+ return null;
+ }
+ }, 0);
+
+ setupSessionFactory(0, isNetty());
+ setupSessionFactory(1, isNetty());
+ setupSessionFactory(2, isNetty());
+
+ createQueue(0, "queues.testaddress", "queue0", null,
false);
+ createQueue(1, "queues.testaddress", "queue0", null,
false);
+ createQueue(2, "queues.testaddress", "queue0", null,
false);
+
+ addConsumer(0, 0, "queue0", null);
+ addConsumer(1, 1, "queue0", null);
+ addConsumer(2, 2, "queue0", null);
+
+ waitForBindings(0, "queues.testaddress", 1, 1, true);
+ waitForBindings(1, "queues.testaddress", 1, 1, true);
+ waitForBindings(2, "queues.testaddress", 1, 1, true);
+
+ /*waitForBindings(0, "queues.testaddress", 2, 2, false);
+ waitForBindings(1, "queues.testaddress", 2, 2, false);
+ waitForBindings(2, "queues.testaddress", 2, 2, false);*/
+
+ try
+ {
+ sendWithProperty(1, "queues.testaddress", 10, false,
MessageImpl.HDR_GROUP_ID, new SimpleString("id1"));
+ fail("should timeout");
+ }
+ catch (Exception e)
+ {
+ e.printStackTrace(); //To change body of catch statement use File | Settings
| File Templates.
+ }
+
+
+
System.out.println("*****************************************************************************");
+ }
+ finally
+ {
+ closeAllConsumers();
+
+ closeAllSessionFactories();
+
+ stopServers(0, 1, 2);
+ }
+ }
+
+ public void testGroupingSendTo2queues() throws Exception
+ {
+ setupServer(0, isFileStorage(), isNetty());
+ setupServer(1, isFileStorage(), isNetty());
+ setupServer(2, isFileStorage(), isNetty());
+
+ setupClusterConnection("cluster0", "queues", false, 1,
isNetty(), 0, 1, 2);
+
+ setupClusterConnection("cluster1", "queues", false, 1,
isNetty(), 1, 0, 2);
+
+ setupClusterConnection("cluster2", "queues", false, 1,
isNetty(), 2, 0, 1);
+
+ setUpGroupHandler(GroupingHandlerConfiguration.TYPE.LOCAL, 0);
+ setUpGroupHandler(GroupingHandlerConfiguration.TYPE.REMOTE, 1);
+ setUpGroupHandler(GroupingHandlerConfiguration.TYPE.REMOTE, 2);
+
+
+ startServers(0, 1, 2);
+
+ try
+ {
+ setupSessionFactory(0, isNetty());
+ setupSessionFactory(1, isNetty());
+ setupSessionFactory(2, isNetty());
+
+ createQueue(0, "queues.testaddress", "queue0", null,
false);
+ createQueue(1, "queues.testaddress", "queue0", null,
false);
+ createQueue(2, "queues.testaddress", "queue0", null,
false);
+
+ addConsumer(0, 0, "queue0", null);
+ addConsumer(1, 1, "queue0", null);
+ addConsumer(2, 2, "queue0", null);
+
+ waitForBindings(0, "queues.testaddress", 1, 1, true);
+ waitForBindings(1, "queues.testaddress", 1, 1, true);
+ waitForBindings(2, "queues.testaddress", 1, 1, true);
+
+ /*waitForBindings(0, "queues.testaddress", 2, 2, false);
+ waitForBindings(1, "queues.testaddress", 2, 2, false);
+ waitForBindings(2, "queues.testaddress", 2, 2, false);*/
+
+ sendInRange(0, "queues.testaddress", 0, 10, false,
MessageImpl.HDR_GROUP_ID, new SimpleString("id1"));
+
+ verifyReceiveAllInRange(0, 10, 0);
+ sendInRange(1, "queues.testaddress", 10, 20, false,
MessageImpl.HDR_GROUP_ID, new SimpleString("id1"));
+
+ verifyReceiveAllInRange(10, 20, 0);
+
+
System.out.println("*****************************************************************************");
+ }
+ finally
+ {
+ closeAllConsumers();
+
+ closeAllSessionFactories();
+
+ stopServers(0, 1, 2);
+ }
+ }
+
+ public void testGroupingSendTo3queues() throws Exception
+ {
+ setupServer(0, isFileStorage(), isNetty());
+ setupServer(1, isFileStorage(), isNetty());
+ setupServer(2, isFileStorage(), isNetty());
+
+ setupClusterConnection("cluster0", "queues", false, 1,
isNetty(), 0, 1, 2);
+
+ setupClusterConnection("cluster1", "queues", false, 1,
isNetty(), 1, 0, 2);
+
+ setupClusterConnection("cluster2", "queues", false, 1,
isNetty(), 2, 0, 1);
+
+ setUpGroupHandler(GroupingHandlerConfiguration.TYPE.LOCAL, 0);
+ setUpGroupHandler(GroupingHandlerConfiguration.TYPE.REMOTE, 1);
+ setUpGroupHandler(GroupingHandlerConfiguration.TYPE.REMOTE, 2);
+
+
+ startServers(0, 1, 2);
+
+ try
+ {
+ setupSessionFactory(0, isNetty());
+ setupSessionFactory(1, isNetty());
+ setupSessionFactory(2, isNetty());
+
+ createQueue(0, "queues.testaddress", "queue0", null,
false);
+ createQueue(1, "queues.testaddress", "queue0", null,
false);
+ createQueue(2, "queues.testaddress", "queue0", null,
false);
+
+ addConsumer(0, 0, "queue0", null);
+ addConsumer(1, 1, "queue0", null);
+ addConsumer(2, 2, "queue0", null);
+
+ waitForBindings(0, "queues.testaddress", 1, 1, true);
+ waitForBindings(1, "queues.testaddress", 1, 1, true);
+ waitForBindings(2, "queues.testaddress", 1, 1, true);
+
+ waitForBindings(0, "queues.testaddress", 2, 2, false);
+ waitForBindings(1, "queues.testaddress", 2, 2, false);
+ waitForBindings(2, "queues.testaddress", 2, 2, false);
+
+ sendInRange(0, "queues.testaddress", 0, 10, false,
MessageImpl.HDR_GROUP_ID, new SimpleString("id1"));
+
+ verifyReceiveAllInRange(0, 10, 0);
+ sendInRange(1, "queues.testaddress", 10, 20, false,
MessageImpl.HDR_GROUP_ID, new SimpleString("id1"));
+
+ verifyReceiveAllInRange(10, 20, 0);
+ sendInRange(2, "queues.testaddress", 10, 20, false,
MessageImpl.HDR_GROUP_ID, new SimpleString("id1"));
+
+ verifyReceiveAllInRange(10, 20, 0);
+
+
System.out.println("*****************************************************************************");
+ }
+ finally
+ {
+ closeAllConsumers();
+
+ closeAllSessionFactories();
+
+ stopServers(0, 1, 2);
+ }
+ }
+
+ public void testGroupingSendTo3queuesRemoteArbitrator() throws Exception
+ {
+ setupServer(0, isFileStorage(), isNetty());
+ setupServer(1, isFileStorage(), isNetty());
+ setupServer(2, isFileStorage(), isNetty());
+
+ setupClusterConnection("cluster0", "queues", false, 1,
isNetty(), 0, 1, 2);
+
+ setupClusterConnection("cluster1", "queues", false, 1,
isNetty(), 1, 0, 2);
+
+ setupClusterConnection("cluster2", "queues", false, 1,
isNetty(), 2, 0, 1);
+
+ setUpGroupHandler(GroupingHandlerConfiguration.TYPE.LOCAL, 0);
+ setUpGroupHandler(GroupingHandlerConfiguration.TYPE.REMOTE, 1);
+ setUpGroupHandler(GroupingHandlerConfiguration.TYPE.REMOTE, 2);
+
+
+ startServers(0, 1, 2);
+
+ try
+ {
+ setupSessionFactory(0, isNetty());
+ setupSessionFactory(1, isNetty());
+ setupSessionFactory(2, isNetty());
+
+ createQueue(0, "queues.testaddress", "queue0", null,
false);
+ createQueue(1, "queues.testaddress", "queue0", null,
false);
+ createQueue(2, "queues.testaddress", "queue0", null,
false);
+
+ addConsumer(1, 1, "queue0", null);
+
+ waitForBindings(0, "queues.testaddress", 1, 0, true);
+ waitForBindings(1, "queues.testaddress", 1, 1, true);
+ waitForBindings(2, "queues.testaddress", 1, 0, true);
+
+ waitForBindings(0, "queues.testaddress", 2, 1, false);
+ waitForBindings(1, "queues.testaddress", 2, 0, false);
+ waitForBindings(2, "queues.testaddress", 2, 1, false);
+
+ sendInRange(1, "queues.testaddress", 0, 10, false,
MessageImpl.HDR_GROUP_ID, new SimpleString("id1"));
+
+ verifyReceiveAllInRange(0, 10, 1);
+ sendInRange(2, "queues.testaddress", 10, 20, false,
MessageImpl.HDR_GROUP_ID, new SimpleString("id1"));
+
+ verifyReceiveAllInRange(10, 20, 1);
+ sendInRange(0, "queues.testaddress", 20, 30, false,
MessageImpl.HDR_GROUP_ID, new SimpleString("id1"));
+
+ verifyReceiveAllInRange(20, 30, 1);
+
+
System.out.println("*****************************************************************************");
+ }
+ finally
+ {
+ closeAllConsumers();
+
+ closeAllSessionFactories();
+
+ stopServers(0, 1, 2);
+ }
+ }
+
+ public void testGroupingSendTo3queuesNoConsumerOnLocalQueue() throws Exception
+ {
+ setupServer(0, isFileStorage(), isNetty());
+ setupServer(1, isFileStorage(), isNetty());
+ setupServer(2, isFileStorage(), isNetty());
+
+ setupClusterConnection("cluster0", "queues", false, 1,
isNetty(), 0, 1, 2);
+
+ setupClusterConnection("cluster1", "queues", false, 1,
isNetty(), 1, 0, 2);
+
+ setupClusterConnection("cluster2", "queues", false, 1,
isNetty(), 2, 0, 1);
+
+ setUpGroupHandler(GroupingHandlerConfiguration.TYPE.LOCAL, 0);
+ setUpGroupHandler(GroupingHandlerConfiguration.TYPE.REMOTE, 1);
+ setUpGroupHandler(GroupingHandlerConfiguration.TYPE.REMOTE, 2);
+
+
+ startServers(0, 1, 2);
+
+ try
+ {
+ setupSessionFactory(0, isNetty());
+ setupSessionFactory(1, isNetty());
+ setupSessionFactory(2, isNetty());
+
+ createQueue(0, "queues.testaddress", "queue0", null,
false);
+ createQueue(1, "queues.testaddress", "queue0", null,
false);
+ createQueue(2, "queues.testaddress", "queue0", null,
false);
+
+ addConsumer(0, 0, "queue0", null);
+ //addConsumer(1, 1, "queue0", null);
+ addConsumer(2, 2, "queue0", null);
+
+ waitForBindings(0, "queues.testaddress", 1, 1, true);
+ waitForBindings(1, "queues.testaddress", 1, 0, true);
+ waitForBindings(2, "queues.testaddress", 1, 1, true);
+
+ waitForBindings(0, "queues.testaddress", 2, 1, false);
+ waitForBindings(1, "queues.testaddress", 2, 2, false);
+ waitForBindings(2, "queues.testaddress", 2, 1, false);
+
+ sendInRange(1, "queues.testaddress", 0, 10, false,
MessageImpl.HDR_GROUP_ID, new SimpleString("id1"));
+
+ verifyReceiveAllInRange(0, 10, 0);
+ sendInRange(2, "queues.testaddress", 10, 20, false,
MessageImpl.HDR_GROUP_ID, new SimpleString("id1"));
+
+ verifyReceiveAllInRange(10, 20, 0);
+ sendInRange(0, "queues.testaddress", 20, 30, false,
MessageImpl.HDR_GROUP_ID, new SimpleString("id1"));
+
+ verifyReceiveAllInRange(20, 30, 0);
+
+
System.out.println("*****************************************************************************");
+ }
+ finally
+ {
+ closeAllConsumers();
+
+ closeAllSessionFactories();
+
+ stopServers(0, 1, 2);
+ }
+ }
+
+
+ public void testGroupingRoundRobin() throws Exception
+ {
+ setupServer(0, isFileStorage(), isNetty());
+ setupServer(1, isFileStorage(), isNetty());
+ setupServer(2, isFileStorage(), isNetty());
+
+ setupClusterConnection("cluster0", "queues", false, 1,
isNetty(), 0, 1, 2);
+
+ setupClusterConnection("cluster1", "queues", false, 1,
isNetty(), 1, 0, 2);
+
+ setupClusterConnection("cluster2", "queues", false, 1,
isNetty(), 2, 0, 1);
+
+ setUpGroupHandler(GroupingHandlerConfiguration.TYPE.LOCAL, 0);
+ setUpGroupHandler(GroupingHandlerConfiguration.TYPE.REMOTE, 1);
+ setUpGroupHandler(GroupingHandlerConfiguration.TYPE.REMOTE, 2);
+
+
+ startServers(0, 1, 2);
+
+ try
+ {
+ setupSessionFactory(0, isNetty());
+ setupSessionFactory(1, isNetty());
+ setupSessionFactory(2, isNetty());
+
+ createQueue(0, "queues.testaddress", "queue0", null,
false);
+ createQueue(1, "queues.testaddress", "queue0", null,
false);
+ createQueue(2, "queues.testaddress", "queue0", null,
false);
+
+ addConsumer(0, 0, "queue0", null);
+ addConsumer(1, 1, "queue0", null);
+ addConsumer(2, 2, "queue0", null);
+
+ waitForBindings(0, "queues.testaddress", 1, 1, true);
+ waitForBindings(1, "queues.testaddress", 1, 1, true);
+ waitForBindings(2, "queues.testaddress", 1, 1, true);
+
+ waitForBindings(0, "queues.testaddress", 2, 2, false);
+ waitForBindings(1, "queues.testaddress", 2, 2, false);
+ waitForBindings(2, "queues.testaddress", 2, 2, false);
+
+
+ sendInRange(0, "queues.testaddress", 0, 10, false,
MessageImpl.HDR_GROUP_ID, new SimpleString("id1"));
+ sendInRange(0, "queues.testaddress", 10, 20, false,
MessageImpl.HDR_GROUP_ID, new SimpleString("id2"));
+ sendInRange(0, "queues.testaddress", 20, 30, false,
MessageImpl.HDR_GROUP_ID, new SimpleString("id3"));
+ verifyReceiveAllWithGroupIDRoundRobin(0, 10, 0, 1, 2);
+
+
+
System.out.println("*****************************************************************************");
+ }
+ finally
+ {
+ closeAllConsumers();
+
+ closeAllSessionFactories();
+
+ stopServers(0, 1, 2);
+ }
+ }
+
+
+ public void testGroupingSendTo3queuesQueueRemoved() throws Exception
+ {
+ setupServer(0, isFileStorage(), isNetty());
+ setupServer(1, isFileStorage(), isNetty());
+ setupServer(2, isFileStorage(), isNetty());
+
+ setupClusterConnection("cluster0", "queues", false, 1,
isNetty(), 0, 1, 2);
+
+ setupClusterConnection("cluster1", "queues", false, 1,
isNetty(), 1, 0, 2);
+
+ setupClusterConnection("cluster2", "queues", false, 1,
isNetty(), 2, 0, 1);
+
+ setUpGroupHandler(GroupingHandlerConfiguration.TYPE.LOCAL, 0);
+ setUpGroupHandler(GroupingHandlerConfiguration.TYPE.REMOTE, 1);
+ setUpGroupHandler(GroupingHandlerConfiguration.TYPE.REMOTE, 2);
+
+
+ startServers(0, 1, 2);
+
+ try
+ {
+ setupSessionFactory(0, isNetty());
+ setupSessionFactory(1, isNetty());
+ setupSessionFactory(2, isNetty());
+
+ createQueue(0, "queues.testaddress", "queue0", null,
false);
+ createQueue(1, "queues.testaddress", "queue0", null,
false);
+ createQueue(2, "queues.testaddress", "queue0", null,
false);
+
+ addConsumer(0, 0, "queue0", null);
+ addConsumer(1, 1, "queue0", null);
+ addConsumer(2, 2, "queue0", null);
+
+ waitForBindings(0, "queues.testaddress", 1, 1, true);
+ waitForBindings(1, "queues.testaddress", 1, 1, true);
+ waitForBindings(2, "queues.testaddress", 1, 1, true);
+
+ waitForBindings(0, "queues.testaddress", 2, 2, false);
+ waitForBindings(1, "queues.testaddress", 2, 2, false);
+ waitForBindings(2, "queues.testaddress", 2, 2, false);
+
+ sendInRange(0, "queues.testaddress", 0, 10, false,
MessageImpl.HDR_GROUP_ID, new SimpleString("id1"));
+
+ verifyReceiveAllInRange(0, 10, 0);
+ sendInRange(1, "queues.testaddress", 10, 20, false,
MessageImpl.HDR_GROUP_ID, new SimpleString("id1"));
+
+ verifyReceiveAllInRange(10, 20, 0);
+ sendInRange(2, "queues.testaddress", 20, 30, false,
MessageImpl.HDR_GROUP_ID, new SimpleString("id1"));
+
+ verifyReceiveAllInRange(20, 30, 0);
+ removeConsumer(0);
+ removeConsumer(1);
+ removeConsumer(2);
+ deleteQueue(0, "queue0");
+ deleteQueue(1, "queue0");
+ deleteQueue(2, "queue0");
+ createQueue(0, "queues.testaddress", "queue1", null,
false);
+ addConsumer(3, 0, "queue1", null);
+
+ waitForBindings(0, "queues.testaddress", 1, 1, true);
+ waitForBindings(1, "queues.testaddress", 1, 1, false);
+ waitForBindings(2, "queues.testaddress", 1, 1, false);
+
+ sendInRange(0, "queues.testaddress", 30, 40, false,
MessageImpl.HDR_GROUP_ID, new SimpleString("id1"));
+ verifyReceiveAllInRange(30, 40, 3);
+ sendInRange(1, "queues.testaddress", 40, 50, false,
MessageImpl.HDR_GROUP_ID, new SimpleString("id1"));
+ verifyReceiveAllInRange(40, 50, 3);
+ sendInRange(2, "queues.testaddress", 50, 60, false,
MessageImpl.HDR_GROUP_ID, new SimpleString("id1"));
+ verifyReceiveAllInRange(50, 60, 3);
+
System.out.println("*****************************************************************************");
+ }
+ finally
+ {
+ closeAllConsumers();
+
+ closeAllSessionFactories();
+
+ stopServers(0, 1, 2);
+ }
+ }
+
+ public void testGroupingSendTo3queuesPinnedNodeGoesDown() throws Exception
+ {
+ setupServer(0, isFileStorage(), isNetty());
+ setupServer(1, isFileStorage(), isNetty());
+ setupServer(2, isFileStorage(), isNetty());
+
+ setupClusterConnection("cluster0", "queues", false, 1,
isNetty(), 0, 1, 2);
+
+ setupClusterConnection("cluster1", "queues", false, 1,
isNetty(), 1, 0, 2);
+
+ setupClusterConnection("cluster2", "queues", false, 1,
isNetty(), 2, 0, 1);
+
+ setUpGroupHandler(GroupingHandlerConfiguration.TYPE.LOCAL, 0);
+ setUpGroupHandler(GroupingHandlerConfiguration.TYPE.REMOTE, 1);
+ setUpGroupHandler(GroupingHandlerConfiguration.TYPE.REMOTE, 2);
+
+ startServers(0, 1, 2);
+
+ try
+ {
+
+ setupSessionFactory(0, isNetty());
+ setupSessionFactory(1, isNetty());
+ setupSessionFactory(2, isNetty());
+
+ createQueue(0, "queues.testaddress", "queue0", null, true);
+ createQueue(1, "queues.testaddress", "queue0", null, true);
+ createQueue(2, "queues.testaddress", "queue0", null, true);
+
+ addConsumer(0, 1, "queue0", null);
+
+ waitForBindings(0, "queues.testaddress", 1, 0, true);
+ waitForBindings(1, "queues.testaddress", 1, 1, true);
+ waitForBindings(2, "queues.testaddress", 1, 0, true);
+
+ waitForBindings(0, "queues.testaddress", 2, 1, false);
+ waitForBindings(1, "queues.testaddress", 2, 0, false);
+ waitForBindings(2, "queues.testaddress", 2, 1, false);
+
+ sendInRange(1, "queues.testaddress", 0, 10, false,
MessageImpl.HDR_GROUP_ID, new SimpleString("id1"));
+
+ verifyReceiveAllInRange(0, 10, 0);
+
+ stopServers(1);
+
+ sendInRange(2, "queues.testaddress", 10, 20, false,
MessageImpl.HDR_GROUP_ID, new SimpleString("id1"));
+
+ startServers(1);
+
+ addConsumer(1, 1, "queue0", null);
+ waitForBindings(1, "queues.testaddress", 1, 1, true);
+
+
+ verifyReceiveAllInRange(10, 20, 1);
+
+
System.out.println("*****************************************************************************");
+ }
+ finally
+ {
+ //closeAllConsumers();
+
+ closeAllSessionFactories();
+
+ stopServers(0, 1, 2);
+ }
+ }
+
+ public void testGroupingSendTo3queuesPinnedNodeGoesDownSendBeforeStop() throws
Exception
+ {
+ setupServer(0, isFileStorage(), isNetty());
+ setupServer(1, isFileStorage(), isNetty());
+ setupServer(2, isFileStorage(), isNetty());
+
+ setupClusterConnection("cluster0", "queues", false, 1,
isNetty(), 0, 1, 2);
+
+ setupClusterConnection("cluster1", "queues", false, 1,
isNetty(), 1, 0, 2);
+
+ setupClusterConnection("cluster2", "queues", false, 1,
isNetty(), 2, 0, 1);
+
+ setUpGroupHandler(GroupingHandlerConfiguration.TYPE.LOCAL, 0);
+ setUpGroupHandler(GroupingHandlerConfiguration.TYPE.REMOTE, 1);
+ setUpGroupHandler(GroupingHandlerConfiguration.TYPE.REMOTE, 2);
+
+ startServers(0, 1, 2);
+
+ try
+ {
+
+ setupSessionFactory(0, isNetty());
+ setupSessionFactory(1, isNetty());
+ setupSessionFactory(2, isNetty());
+
+ createQueue(0, "queues.testaddress", "queue0", null, true);
+ createQueue(1, "queues.testaddress", "queue0", null, true);
+ createQueue(2, "queues.testaddress", "queue0", null, true);
+
+ addConsumer(0, 1, "queue0", null);
+
+ waitForBindings(0, "queues.testaddress", 1, 0, true);
+ waitForBindings(1, "queues.testaddress", 1, 1, true);
+ waitForBindings(2, "queues.testaddress", 1, 0, true);
+
+ waitForBindings(0, "queues.testaddress", 2, 1, false);
+ waitForBindings(1, "queues.testaddress", 2, 0, false);
+ waitForBindings(2, "queues.testaddress", 2, 1, false);
+
+ sendInRange(1, "queues.testaddress", 0, 10, true,
MessageImpl.HDR_GROUP_ID, new SimpleString("id1"));
+
+ verifyReceiveAllInRange(true, 0, 10, 0);
+
+ closeAllConsumers();
+
+ sendInRange(2, "queues.testaddress", 10, 20, true,
MessageImpl.HDR_GROUP_ID, new SimpleString("id1"));
+
+
+ stopServers(1);
+
+ startServers(1);
+
+ addConsumer(1, 1, "queue0", null);
+
+ waitForBindings(1, "queues.testaddress", 1, 1, true);
+
+
+ verifyReceiveAllInRange(10, 20, 1);
+
+
+
System.out.println("*****************************************************************************");
+ }
+ finally
+ {
+ //closeAllConsumers();
+
+ closeAllSessionFactories();
+
+ stopServers(0, 1, 2);
+ }
+ }
+
+
+ public void testGroupingSendTo3queuesPinnedNodeGoesDownSendAfterRestart() throws
Exception
+ {
+ setupServer(0, isFileStorage(), isNetty());
+ setupServer(1, isFileStorage(), isNetty());
+ setupServer(2, isFileStorage(), isNetty());
+
+ setupClusterConnection("cluster0", "queues", false, 1,
isNetty(), 0, 1, 2);
+
+ setupClusterConnection("cluster1", "queues", false, 1,
isNetty(), 1, 0, 2);
+
+ setupClusterConnection("cluster2", "queues", false, 1,
isNetty(), 2, 0, 1);
+
+ setUpGroupHandler(GroupingHandlerConfiguration.TYPE.LOCAL, 0);
+ setUpGroupHandler(GroupingHandlerConfiguration.TYPE.REMOTE, 1);
+ setUpGroupHandler(GroupingHandlerConfiguration.TYPE.REMOTE, 2);
+
+ startServers(0, 1, 2);
+
+ try
+ {
+
+ setupSessionFactory(0, isNetty());
+ setupSessionFactory(1, isNetty());
+ setupSessionFactory(2, isNetty());
+
+ createQueue(0, "queues.testaddress", "queue0", null, true);
+ createQueue(1, "queues.testaddress", "queue0", null, true);
+ createQueue(2, "queues.testaddress", "queue0", null, true);
+
+ addConsumer(0, 1, "queue0", null);
+
+ waitForBindings(0, "queues.testaddress", 1, 0, true);
+ waitForBindings(1, "queues.testaddress", 1, 1, true);
+ waitForBindings(2, "queues.testaddress", 1, 0, true);
+
+ waitForBindings(0, "queues.testaddress", 2, 1, false);
+ waitForBindings(1, "queues.testaddress", 2, 0, false);
+ waitForBindings(2, "queues.testaddress", 2, 1, false);
+
+ sendInRange(1, "queues.testaddress", 0, 10, false,
MessageImpl.HDR_GROUP_ID, new SimpleString("id1"));
+
+ verifyReceiveAllInRange(0, 10, 0);
+
+ stopServers(1);
+
+
+ startServers(1);
+
+ sendInRange(2, "queues.testaddress", 10, 20, false,
MessageImpl.HDR_GROUP_ID, new SimpleString("id1"));
+
+
+ addConsumer(1, 1, "queue0", null);
+ waitForBindings(1, "queues.testaddress", 1, 1, true);
+
+
+ verifyReceiveAllInRange(10, 20, 1);
+
+
+ sendInRange(0, "queues.testaddress", 20, 30, false,
MessageImpl.HDR_GROUP_ID, new SimpleString("id1"));
+ verifyReceiveAllInRange(20, 30, 1);
+
+
System.out.println("*****************************************************************************");
+ }
+ finally
+ {
+ //closeAllConsumers();
+
+ closeAllSessionFactories();
+
+ stopServers(0, 1, 2);
+ }
+ }
+
+ public void testGroupingSendTo3queuesSendingNodeGoesDown() throws Exception
+ {
+ setupServer(0, isFileStorage(), isNetty());
+ setupServer(1, isFileStorage(), isNetty());
+ setupServer(2, isFileStorage(), isNetty());
+
+ setupClusterConnection("cluster0", "queues", false, 1,
isNetty(), 0, 1, 2);
+
+ setupClusterConnection("cluster1", "queues", false, 1,
isNetty(), 1, 0, 2);
+
+ setupClusterConnection("cluster2", "queues", false, 1,
isNetty(), 2, 0, 1);
+
+
+ setUpGroupHandler(GroupingHandlerConfiguration.TYPE.LOCAL, 0);
+ setUpGroupHandler(GroupingHandlerConfiguration.TYPE.REMOTE, 1);
+ setUpGroupHandler(GroupingHandlerConfiguration.TYPE.REMOTE, 2);
+
+ startServers(0, 1, 2);
+
+ try
+ {
+
+ setupSessionFactory(0, isNetty());
+ setupSessionFactory(1, isNetty());
+ setupSessionFactory(2, isNetty());
+
+ createQueue(0, "queues.testaddress", "queue0", null, true);
+ createQueue(1, "queues.testaddress", "queue0", null, true);
+ createQueue(2, "queues.testaddress", "queue0", null, true);
+
+ addConsumer(0, 1, "queue0", null);
+
+ waitForBindings(0, "queues.testaddress", 1, 0, true);
+ waitForBindings(1, "queues.testaddress", 1, 1, true);
+ waitForBindings(2, "queues.testaddress", 1, 0, true);
+
+ waitForBindings(0, "queues.testaddress", 2, 1, false);
+ waitForBindings(1, "queues.testaddress", 2, 0, false);
+ waitForBindings(2, "queues.testaddress", 2, 1, false);
+
+ sendInRange(1, "queues.testaddress", 0, 10, false,
MessageImpl.HDR_GROUP_ID, new SimpleString("id1"));
+
+ verifyReceiveAllInRange(0, 10, 0);
+ closeSessionFactory(0);
+ stopServers(0);
+
+ sendInRange(2, "queues.testaddress", 10, 20, false,
MessageImpl.HDR_GROUP_ID, new SimpleString("id1"));
+
+
+ setUpGroupHandler(GroupingHandlerConfiguration.TYPE.LOCAL, 0);
+ startServers(0);
+ waitForBindings(0, "queues.testaddress", 1, 0, true);
+ setupSessionFactory(0, isNetty());
+ verifyReceiveAllInRange(10, 20, 0);
+ sendInRange(0, "queues.testaddress", 20, 30, false,
MessageImpl.HDR_GROUP_ID, new SimpleString("id1"));
+
+ verifyReceiveAllInRange(20, 30, 0);
+
+
System.out.println("*****************************************************************************");
+ }
+ finally
+ {
+ //closeAllConsumers();
+
+ closeAllSessionFactories();
+
+ stopServers(0, 1, 2);
+ }
+ }
+
+ public void testGroupingMultipleQueuesOnAddress() throws Exception
+ {
+ setupServer(0, isFileStorage(), isNetty());
+ setupServer(1, isFileStorage(), isNetty());
+ setupServer(2, isFileStorage(), isNetty());
+
+ setupClusterConnection("cluster0", "queues", false, 1,
isNetty(), 0, 1, 2);
+
+ setupClusterConnection("cluster1", "queues", false, 1,
isNetty(), 1, 0, 2);
+
+ setupClusterConnection("cluster2", "queues", false, 1,
isNetty(), 2, 0, 1);
+ setUpGroupHandler(GroupingHandlerConfiguration.TYPE.LOCAL, 0);
+ setUpGroupHandler(GroupingHandlerConfiguration.TYPE.REMOTE, 1);
+ setUpGroupHandler(GroupingHandlerConfiguration.TYPE.REMOTE, 2);
+
+ startServers(0, 1, 2);
+
+ try
+ {
+
+ setupSessionFactory(0, isNetty());
+ setupSessionFactory(1, isNetty());
+ setupSessionFactory(2, isNetty());
+
+ createQueue(0, "queues.testaddress", "queue0", null,
false);
+ createQueue(1, "queues.testaddress", "queue0", null,
false);
+ createQueue(2, "queues.testaddress", "queue0", null,
false);
+
+ createQueue(0, "queues.testaddress", "queue1", null,
false);
+ createQueue(1, "queues.testaddress", "queue1", null,
false);
+ createQueue(2, "queues.testaddress", "queue1", null,
false);
+
+ addConsumer(0, 0, "queue0", null);
+ addConsumer(1, 1, "queue0", null);
+ addConsumer(2, 2, "queue0", null);
+
+ addConsumer(3, 0, "queue0", null);
+ addConsumer(4, 1, "queue0", null);
+ addConsumer(5, 2, "queue0", null);
+
+ waitForBindings(0, "queues.testaddress", 2, 2, true);
+ waitForBindings(1, "queues.testaddress", 2, 2, true);
+ waitForBindings(2, "queues.testaddress", 2, 2, true);
+
+ waitForBindings(0, "queues.testaddress", 4, 4, false);
+ waitForBindings(1, "queues.testaddress", 4, 4, false);
+ waitForBindings(2, "queues.testaddress", 4, 4, false);
+
+
+ sendWithProperty(0, "queues.testaddress", 10, false,
MessageImpl.HDR_GROUP_ID, new SimpleString("id1"));
+
+ verifyReceiveAll(10, 0);
+
+
System.out.println("*****************************************************************************");
+ }
+ finally
+ {
+ closeAllConsumers();
+
+ closeAllSessionFactories();
+
+ stopServers(0, 1, 2);
+ }
+ }
+
+ public void testGroupingMultipleSending() throws Exception
+ {
+ setupServer(0, isFileStorage(), isNetty());
+ setupServer(1, isFileStorage(), isNetty());
+ setupServer(2, isFileStorage(), isNetty());
+
+ setupClusterConnection("cluster0", "queues", false, 1,
isNetty(), 0, 1, 2);
+
+ setupClusterConnection("cluster1", "queues", false, 1,
isNetty(), 1, 0, 2);
+
+ setupClusterConnection("cluster2", "queues", false, 1,
isNetty(), 2, 0, 1);
+ setUpGroupHandler(GroupingHandlerConfiguration.TYPE.LOCAL, 0);
+ setUpGroupHandler(GroupingHandlerConfiguration.TYPE.REMOTE, 1);
+ setUpGroupHandler(GroupingHandlerConfiguration.TYPE.REMOTE, 2);
+
+ startServers(0, 1, 2);
+
+ try
+ {
+
+ setupSessionFactory(0, isNetty());
+ setupSessionFactory(1, isNetty());
+ setupSessionFactory(2, isNetty());
+
+ createQueue(0, "queues.testaddress", "queue0", null,
false);
+ createQueue(1, "queues.testaddress", "queue0", null,
false);
+ createQueue(2, "queues.testaddress", "queue0", null,
false);
+
+ addConsumer(0, 0, "queue0", null);
+ addConsumer(1, 1, "queue0", null);
+ addConsumer(2, 2, "queue0", null);
+
+ waitForBindings(0, "queues.testaddress", 1, 1, true);
+ waitForBindings(1, "queues.testaddress", 1, 1, true);
+ waitForBindings(2, "queues.testaddress", 1, 1, true);
+
+ waitForBindings(0, "queues.testaddress", 2, 2, false);
+ waitForBindings(1, "queues.testaddress", 2, 2, false);
+ waitForBindings(2, "queues.testaddress", 2, 2, false);
+
+ CountDownLatch latch = new CountDownLatch(1);
+ Thread[] threads = new Thread[9];
+ int range = 0;
+ for (int i = 0; i < 9; i++, range += 10)
+ {
+ threads[i] = new Thread(new ThreadSender(range, range + 10, 1, new
SimpleString("id" + i), latch, i < 8));
+ }
+ for (Thread thread : threads)
+ {
+ thread.start();
+ }
+
+ verifyReceiveAllWithGroupIDRoundRobin(0, 30, 0, 1, 2);
+
+
System.out.println("*****************************************************************************");
+ }
+ finally
+ {
+ closeAllConsumers();
+
+ closeAllSessionFactories();
+
+ stopServers(0, 1, 2);
+ }
+ }
+
+
+
+
+ public boolean isNetty()
+ {
+ return true;
+ }
+
+ public boolean isFileStorage()
+ {
+ return true;
+ }
+
+ class ThreadSender implements Runnable
+ {
+ private int msgStart;
+ private int msgEnd;
+ private SimpleString id;
+ private CountDownLatch latch;
+ private boolean wait;
+ private int node;
+
+ public ThreadSender(int msgStart, int msgEnd, int node, SimpleString id,
CountDownLatch latch, boolean wait)
+ {
+ this.msgStart = msgStart;
+ this.msgEnd = msgEnd;
+ this.node = node;
+ this.id = id;
+ this.latch = latch;
+ this.wait = wait;
+ }
+
+ public void run()
+ {
+ if (wait)
+ {
+ try
+ {
+ latch.await(5, TimeUnit.SECONDS);
+ }
+ catch (InterruptedException e)
+ {
+ e.printStackTrace(); //To change body of catch statement use File |
Settings | File Templates.
+ }
+ }
+ else
+ {
+ latch.countDown();
+ }
+ try
+ {
+ sendInRange(node, "queues.testaddress", msgStart, msgEnd, false,
MessageImpl.HDR_GROUP_ID, id);
+ }
+ catch (Exception e)
+ {
+ e.printStackTrace(); //To change body of catch statement use File | Settings
| File Templates.
+ }
+ }
+ }
+}
+
Added:
trunk/tests/src/org/hornetq/tests/integration/cluster/failover/GroupingFailoverReplicationTest.java
===================================================================
---
trunk/tests/src/org/hornetq/tests/integration/cluster/failover/GroupingFailoverReplicationTest.java
(rev 0)
+++
trunk/tests/src/org/hornetq/tests/integration/cluster/failover/GroupingFailoverReplicationTest.java 2009-10-28
18:32:19 UTC (rev 8158)
@@ -0,0 +1,85 @@
+/*
+ * Copyright 2009 Red Hat, Inc.
+ * Red Hat licenses this file to you under the Apache License, version
+ * 2.0 (the "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
http://www.apache.org/licenses/LICENSE-2.0
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+ * implied. See the License for the specific language governing
+ * permissions and limitations under the License.
+ */
+package org.hornetq.tests.integration.cluster.failover;
+
+import org.hornetq.core.config.Configuration;
+import org.hornetq.core.config.TransportConfiguration;
+import org.hornetq.core.config.impl.ConfigurationImpl;
+import org.hornetq.core.server.JournalType;
+import org.hornetq.core.server.HornetQServer;
+import org.hornetq.core.server.HornetQ;
+
+import java.util.Map;
+
+/**
+ * @author <a href="mailto:andy.taylor@jboss.org">Andy Taylor</a>
+ * Created Oct 26, 2009
+ */
+public class GroupingFailoverReplicationTest extends GroupingFailoverTestBase
+{
+
+ protected void setupReplicatedServer(int node, boolean fileStorage, boolean netty, int
backupNode)
+ {
+ if (servers[node] != null)
+ {
+ throw new IllegalArgumentException("Already a server at node " +
node);
+ }
+
+ Configuration configuration = new ConfigurationImpl();
+
+ configuration.setSecurityEnabled(false);
+ configuration.setBindingsDirectory(getBindingsDir(node, false));
+ configuration.setJournalMinFiles(2);
+ configuration.setJournalMaxAIO(1000);
+ configuration.setJournalDirectory(getJournalDir(node, false));
+ configuration.setJournalFileSize(100 * 1024);
+ configuration.setJournalType(JournalType.ASYNCIO);
+ configuration.setJournalMaxAIO(1000);
+ configuration.setPagingDirectory(getPageDir(node, false));
+ configuration.setLargeMessagesDirectory(getLargeMessagesDir(node, false));
+ configuration.setClustered(true);
+ configuration.setJournalCompactMinFiles(0);
+ configuration.setBackup(true);
+ configuration.setSharedStore(false);
+
+ configuration.getAcceptorConfigurations().clear();
+
+ Map<String, Object> params = generateParams(node, netty);
+
+ TransportConfiguration invmtc = new TransportConfiguration(INVM_ACCEPTOR_FACTORY,
params);
+ configuration.getAcceptorConfigurations().add(invmtc);
+
+ if (netty)
+ {
+ TransportConfiguration nettytc = new
TransportConfiguration(NETTY_ACCEPTOR_FACTORY, params);
+ configuration.getAcceptorConfigurations().add(nettytc);
+ }
+
+ HornetQServer server;
+
+ if (fileStorage)
+ {
+ server = HornetQ.newHornetQServer(configuration);
+ }
+ else
+ {
+ server = HornetQ.newHornetQServer(configuration, false);
+ }
+ servers[node] = server;
+ }
+
+ void setupMasterServer(int i, boolean fileStorage, boolean netty)
+ {
+ setupServer(i, fileStorage, netty, 2);
+ }
+}
Added:
trunk/tests/src/org/hornetq/tests/integration/cluster/failover/GroupingFailoverSharedServerTest.java
===================================================================
---
trunk/tests/src/org/hornetq/tests/integration/cluster/failover/GroupingFailoverSharedServerTest.java
(rev 0)
+++
trunk/tests/src/org/hornetq/tests/integration/cluster/failover/GroupingFailoverSharedServerTest.java 2009-10-28
18:32:19 UTC (rev 8158)
@@ -0,0 +1,89 @@
+/*
+ * Copyright 2009 Red Hat, Inc.
+ * Red Hat licenses this file to you under the Apache License, version
+ * 2.0 (the "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
http://www.apache.org/licenses/LICENSE-2.0
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+ * implied. See the License for the specific language governing
+ * permissions and limitations under the License.
+ */
+package org.hornetq.tests.integration.cluster.failover;
+
+import org.hornetq.core.server.group.impl.GroupingHandlerConfiguration;
+import org.hornetq.core.server.JournalType;
+import org.hornetq.core.server.HornetQServer;
+import org.hornetq.core.server.HornetQ;
+import org.hornetq.core.message.impl.MessageImpl;
+import org.hornetq.core.config.Configuration;
+import org.hornetq.core.config.TransportConfiguration;
+import org.hornetq.core.config.impl.ConfigurationImpl;
+import org.hornetq.utils.SimpleString;
+
+import java.util.Map;
+
+/**
+ * @author <a href="mailto:andy.taylor@jboss.org">Andy Taylor</a>
+ * Created Oct 26, 2009
+ */
+public class GroupingFailoverSharedServerTest extends GroupingFailoverTestBase
+{
+ protected void setupReplicatedServer(int node, boolean fileStorage, boolean netty, int
backupNode)
+ {
+ if (servers[node] != null)
+ {
+ throw new IllegalArgumentException("Already a server at node " +
node);
+ }
+
+ Configuration configuration = new ConfigurationImpl();
+
+ configuration.setSecurityEnabled(false);
+ configuration.setBindingsDirectory(getBindingsDir(backupNode, false));
+ configuration.setJournalMinFiles(2);
+ configuration.setJournalMaxAIO(1000);
+ configuration.setJournalDirectory(getJournalDir(backupNode, false));
+ configuration.setJournalFileSize(100 * 1024);
+ configuration.setJournalType(JournalType.ASYNCIO);
+ configuration.setJournalMaxAIO(1000);
+ configuration.setPagingDirectory(getPageDir(backupNode, false));
+ configuration.setLargeMessagesDirectory(getLargeMessagesDir(backupNode,
false));
+ configuration.setClustered(true);
+ configuration.setJournalCompactMinFiles(0);
+ configuration.setBackup(true);
+ configuration.setSharedStore(true);
+
+
+ configuration.getAcceptorConfigurations().clear();
+
+ Map<String, Object> params = generateParams(node, netty);
+
+ TransportConfiguration invmtc = new
TransportConfiguration(INVM_ACCEPTOR_FACTORY, params);
+ configuration.getAcceptorConfigurations().add(invmtc);
+
+ if (netty)
+ {
+ TransportConfiguration nettytc = new
TransportConfiguration(NETTY_ACCEPTOR_FACTORY, params);
+ configuration.getAcceptorConfigurations().add(nettytc);
+ }
+
+ HornetQServer server;
+
+ if (fileStorage)
+ {
+ server = HornetQ.newHornetQServer(configuration);
+ }
+ else
+ {
+ server = HornetQ.newHornetQServer(configuration, false);
+ }
+ servers[node] = server;
+ }
+
+ public void setupMasterServer(int i, boolean fileStorage, boolean netty)
+ {
+ setupServer(i, fileStorage, netty);
+ }
+
+}
Added:
trunk/tests/src/org/hornetq/tests/integration/cluster/failover/GroupingFailoverTestBase.java
===================================================================
---
trunk/tests/src/org/hornetq/tests/integration/cluster/failover/GroupingFailoverTestBase.java
(rev 0)
+++
trunk/tests/src/org/hornetq/tests/integration/cluster/failover/GroupingFailoverTestBase.java 2009-10-28
18:32:19 UTC (rev 8158)
@@ -0,0 +1,299 @@
+/*
+ * Copyright 2009 Red Hat, Inc.
+ * Red Hat licenses this file to you under the Apache License, version
+ * 2.0 (the "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
http://www.apache.org/licenses/LICENSE-2.0
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+ * implied. See the License for the specific language governing
+ * permissions and limitations under the License.
+ */
+package org.hornetq.tests.integration.cluster.failover;
+
+import org.hornetq.tests.integration.cluster.distribution.ClusterTestBase;
+import org.hornetq.core.config.Configuration;
+import org.hornetq.core.config.TransportConfiguration;
+import org.hornetq.core.config.impl.ConfigurationImpl;
+import org.hornetq.core.server.JournalType;
+import org.hornetq.core.server.HornetQServer;
+import org.hornetq.core.server.HornetQ;
+import org.hornetq.core.server.cluster.impl.ClusterConnectionImpl;
+import org.hornetq.core.server.cluster.MessageFlowRecord;
+import org.hornetq.core.server.group.impl.GroupingHandlerConfiguration;
+import org.hornetq.core.message.impl.MessageImpl;
+import org.hornetq.core.client.ClientSession;
+import org.hornetq.core.client.impl.ClientSessionInternal;
+import org.hornetq.core.remoting.RemotingConnection;
+import org.hornetq.core.remoting.FailureListener;
+import org.hornetq.core.exception.HornetQException;
+import org.hornetq.utils.SimpleString;
+
+import java.util.Map;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+
+/**
+ * @author <a href="mailto:andy.taylor@jboss.org">Andy Taylor</a>
+ * Created Oct 26, 2009
+ */
+public abstract class GroupingFailoverTestBase extends ClusterTestBase
+{
+ public void testGroupingLocalHandlerFails() throws Exception
+ {
+ setupReplicatedServer(2, isFileStorage(), isNetty(), 0);
+
+ setupMasterServer(0, isFileStorage(), isNetty());
+
+ setupServer(1, isFileStorage(), isNetty());
+
+ setupClusterConnection("cluster0", "queues", false, 1,
isNetty(), 0, 1);
+
+ setupClusterConnectionWithBackups("cluster1", "queues", false,
1, isNetty(), 1, new int[]{0}, new int[]{2});
+
+ setupClusterConnection("cluster2", "queues", false, 1,
isNetty(), 2, 1);
+
+ setUpGroupHandler(GroupingHandlerConfiguration.TYPE.LOCAL, 0);
+
+ setUpGroupHandler(GroupingHandlerConfiguration.TYPE.REMOTE, 1);
+
+ setUpGroupHandler(GroupingHandlerConfiguration.TYPE.LOCAL, 2);
+
+
+ startServers(2, 0, 1);
+
+ try
+ {
+ setupSessionFactory(0, isNetty());
+ setupSessionFactory(1, isNetty());
+
+ createQueue(0, "queues.testaddress", "queue0", null, true);
+ createQueue(1, "queues.testaddress", "queue0", null, true);
+
+ addConsumer(0, 0, "queue0", null);
+ addConsumer(1, 1, "queue0", null);
+
+ waitForBindings(0, "queues.testaddress", 1, 1, true);
+ waitForBindings(1, "queues.testaddress", 1, 1, true);
+
+ waitForBindings(0, "queues.testaddress", 1, 1, false);
+ waitForBindings(1, "queues.testaddress", 1, 1, false);
+
+ sendWithProperty(0, "queues.testaddress", 10, false,
MessageImpl.HDR_GROUP_ID, new SimpleString("id1"));
+
+ verifyReceiveAll(10, 0);
+
+ closeSessionFactory(0);
+
+ final CountDownLatch latch = new CountDownLatch(1);
+
+ class MyListener implements FailureListener
+ {
+ public void connectionFailed(HornetQException me)
+ {
+ latch.countDown();
+ }
+ }
+
+ final CountDownLatch latch2 = new CountDownLatch(1);
+
+ class MyListener2 implements FailureListener
+ {
+ public void connectionFailed(HornetQException me)
+ {
+ latch2.countDown();
+ }
+ }
+
+ Map<String, MessageFlowRecord> records =
((ClusterConnectionImpl)getServer(1).getClusterManager().getClusterConnection(new
SimpleString("cluster1"))).getRecords();
+ RemotingConnection rc =
records.get("0").getBridge().getForwardingConnection() ;
+ rc.addFailureListener(new MyListener());
+ fail(rc, latch);
+
+ records =
((ClusterConnectionImpl)getServer(0).getClusterManager().getClusterConnection(new
SimpleString("cluster0"))).getRecords();
+ rc = records.get("0").getBridge().getForwardingConnection()
;
+ rc.addFailureListener(new MyListener2());
+ fail(rc, latch);
+
+
+ waitForServerRestart(2);
+
+ setupSessionFactory(2, isNetty());
+
+ addConsumer(2, 2, "queue0", null);
+
+ waitForBindings(2, "queues.testaddress", 1, 1, true);
+
+ waitForBindings(1, "queues.testaddress", 1, 1, false);
+
+ sendWithProperty(1, "queues.testaddress", 10, false,
MessageImpl.HDR_GROUP_ID, new SimpleString("id1"));
+
+ verifyReceiveAll(10, 2);
+
+
System.out.println("*****************************************************************************");
+ }
+ finally
+ {
+ closeAllConsumers();
+
+ closeAllSessionFactories();
+
+ stopServers(0, 1, 2);
+ }
+ }
+
+ public void testGroupingLocalHandlerFailsMultipleGroups() throws Exception
+ {
+ setupReplicatedServer(2, isFileStorage(), isNetty(), 0);
+
+ setupMasterServer(0, isFileStorage(), isNetty());
+
+ setupServer(1, isFileStorage(), isNetty());
+
+ setupClusterConnection("cluster0", "queues", false, 1,
isNetty(), 0, 1);
+
+ setupClusterConnectionWithBackups("cluster1", "queues", false,
1, isNetty(), 1, new int[]{0}, new int[]{2});
+
+ setupClusterConnection("cluster2", "queues", false, 1,
isNetty(), 2, 1);
+
+ setUpGroupHandler(GroupingHandlerConfiguration.TYPE.LOCAL, 0);
+
+ setUpGroupHandler(GroupingHandlerConfiguration.TYPE.REMOTE, 1);
+
+ setUpGroupHandler(GroupingHandlerConfiguration.TYPE.LOCAL, 2);
+
+
+ startServers(2, 0, 1);
+
+ try
+ {
+ setupSessionFactory(0, isNetty());
+ setupSessionFactory(1, isNetty());
+
+ createQueue(0, "queues.testaddress", "queue0", null, true);
+ createQueue(1, "queues.testaddress", "queue0", null, true);
+
+ addConsumer(0, 0, "queue0", null);
+ addConsumer(1, 1, "queue0", null);
+
+ waitForBindings(0, "queues.testaddress", 1, 1, true);
+ waitForBindings(1, "queues.testaddress", 1, 1, true);
+
+ waitForBindings(0, "queues.testaddress", 1, 1, false);
+ waitForBindings(1, "queues.testaddress", 1, 1, false);
+
+ sendWithProperty(0, "queues.testaddress", 10, false,
MessageImpl.HDR_GROUP_ID, new SimpleString("id1"));
+ sendWithProperty(0, "queues.testaddress", 10, false,
MessageImpl.HDR_GROUP_ID, new SimpleString("id2"));
+ sendWithProperty(0, "queues.testaddress", 10, false,
MessageImpl.HDR_GROUP_ID, new SimpleString("id3"));
+ sendWithProperty(0, "queues.testaddress", 10, false,
MessageImpl.HDR_GROUP_ID, new SimpleString("id4"));
+ sendWithProperty(0, "queues.testaddress", 10, false,
MessageImpl.HDR_GROUP_ID, new SimpleString("id5"));
+ sendWithProperty(0, "queues.testaddress", 10, false,
MessageImpl.HDR_GROUP_ID, new SimpleString("id6"));
+
+ verifyReceiveAllWithGroupIDRoundRobin(0, 30, 0, 1);
+
+ closeSessionFactory(0);
+
+ final CountDownLatch latch = new CountDownLatch(1);
+
+ class MyListener implements FailureListener
+ {
+ public void connectionFailed(HornetQException me)
+ {
+ latch.countDown();
+ }
+ }
+
+ final CountDownLatch latch2 = new CountDownLatch(1);
+
+ class MyListener2 implements FailureListener
+ {
+ public void connectionFailed(HornetQException me)
+ {
+ latch2.countDown();
+ }
+ }
+
+ Map<String, MessageFlowRecord> records =
((ClusterConnectionImpl)getServer(1).getClusterManager().getClusterConnection(new
SimpleString("cluster1"))).getRecords();
+ RemotingConnection rc =
records.get("0").getBridge().getForwardingConnection() ;
+ rc.addFailureListener(new MyListener());
+ fail(rc, latch);
+
+ records =
((ClusterConnectionImpl)getServer(0).getClusterManager().getClusterConnection(new
SimpleString("cluster0"))).getRecords();
+ rc = records.get("0").getBridge().getForwardingConnection() ;
+ rc.addFailureListener(new MyListener2());
+ fail(rc, latch);
+
+ waitForServerRestart(2);
+
+ setupSessionFactory(2, isNetty());
+
+ addConsumer(2, 2, "queue0", null);
+
+ waitForBindings(2, "queues.testaddress", 1, 1, true);
+
+ waitForBindings(1, "queues.testaddress", 1, 1, false);
+
+ sendWithProperty(1, "queues.testaddress", 10, false,
MessageImpl.HDR_GROUP_ID, new SimpleString("id1"));
+ sendWithProperty(1, "queues.testaddress", 10, false,
MessageImpl.HDR_GROUP_ID, new SimpleString("id2"));
+ sendWithProperty(1, "queues.testaddress", 10, false,
MessageImpl.HDR_GROUP_ID, new SimpleString("id3"));
+ sendWithProperty(1, "queues.testaddress", 10, false,
MessageImpl.HDR_GROUP_ID, new SimpleString("id4"));
+ sendWithProperty(1, "queues.testaddress", 10, false,
MessageImpl.HDR_GROUP_ID, new SimpleString("id5"));
+ sendWithProperty(1, "queues.testaddress", 10, false,
MessageImpl.HDR_GROUP_ID, new SimpleString("id6"));
+
+ verifyReceiveAllWithGroupIDRoundRobin(0, 30, 1, 2);
+
+
System.out.println("*****************************************************************************");
+ }
+ finally
+ {
+ closeAllConsumers();
+
+ closeAllSessionFactories();
+
+ stopServers(0, 1, 2);
+ }
+ }
+
+ abstract void setupMasterServer(int i, boolean fileStorage, boolean netty);
+
+ public boolean isFileStorage()
+ {
+ return true;
+ }
+
+ public boolean isNetty()
+ {
+ return true;
+ }
+
+ abstract void setupReplicatedServer(int node, boolean fileStorage, boolean netty, int
backupNode);
+
+ private void fail(ClientSession session, final CountDownLatch latch) throws
InterruptedException
+ {
+
+ RemotingConnection conn = ((ClientSessionInternal)session).getConnection();
+
+ // Simulate failure on connection
+ conn.fail(new HornetQException(HornetQException.NOT_CONNECTED));
+
+ // Wait to be informed of failure
+
+ boolean ok = latch.await(1000, TimeUnit.MILLISECONDS);
+
+ assertTrue(ok);
+ }
+
+ private void fail(RemotingConnection conn, final CountDownLatch latch) throws
InterruptedException
+ {
+ // Simulate failure on connection
+ conn.fail(new HornetQException(HornetQException.NOT_CONNECTED));
+
+ // Wait to be informed of failure
+
+ boolean ok = latch.await(1000, TimeUnit.MILLISECONDS);
+
+ assertTrue(ok);
+ }
+
+}
Modified: trunk/tests/src/org/hornetq/tests/integration/persistence/RestartSMTest.java
===================================================================
---
trunk/tests/src/org/hornetq/tests/integration/persistence/RestartSMTest.java 2009-10-28
16:13:10 UTC (rev 8157)
+++
trunk/tests/src/org/hornetq/tests/integration/persistence/RestartSMTest.java 2009-10-28
18:32:19 UTC (rev 8158)
@@ -23,6 +23,7 @@
import org.hornetq.core.config.Configuration;
import org.hornetq.core.logging.Logger;
import org.hornetq.core.persistence.QueueBindingInfo;
+import org.hornetq.core.persistence.GroupingInfo;
import org.hornetq.core.persistence.impl.journal.JournalStorageManager;
import org.hornetq.core.postoffice.PostOffice;
import org.hornetq.core.server.JournalType;
@@ -74,7 +75,7 @@
List<QueueBindingInfo> queueBindingInfos = new
ArrayList<QueueBindingInfo>();
- journal.loadBindingJournal(queueBindingInfos);
+ journal.loadBindingJournal(queueBindingInfos, new
ArrayList<GroupingInfo>());
Map<Long, Queue> queues = new HashMap<Long, Queue>();
@@ -92,7 +93,7 @@
queueBindingInfos = new ArrayList<QueueBindingInfo>();
- journal.loadBindingJournal(queueBindingInfos);
+ journal.loadBindingJournal(queueBindingInfos, new
ArrayList<GroupingInfo>());
journal.start();
}
Modified:
trunk/tests/src/org/hornetq/tests/unit/core/paging/impl/PagingStoreImplTest.java
===================================================================
---
trunk/tests/src/org/hornetq/tests/unit/core/paging/impl/PagingStoreImplTest.java 2009-10-28
16:13:10 UTC (rev 8157)
+++
trunk/tests/src/org/hornetq/tests/unit/core/paging/impl/PagingStoreImplTest.java 2009-10-28
18:32:19 UTC (rev 8158)
@@ -43,6 +43,7 @@
import org.hornetq.core.paging.impl.TestSupportPageStore;
import org.hornetq.core.persistence.QueueBindingInfo;
import org.hornetq.core.persistence.StorageManager;
+import org.hornetq.core.persistence.GroupingInfo;
import org.hornetq.core.postoffice.Binding;
import org.hornetq.core.postoffice.PostOffice;
import org.hornetq.core.remoting.spi.HornetQBuffer;
@@ -50,6 +51,7 @@
import org.hornetq.core.server.MessageReference;
import org.hornetq.core.server.Queue;
import org.hornetq.core.server.ServerMessage;
+import org.hornetq.core.server.group.impl.GroupBinding;
import org.hornetq.core.server.impl.ServerMessageImpl;
import org.hornetq.core.settings.HierarchicalRepository;
import org.hornetq.core.settings.impl.AddressSettings;
@@ -933,10 +935,20 @@
/* (non-Javadoc)
* @see
org.hornetq.core.persistence.StorageManager#loadBindingJournal(java.util.List)
*/
- public void loadBindingJournal(final List<QueueBindingInfo>
queueBindingInfos) throws Exception
+ public void loadBindingJournal(final List<QueueBindingInfo>
queueBindingInfos, List<GroupingInfo> groupingInfos) throws Exception
{
}
+ public void addGrouping(GroupBinding groupBinding) throws Exception
+ {
+ //To change body of implemented methods use File | Settings | File Templates.
+ }
+
+ public void deleteGrouping(GroupBinding groupBinding) throws Exception
+ {
+ //To change body of implemented methods use File | Settings | File Templates.
+ }
+
/* (non-Javadoc)
* @see
org.hornetq.core.persistence.StorageManager#loadMessageJournal(org.hornetq.core.paging.PagingManager,
java.util.Map, org.hornetq.core.transaction.ResourceManager, java.util.Map)
*/
@@ -1254,6 +1266,8 @@
{
}
+
+
}
}
Modified:
trunk/tests/src/org/hornetq/tests/unit/core/postoffice/impl/BindingsImplTest.java
===================================================================
---
trunk/tests/src/org/hornetq/tests/unit/core/postoffice/impl/BindingsImplTest.java 2009-10-28
16:13:10 UTC (rev 8157)
+++
trunk/tests/src/org/hornetq/tests/unit/core/postoffice/impl/BindingsImplTest.java 2009-10-28
18:32:19 UTC (rev 8158)
@@ -81,7 +81,7 @@
{
final FakeBinding fake = new FakeBinding(new SimpleString("a"));
- final BindingsImpl bind = new BindingsImpl();
+ final BindingsImpl bind = new BindingsImpl(null);
bind.addBinding(fake);
bind.addBinding(new FakeBinding(new SimpleString("a")));
bind.addBinding(new FakeBinding(new SimpleString("a")));
Modified:
trunk/tests/src/org/hornetq/tests/unit/core/postoffice/impl/DuplicateDetectionUnitTest.java
===================================================================
---
trunk/tests/src/org/hornetq/tests/unit/core/postoffice/impl/DuplicateDetectionUnitTest.java 2009-10-28
16:13:10 UTC (rev 8157)
+++
trunk/tests/src/org/hornetq/tests/unit/core/postoffice/impl/DuplicateDetectionUnitTest.java 2009-10-28
18:32:19 UTC (rev 8158)
@@ -26,6 +26,7 @@
import org.hornetq.core.paging.PagingManager;
import org.hornetq.core.paging.PagingStore;
import org.hornetq.core.persistence.QueueBindingInfo;
+import org.hornetq.core.persistence.GroupingInfo;
import org.hornetq.core.persistence.impl.journal.JournalStorageManager;
import org.hornetq.core.postoffice.PostOffice;
import org.hornetq.core.postoffice.impl.DuplicateIDCacheImpl;
@@ -88,7 +89,7 @@
journal = new JournalStorageManager(configuration,
Executors.newCachedThreadPool());
journal.start();
- journal.loadBindingJournal(new ArrayList<QueueBindingInfo>());
+ journal.loadBindingJournal(new ArrayList<QueueBindingInfo>(), new
ArrayList<GroupingInfo>());
HashMap<SimpleString, List<Pair<byte[], Long>>> mapDups = new
HashMap<SimpleString, List<Pair<byte[], Long>>>();
@@ -111,7 +112,7 @@
journal = new JournalStorageManager(configuration,
Executors.newCachedThreadPool());
journal.start();
- journal.loadBindingJournal(new ArrayList<QueueBindingInfo>());
+ journal.loadBindingJournal(new ArrayList<QueueBindingInfo>(), new
ArrayList<GroupingInfo>());
journal.loadMessageJournal(postOffice,
new FakePagingManager(),
@@ -139,7 +140,7 @@
journal = new JournalStorageManager(configuration,
Executors.newCachedThreadPool());
journal.start();
- journal.loadBindingJournal(new ArrayList<QueueBindingInfo>());
+ journal.loadBindingJournal(new ArrayList<QueueBindingInfo>(), new
ArrayList<GroupingInfo>());
journal.loadMessageJournal(postOffice,
new FakePagingManager(),