Author: ataylor
Date: 2009-12-01 04:01:01 -0500 (Tue, 01 Dec 2009)
New Revision: 8477
Added:
trunk/examples/jms/message-group2/
trunk/examples/jms/message-group2/build.bat
trunk/examples/jms/message-group2/build.sh
trunk/examples/jms/message-group2/build.xml
trunk/examples/jms/message-group2/readme.html
trunk/examples/jms/message-group2/server0/
trunk/examples/jms/message-group2/server0/client-jndi.properties
trunk/examples/jms/message-group2/server0/hornetq-beans.xml
trunk/examples/jms/message-group2/server0/hornetq-configuration.xml
trunk/examples/jms/message-group2/server0/hornetq-jms.xml
trunk/examples/jms/message-group2/server0/hornetq-users.xml
trunk/examples/jms/message-group2/src/
trunk/examples/jms/message-group2/src/org/
trunk/examples/jms/message-group2/src/org/hornetq/
trunk/examples/jms/message-group2/src/org/hornetq/jms/
trunk/examples/jms/message-group2/src/org/hornetq/jms/example/
trunk/examples/jms/message-group2/src/org/hornetq/jms/example/MessageGroup2Example.java
trunk/tests/src/org/hornetq/tests/integration/client/MessageGroupingConnectionFactoryTest.java
Modified:
trunk/docs/user-manual/en/message-grouping.xml
trunk/examples/jms/hornetq-jms-examples.iml
trunk/hornetq.iws
trunk/src/config/common/schema/hornetq-jms.xsd
trunk/src/main/org/hornetq/core/client/ClientSessionFactory.java
trunk/src/main/org/hornetq/core/client/impl/ClientProducerImpl.java
trunk/src/main/org/hornetq/core/client/impl/ClientSessionFactoryImpl.java
trunk/src/main/org/hornetq/core/client/impl/ClientSessionImpl.java
trunk/src/main/org/hornetq/core/client/impl/FailoverManager.java
trunk/src/main/org/hornetq/core/client/impl/FailoverManagerImpl.java
trunk/src/main/org/hornetq/jms/client/HornetQConnectionFactory.java
trunk/src/main/org/hornetq/jms/server/JMSServerManager.java
trunk/src/main/org/hornetq/jms/server/config/ConnectionFactoryConfiguration.java
trunk/src/main/org/hornetq/jms/server/config/impl/ConnectionFactoryConfigurationImpl.java
trunk/src/main/org/hornetq/jms/server/impl/JMSServerDeployer.java
trunk/src/main/org/hornetq/jms/server/impl/JMSServerManagerImpl.java
trunk/src/main/org/hornetq/jms/server/management/JMSServerControl.java
trunk/src/main/org/hornetq/jms/server/management/impl/JMSServerControlImpl.java
trunk/tests/jms-tests/src/org/hornetq/jms/tests/CTSMiscellaneousTest.java
trunk/tests/jms-tests/src/org/hornetq/jms/tests/JMSTestCase.java
trunk/tests/jms-tests/src/org/hornetq/jms/tests/tools/container/LocalTestServer.java
trunk/tests/src/org/hornetq/tests/integration/client/MessageGroupingTest.java
trunk/tests/src/org/hornetq/tests/integration/jms/FloodServerTest.java
trunk/tests/src/org/hornetq/tests/integration/jms/client/PreACKJMSTest.java
trunk/tests/src/org/hornetq/tests/integration/jms/client/ReSendLargeMessageTest.java
trunk/tests/src/org/hornetq/tests/integration/jms/client/SessionClosedOnRemotingConnectionFailureTest.java
trunk/tests/src/org/hornetq/tests/integration/jms/client/TextMessageTest.java
trunk/tests/src/org/hornetq/tests/integration/jms/divert/DivertAndACKClientTest.java
trunk/tests/src/org/hornetq/tests/integration/jms/server/management/JMSServerControlTest.java
trunk/tests/src/org/hornetq/tests/integration/jms/server/management/JMSServerControlUsingJMSTest.java
trunk/tests/src/org/hornetq/tests/util/JMSTestBase.java
Log:
https://jira.jboss.org/jira/browse/HORNETQ-229 - added the ability to control grouping at
the connection factory level
Modified: trunk/docs/user-manual/en/message-grouping.xml
===================================================================
--- trunk/docs/user-manual/en/message-grouping.xml 2009-12-01 08:55:10 UTC (rev 8476)
+++ trunk/docs/user-manual/en/message-grouping.xml 2009-12-01 09:01:01 UTC (rev 8477)
@@ -71,6 +71,22 @@
</entries>
<autogroup>true</autogroup>
</connection-factory></programlisting>
+ <para>Alternatively, you can set the group id via the connection factory. All
messages sent by producers created
+ via this connection factory will have
<literal>JMSXGroupID</literal> set to the specified value.
+ To configure the group id, set it on the connection factory in the
<literal>hornetq-jms.xml</literal>
+ config file as follows:
+ <programlisting>
+ <connection-factory name="ConnectionFactory">
+ <connectors>
+ <connector-ref connector-name="netty-connector"/>
+ </connectors>
+ <entries>
+ <entry name="ConnectionFactory"/>
+ </entries>
+ <group-id>Group-0</group-id>
+ </connection-factory>
+ </programlisting>
+ </para>
</section>
<section>
<title>Example</title>
@@ -78,6 +94,11 @@
groups are configured and used with JMS.</para>
</section>
<section>
+ <title>Example</title>
+ <para>See <xref linkend="examples.message-group2"/> for an
example which shows how message
+ groups are configured via a connection factory.</para>
+ </section>
+ <section>
<title> Clustered Grouping</title>
<para>Using the Grouping function in a cluster is a bit more complex. This is
because messages
with a particular group id can arrive on any node so each node needs to know
about which
Modified: trunk/examples/jms/hornetq-jms-examples.iml
===================================================================
--- trunk/examples/jms/hornetq-jms-examples.iml 2009-12-01 08:55:10 UTC (rev 8476)
+++ trunk/examples/jms/hornetq-jms-examples.iml 2009-12-01 09:01:01 UTC (rev 8477)
@@ -32,6 +32,7 @@
<sourceFolder url="file://$MODULE_DIR$/management/src"
isTestSource="false" />
<sourceFolder url="file://$MODULE_DIR$/message-counters/src"
isTestSource="false" />
<sourceFolder url="file://$MODULE_DIR$/message-group/src"
isTestSource="false" />
+ <sourceFolder url="file://$MODULE_DIR$/message-group2/src"
isTestSource="true" />
<sourceFolder url="file://$MODULE_DIR$/message-priority/src"
isTestSource="false" />
<sourceFolder url="file://$MODULE_DIR$/no-consumer-buffering/src"
isTestSource="false" />
<sourceFolder url="file://$MODULE_DIR$/paging/src"
isTestSource="false" />
Copied: trunk/examples/jms/message-group2/build.bat (from rev 8456,
trunk/examples/jms/message-group/build.bat)
===================================================================
--- trunk/examples/jms/message-group2/build.bat (rev 0)
+++ trunk/examples/jms/message-group2/build.bat 2009-12-01 09:01:01 UTC (rev 8477)
@@ -0,0 +1,13 @@
+@echo off
+
+set "OVERRIDE_ANT_HOME=..\..\..\tools\ant"
+
+if exist "..\..\..\src\bin\build.bat" (
+ rem running from TRUNK
+ call ..\..\..\src\bin\build.bat %*
+) else (
+ rem running from the distro
+ call ..\..\..\bin\build.bat %*
+)
+
+set "OVERRIDE_ANT_HOME="
Copied: trunk/examples/jms/message-group2/build.sh (from rev 8456,
trunk/examples/jms/message-group/build.sh)
===================================================================
--- trunk/examples/jms/message-group2/build.sh (rev 0)
+++ trunk/examples/jms/message-group2/build.sh 2009-12-01 09:01:01 UTC (rev 8477)
@@ -0,0 +1,15 @@
+#!/bin/sh
+
+OVERRIDE_ANT_HOME=../../../tools/ant
+export OVERRIDE_ANT_HOME
+
+if [ -f "../../../src/bin/build.sh" ]; then
+ # running from TRUNK
+ ../../../src/bin/build.sh "$@"
+else
+ # running from the distro
+ ../../../bin/build.sh "$@"
+fi
+
+
+
Copied: trunk/examples/jms/message-group2/build.xml (from rev 8456,
trunk/examples/jms/message-group/build.xml)
===================================================================
--- trunk/examples/jms/message-group2/build.xml (rev 0)
+++ trunk/examples/jms/message-group2/build.xml 2009-12-01 09:01:01 UTC (rev 8477)
@@ -0,0 +1,34 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!DOCTYPE project [
+ <!ENTITY libraries SYSTEM "../../../thirdparty/libraries.ent">
+ ]>
+<!--
+ ~ Copyright 2009 Red Hat, Inc.
+ ~ Red Hat licenses this file to you under the Apache License, version
+ ~ 2.0 (the "License"); you may not use this file except in compliance
+ ~ with the License. You may obtain a copy of the License at
+ ~
http://www.apache.org/licenses/LICENSE-2.0
+ ~ Unless required by applicable law or agreed to in writing, software
+ ~ distributed under the License is distributed on an "AS IS" BASIS,
+ ~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+ ~ implied. See the License for the specific language governing
+ ~ permissions and limitations under the License.
+ -->
+<project default="run" name="HornetQ JMS Message Group
Example">
+
+ <import file="../../common/build.xml"/>
+
+ <target name="run">
+ <antcall target="runExample">
+ <param name="example.classname"
value="org.hornetq.jms.example.MessageGroup2Example"/>
+ </antcall>
+ </target>
+
+ <target name="runRemote">
+ <antcall target="runExample">
+ <param name="example.classname"
value="org.hornetq.jms.example.MessageGroup2Example"/>
+ <param name="hornetq.example.runServer"
value="false"/>
+ </antcall>
+ </target>
+
+</project>
Copied: trunk/examples/jms/message-group2/readme.html (from rev 8456,
trunk/examples/jms/message-group/readme.html)
===================================================================
--- trunk/examples/jms/message-group2/readme.html (rev 0)
+++ trunk/examples/jms/message-group2/readme.html 2009-12-01 09:01:01 UTC (rev 8477)
@@ -0,0 +1,142 @@
+<html>
+ <head>
+ <title>HornetQ Message Group Example</title>
+ <link rel="stylesheet" type="text/css"
href="../../common/common.css" />
+ <link rel="stylesheet" type="text/css"
href="../../common/prettify.css" />
+ <script type="text/javascript"
src="../../common/prettify.js"></script>
+ </head>
+ <body onload="prettyPrint()">
+ <h1>Message Group Example</h1>
+ <br>
+ <p>This example shows you how to configure and use message groups via a
connection factory with HornetQ.</p>
+
+ <p>Message groups are sets of messages that have the following characteristics:
</p>
+ <li>Messages in a message group share the same group id, i.e. they have the same
JMSXGroupID string property value.</li>
+ <li>Messages in a message group will all be delivered to no more than one of
the queue's consumers. The consumer that receives the
+ first message of a group will receive all the messages that belong to the
group.</li>
+
+ <p>You can make any message belong to a message group by setting a
'group-id' on the connection factory. All producers created via this connection
factory will set that group id on their messages.
+ In this example we set the group id 'Group-0' on a connection factory,
send messages via 2 different producers and check that only 1 consumer receives them.
</p>
+
+ <p>Alternatively, HornetQ's connection factories can be configured to
<em>auto group</em> messages. By setting <code>autogroup</code> to
<code>true</code> on the <code>HornetQConnectionFactory</code>
+ (or setting
<code>&lt;autogroup&gt;true&lt;/autogroup&gt;</code> in
<code>hornetq-jms.xml</code>'s connection factory settings), a random
unique id
+ will be picked to create a message group. <em>Every message</em>
sent by a producer created from this connection factory will automatically
+ be part of this message group.</p>
+
+ <br>
+ <h2>Example step-by-step</h2>
+ <p><i>To run the example, simply type
<code>./build.sh</code> (or <code>build.bat</code> on windows)
from this directory</i></p>
+ <br>
+ <ol>
+ <li>First we need to get an initial context so we can look-up the JMS
connection factory and destination objects from JNDI. This initial context will get
its properties from the <code>client-jndi.properties</code> file in the
directory <code>server0</code></li>
+ <pre class="prettyprint">
+ <code>InitialContext initialContext = getContext(0);</code>
+ </pre>
+
+ <li>We look-up the JMS queue object from JNDI</li>
+ <pre class="prettyprint">
+ <code>Queue queue = (Queue)
initialContext.lookup("/queue/exampleQueue");</code>
+ </pre>
+
+ <li>We look-up the JMS connection factory object from JNDI</li>
+ <pre class="prettyprint">
+ <code>ConnectionFactory cf = (ConnectionFactory)
initialContext.lookup("/ConnectionFactory");</code>
+ </pre>
+
+ <li>We create a JMS connection</li>
+ <pre class="prettyprint">
+ <code>connection = cf.createConnection();</code>
+ </pre>
+
+ <li>We create a JMS session. The session is created as non transacted and
will auto acknowledge messages.</li>
+ <pre class="prettyprint">
+ <code>Session session = connection.createSession(false,
Session.AUTO_ACKNOWLEDGE);</code>
+ </pre>
+
+ <li>We create 2 JMS message producers on the session. These will be used to
send the messages.</li>
+ <pre class="prettyprint">
+ <code>
+ MessageProducer producer1 = session.createProducer(queue);
+
+ MessageProducer producer2 = session.createProducer(queue);</code>
+ </pre>
+
+ <li>We create two consumers.</li>
+ <pre class="prettyprint">
+ <code>
+ MessageConsumer consumer1 = session.createConsumer(queue);
+ consumer1.setMessageListener(new
SimpleMessageListener("consumer-1"));
+ MessageConsumer consumer2 = session.createConsumer(queue);
+ consumer2.setMessageListener(new
SimpleMessageListener("consumer-2"));
+ </code>
+ </pre>
+
+ <li>We create and send 10 text messages using each producer</li>
+ <pre class="prettyprint">
+ <code>
+ int msgCount = 10;
+ for (int i = 0; i < msgCount; i++)
+ {
+ TextMessage m = session.createTextMessage("producer1 message " +
i);
+ producer1.send(m);
+ System.out.println("Sent message: " + m.getText());
+ TextMessage m2 = session.createTextMessage("producer2 message " +
i);
+ producer2.send(m2);
+ System.out.println("Sent message: " + m2.getText());
+ }
+ </code>
+ </pre>
+
+ <li>We start the connection.</li>
+ <pre class="prettyprint">
+ <code>connection.start();</code>
+ </pre>
+
+ <li>We check the group messages are received by only one
consumer</li>
+ <pre class="prettyprint">
+ <code>
+ String trueReceiver = messageReceiverMap.get("producer1 message " +
0);
+ for (int i = 0; i < msgCount; i++)
+ {
+ String receiver = messageReceiverMap.get("producer1 message " +
i);
+ if (!trueReceiver.equals(receiver))
+ {
+ System.out.println("Group message [producer1 message " + i +
"] went to wrong receiver: " + receiver);
+ result = false;
+ }
+ receiver = messageReceiverMap.get("producer2 message " + i);
+ if (!trueReceiver.equals(receiver))
+ {
+ System.out.println("Group message [producer2 message " + i +
"] went to wrong receiver: " + receiver);
+ result = false;
+ }
+ }
+
+ </code>
+ </pre>
+
+ <li>And finally, <b>always</b> remember to close your JMS
connections and resources after use, in a <code>finally</code> block. Closing
a JMS connection will automatically close all of its sessions, consumers, producers and
browser objects.</li>
+
+ <pre class="prettyprint">
+ <code>finally
+ {
+ if (initialContext != null)
+ {
+ initialContext.close();
+ }
+ if (connection != null)
+ {
+ connection.close();
+ }
+ }</code>
+ </pre>
+ </ol>
+
+ <h2>More information</h2>
+
+ <ul>
+ <li>User Manual's <a
href="../../../docs/user-manual/en/html_single/index.html#message-grouping2">Message
Grouping chapter</a></li>
+ </ul>
+
+ </body>
+</html>
\ No newline at end of file
Copied: trunk/examples/jms/message-group2/server0/client-jndi.properties (from rev 8456,
trunk/examples/jms/message-group/server0/client-jndi.properties)
===================================================================
--- trunk/examples/jms/message-group2/server0/client-jndi.properties
(rev 0)
+++ trunk/examples/jms/message-group2/server0/client-jndi.properties 2009-12-01 09:01:01
UTC (rev 8477)
@@ -0,0 +1,3 @@
+java.naming.factory.initial=org.jnp.interfaces.NamingContextFactory
+java.naming.provider.url=jnp://localhost:1099
+java.naming.factory.url.pkgs=org.jboss.naming:org.jnp.interfaces
Copied: trunk/examples/jms/message-group2/server0/hornetq-beans.xml (from rev 8456,
trunk/examples/jms/message-group/server0/hornetq-beans.xml)
===================================================================
--- trunk/examples/jms/message-group2/server0/hornetq-beans.xml
(rev 0)
+++ trunk/examples/jms/message-group2/server0/hornetq-beans.xml 2009-12-01 09:01:01 UTC
(rev 8477)
@@ -0,0 +1,59 @@
+<?xml version="1.0" encoding="UTF-8"?>
+
+<deployment xmlns="urn:jboss:bean-deployer:2.0">
+
+ <bean name="Naming" class="org.jnp.server.NamingBeanImpl"/>
+
+ <!-- JNDI server. Disable this if you don't want JNDI -->
+ <bean name="JNDIServer" class="org.jnp.server.Main">
+ <property name="namingInfo">
+ <inject bean="Naming"/>
+ </property>
+ <property name="port">1099</property>
+ <property name="bindAddress">localhost</property>
+ <property name="rmiPort">1098</property>
+ <property name="rmiBindAddress">localhost</property>
+ </bean>
+
+ <!-- MBean server -->
+ <bean name="MBeanServer"
class="javax.management.MBeanServer">
+ <constructor factoryClass="java.lang.management.ManagementFactory"
+ factoryMethod="getPlatformMBeanServer"/>
+ </bean>
+
+ <!-- The core configuration -->
+ <bean name="Configuration"
class="org.hornetq.core.config.impl.FileConfiguration"/>
+
+ <!-- The security manager -->
+ <bean name="HornetQSecurityManager"
class="org.hornetq.core.security.impl.HornetQSecurityManagerImpl">
+ <start ignored="true"/>
+ <stop ignored="true"/>
+ </bean>
+
+ <!-- The core server -->
+ <bean name="HornetQServer"
class="org.hornetq.core.server.impl.HornetQServerImpl">
+ <constructor>
+ <parameter>
+ <inject bean="Configuration"/>
+ </parameter>
+ <parameter>
+ <inject bean="MBeanServer"/>
+ </parameter>
+ <parameter>
+ <inject bean="HornetQSecurityManager"/>
+ </parameter>
+ </constructor>
+ <start ignored="true"/>
+ <stop ignored="true"/>
+ </bean>
+
+ <!-- The JMS server -->
+ <bean name="JMSServerManager"
class="org.hornetq.jms.server.impl.JMSServerManagerImpl">
+ <constructor>
+ <parameter>
+ <inject bean="HornetQServer"/>
+ </parameter>
+ </constructor>
+ </bean>
+
+</deployment>
Copied: trunk/examples/jms/message-group2/server0/hornetq-configuration.xml (from rev
8456, trunk/examples/jms/message-group/server0/hornetq-configuration.xml)
===================================================================
--- trunk/examples/jms/message-group2/server0/hornetq-configuration.xml
(rev 0)
+++ trunk/examples/jms/message-group2/server0/hornetq-configuration.xml 2009-12-01
09:01:01 UTC (rev 8477)
@@ -0,0 +1,33 @@
+<configuration xmlns="urn:hornetq"
+
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+ xsi:schemaLocation="urn:hornetq
/schema/hornetq-configuration.xsd">
+
+ <!-- Connectors -->
+ <connectors>
+ <connector name="netty-connector">
+
<factory-class>org.hornetq.integration.transports.netty.NettyConnectorFactory</factory-class>
+ </connector>
+ </connectors>
+
+ <!-- Acceptors -->
+ <acceptors>
+ <acceptor name="netty-acceptor">
+
<factory-class>org.hornetq.integration.transports.netty.NettyAcceptorFactory</factory-class>
+ </acceptor>
+ </acceptors>
+
+ <!-- Other config -->
+
+ <security-settings>
+ <!--security for example queue-->
+ <security-setting match="jms.queue.exampleQueue">
+ <permission type="createDurableQueue" roles="guest"/>
+ <permission type="deleteDurableQueue" roles="guest"/>
+ <permission type="createTempQueue" roles="guest"/>
+ <permission type="deleteTempQueue" roles="guest"/>
+ <permission type="consume" roles="guest"/>
+ <permission type="send" roles="guest"/>
+ </security-setting>
+ </security-settings>
+
+</configuration>
Copied: trunk/examples/jms/message-group2/server0/hornetq-jms.xml (from rev 8456,
trunk/examples/jms/message-group/server0/hornetq-jms.xml)
===================================================================
--- trunk/examples/jms/message-group2/server0/hornetq-jms.xml (rev
0)
+++ trunk/examples/jms/message-group2/server0/hornetq-jms.xml 2009-12-01 09:01:01 UTC (rev
8477)
@@ -0,0 +1,20 @@
+<configuration xmlns="urn:hornetq"
+
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+ xsi:schemaLocation="urn:hornetq /schema/hornetq-jms.xsd">
+ <!--the connection factory used by the example-->
+ <connection-factory name="ConnectionFactory">
+ <connectors>
+ <connector-ref connector-name="netty-connector"/>
+ </connectors>
+ <entries>
+ <entry name="ConnectionFactory"/>
+ </entries>
+ <group-id>Group-0</group-id>
+ </connection-factory>
+
+ <!--the queue used by the example-->
+ <queue name="exampleQueue">
+ <entry name="/queue/exampleQueue"/>
+ </queue>
+
+</configuration>
\ No newline at end of file
Copied: trunk/examples/jms/message-group2/server0/hornetq-users.xml (from rev 8456,
trunk/examples/jms/message-group/server0/hornetq-users.xml)
===================================================================
--- trunk/examples/jms/message-group2/server0/hornetq-users.xml
(rev 0)
+++ trunk/examples/jms/message-group2/server0/hornetq-users.xml 2009-12-01 09:01:01 UTC
(rev 8477)
@@ -0,0 +1,7 @@
+<configuration xmlns="urn:hornetq"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+ xsi:schemaLocation="urn:hornetq /schema/hornetq-users.xsd">
+ <!-- the default user. this is used where username is null-->
+ <defaultuser name="guest" password="guest">
+ <role name="guest"/>
+ </defaultuser>
+</configuration>
\ No newline at end of file
Copied:
trunk/examples/jms/message-group2/src/org/hornetq/jms/example/MessageGroup2Example.java
(from rev 8456,
trunk/examples/jms/message-group/src/org/hornetq/jms/example/MessageGroupExample.java)
===================================================================
---
trunk/examples/jms/message-group2/src/org/hornetq/jms/example/MessageGroup2Example.java
(rev 0)
+++
trunk/examples/jms/message-group2/src/org/hornetq/jms/example/MessageGroup2Example.java 2009-12-01
09:01:01 UTC (rev 8477)
@@ -0,0 +1,160 @@
+/*
+ * Copyright 2009 Red Hat, Inc.
+ * Red Hat licenses this file to you under the Apache License, version
+ * 2.0 (the "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
http://www.apache.org/licenses/LICENSE-2.0
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+ * implied. See the License for the specific language governing
+ * permissions and limitations under the License.
+ */
+package org.hornetq.jms.example;
+
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+
+import javax.jms.Connection;
+import javax.jms.ConnectionFactory;
+import javax.jms.JMSException;
+import javax.jms.Message;
+import javax.jms.MessageConsumer;
+import javax.jms.MessageListener;
+import javax.jms.MessageProducer;
+import javax.jms.Queue;
+import javax.jms.Session;
+import javax.jms.TextMessage;
+import javax.naming.InitialContext;
+
+import org.hornetq.common.example.HornetQExample;
+import org.hornetq.jms.client.HornetQMessage;
+
+/**
+ * A simple JMS Queue example that sends and receives message groups, with the group id
+ * supplied by the connection factory rather than set on each message by the example code.
+ * <p>
+ * Two producers send to one queue while two consumers listen on it; the example
+ * succeeds only if every message is received by the same consumer.
+ *
+ * @author <a href="hgao(a)redhat.com">Howard Gao</a>
+ */
+public class MessageGroup2Example extends HornetQExample
+{
+   // Message text -> name of the listener that received it; filled in by the listener callbacks.
+   private final Map<String, String> messageReceiverMap = new ConcurrentHashMap<String, String>();
+
+   private boolean result = true;
+
+   public static void main(String[] args)
+   {
+      new MessageGroup2Example().run(args);
+   }
+
+   /**
+    * Sends 10 messages from each of two producers and checks that a single consumer
+    * received all of them.
+    *
+    * @return <code>true</code> if every message was received by the same consumer,
+    *         <code>false</code> if a message went to another consumer or was not received
+    * @throws Exception if the JNDI lookups or any JMS operation fails
+    */
+   public boolean runExample() throws Exception
+   {
+      Connection connection = null;
+      InitialContext initialContext = null;
+      try
+      {
+         //Step 1. Create an initial context to perform the JNDI lookup.
+         initialContext = getContext(0);
+
+         //Step 2. Perform a lookup on the queue
+         Queue queue = (Queue) initialContext.lookup("/queue/exampleQueue");
+
+         //Step 3. Perform a lookup on the Connection Factory
+         ConnectionFactory cf = (ConnectionFactory) initialContext.lookup("/ConnectionFactory");
+
+         //Step 4. Create a JMS Connection
+         connection = cf.createConnection();
+
+         //Step 5. Create a JMS Session
+         Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+
+         //Step 6. Create 2 JMS Message Producers
+         MessageProducer producer1 = session.createProducer(queue);
+
+         MessageProducer producer2 = session.createProducer(queue);
+
+         //Step 7. Create two consumers
+         MessageConsumer consumer1 = session.createConsumer(queue);
+         consumer1.setMessageListener(new SimpleMessageListener("consumer-1"));
+         MessageConsumer consumer2 = session.createConsumer(queue);
+         consumer2.setMessageListener(new SimpleMessageListener("consumer-2"));
+
+         //Step 8. Create and send 10 text messages with each producer
+         int msgCount = 10;
+         for (int i = 0; i < msgCount; i++)
+         {
+            TextMessage m = session.createTextMessage("producer1 message " + i);
+            producer1.send(m);
+            System.out.println("Sent message: " + m.getText());
+            TextMessage m2 = session.createTextMessage("producer2 message " + i);
+            producer2.send(m2);
+            System.out.println("Sent message: " + m2.getText());
+         }
+
+         System.out.println("all messages are sent");
+
+         //Step 9. Start the connection
+         connection.start();
+
+         // Give the listeners time to receive the messages before inspecting the map.
+         Thread.sleep(2000);
+
+         //Step 10. check the group messages are received by only one consumer
+
+         String trueReceiver = messageReceiverMap.get("producer1 message " + 0);
+         if (trueReceiver == null)
+         {
+            // The reference message never arrived; report a failed run instead of
+            // hitting a NullPointerException in the comparisons below.
+            System.out.println("Group message [producer1 message 0] was not received by any consumer");
+            result = false;
+            return result;
+         }
+
+         for (int i = 0; i < msgCount; i++)
+         {
+            String receiver = messageReceiverMap.get("producer1 message " + i);
+            if (!trueReceiver.equals(receiver))
+            {
+               System.out.println("Group message [producer1 message " + i + "] went to wrong receiver: " + receiver);
+               result = false;
+            }
+            receiver = messageReceiverMap.get("producer2 message " + i);
+            if (!trueReceiver.equals(receiver))
+            {
+               System.out.println("Group message [producer2 message " + i + "] went to wrong receiver: " + receiver);
+               result = false;
+            }
+         }
+
+         return result;
+      }
+      finally
+      {
+         //Step 11. Be sure to close our JMS resources!
+         if (initialContext != null)
+         {
+            initialContext.close();
+         }
+         if (connection != null)
+         {
+            connection.close();
+         }
+      }
+   }
+
+   /**
+    * Listener that prints each received text message and records which listener got it.
+    */
+   private class SimpleMessageListener implements MessageListener
+   {
+      private final String name;
+
+      public SimpleMessageListener(String listenerName)
+      {
+         name = listenerName;
+      }
+
+      public void onMessage(Message message)
+      {
+         try
+         {
+            TextMessage msg = (TextMessage)message;
+            System.out.format("Message: [%s] received by %s\n",
+                              msg.getText(),
+                              name);
+            messageReceiverMap.put(msg.getText(), name);
+         }
+         catch (JMSException e)
+         {
+            e.printStackTrace();
+         }
+      }
+   }
+
+}
\ No newline at end of file
Modified: trunk/hornetq.iws
===================================================================
--- trunk/hornetq.iws 2009-12-01 08:55:10 UTC (rev 8476)
+++ trunk/hornetq.iws 2009-12-01 09:01:01 UTC (rev 8477)
@@ -28,6 +28,7 @@
<change type="MODIFICATION"
beforePath="$PROJECT_DIR$/tests/jms-tests/src/org/hornetq/jms/tests/tools/container/LocalTestServer.java"
afterPath="$PROJECT_DIR$/tests/jms-tests/src/org/hornetq/jms/tests/tools/container/LocalTestServer.java"
/>
<change type="MODIFICATION"
beforePath="$PROJECT_DIR$/tests/src/org/hornetq/tests/integration/jms/client/SessionClosedOnRemotingConnectionFailureTest.java"
afterPath="$PROJECT_DIR$/tests/src/org/hornetq/tests/integration/jms/client/SessionClosedOnRemotingConnectionFailureTest.java"
/>
<change type="MODIFICATION"
beforePath="$PROJECT_DIR$/src/main/org/hornetq/jms/server/impl/JMSServerDeployer.java"
afterPath="$PROJECT_DIR$/src/main/org/hornetq/jms/server/impl/JMSServerDeployer.java"
/>
+ <change type="MODIFICATION"
beforePath="$PROJECT_DIR$/hornetq.iws"
afterPath="$PROJECT_DIR$/hornetq.iws" />
<change type="MODIFICATION"
beforePath="$PROJECT_DIR$/docs/user-manual/en/message-grouping.xml"
afterPath="$PROJECT_DIR$/docs/user-manual/en/message-grouping.xml" />
<change type="NEW" beforePath=""
afterPath="$PROJECT_DIR$/examples/jms/message-group2/src/org/hornetq" />
<change type="NEW" beforePath=""
afterPath="$PROJECT_DIR$/examples/jms/message-group2/src/org" />
@@ -54,10 +55,6 @@
<change type="MODIFICATION"
beforePath="$PROJECT_DIR$/src/main/org/hornetq/jms/client/HornetQConnectionFactory.java"
afterPath="$PROJECT_DIR$/src/main/org/hornetq/jms/client/HornetQConnectionFactory.java"
/>
<change type="NEW" beforePath=""
afterPath="$PROJECT_DIR$/examples/jms/message-group2/server0/hornetq-jms.xml"
/>
</list>
- <list name="intellij"
comment="https://jira.jboss.org/jira/browse/HORNETQ-229 - added the ability to
control grouping at the connection factry level">
- <change type="MODIFICATION"
beforePath="$PROJECT_DIR$/hornetq.ipr"
afterPath="$PROJECT_DIR$/hornetq.ipr" />
- <change type="NEW" beforePath=""
afterPath="$PROJECT_DIR$/hornetq.iws" />
- </list>
<ignored path="messaging.iws" />
<ignored path=".idea/workspace.xml" />
</component>
@@ -2188,7 +2185,7 @@
<frame x="0" y="25" width="1920"
height="1150" extended-state="0" />
<editor active="false" />
<layout>
- <window_info id="Changes" active="false"
anchor="bottom" auto_hide="false" internal_type="DOCKED"
type="DOCKED" visible="true" weight="0.23558648"
sideWeight="0.0" order="7" side_tool="false" />
+ <window_info id="Changes" active="true"
anchor="bottom" auto_hide="false" internal_type="DOCKED"
type="DOCKED" visible="true" weight="0.23558648"
sideWeight="0.0" order="7" side_tool="false" />
<window_info id="Palette" active="false"
anchor="right" auto_hide="false" internal_type="DOCKED"
type="DOCKED" visible="false" weight="0.33"
sideWeight="0.5" order="3" side_tool="false" />
<window_info id="Ant Build" active="false"
anchor="right" auto_hide="false" internal_type="DOCKED"
type="DOCKED" visible="false" weight="0.25"
sideWeight="0.5" order="1" side_tool="false" />
<window_info id="Find" active="false"
anchor="bottom" auto_hide="false" internal_type="DOCKED"
type="DOCKED" visible="false" weight="0.21704657"
sideWeight="0.5" order="1" side_tool="false" />
@@ -2199,7 +2196,7 @@
<window_info id="TODO" active="false"
anchor="bottom" auto_hide="false" internal_type="DOCKED"
type="DOCKED" visible="false" weight="0.33"
sideWeight="0.5" order="6" side_tool="false" />
<window_info id="Structure" active="false"
anchor="left" auto_hide="false" internal_type="DOCKED"
type="DOCKED" visible="false" weight="0.24959914"
sideWeight="0.7006937" order="1" side_tool="false" />
<window_info id="Maven Projects" active="false"
anchor="right" auto_hide="false" internal_type="DOCKED"
type="DOCKED" visible="false" weight="0.33"
sideWeight="0.5" order="3" side_tool="false" />
- <window_info id="Project" active="true"
anchor="left" auto_hide="false" internal_type="DOCKED"
type="DOCKED" visible="true" weight="0.1769437"
sideWeight="0.7574553" order="0" side_tool="false" />
+ <window_info id="Project" active="false"
anchor="left" auto_hide="false" internal_type="DOCKED"
type="DOCKED" visible="true" weight="0.1769437"
sideWeight="0.7574553" order="0" side_tool="false" />
<window_info id="Dependency Viewer" active="false"
anchor="bottom" auto_hide="false" internal_type="DOCKED"
type="DOCKED" visible="false" weight="0.33"
sideWeight="0.5" order="7" side_tool="false" />
<window_info id="Inspection" active="false"
anchor="bottom" auto_hide="false" internal_type="DOCKED"
type="DOCKED" visible="false" weight="0.4"
sideWeight="0.5" order="5" side_tool="false" />
<window_info id="Run" active="false"
anchor="bottom" auto_hide="false" internal_type="DOCKED"
type="DOCKED" visible="false" weight="0.26540756"
sideWeight="0.5" order="2" side_tool="false" />
Modified: trunk/src/config/common/schema/hornetq-jms.xsd
===================================================================
--- trunk/src/config/common/schema/hornetq-jms.xsd 2009-12-01 08:55:10 UTC (rev 8476)
+++ trunk/src/config/common/schema/hornetq-jms.xsd 2009-12-01 09:01:01 UTC (rev 8477)
@@ -127,6 +127,9 @@
<xsd:element name="thread-pool-max-size"
type="xsd:int"
maxOccurs="1" minOccurs="0">
</xsd:element>
+ <xsd:element name="group-id" type="xsd:string"
+ maxOccurs="1" minOccurs="0">
+ </xsd:element>
</xsd:all>
<xsd:attribute name="name"
type="xsd:string"></xsd:attribute>
</xsd:complexType>
Modified: trunk/src/main/org/hornetq/core/client/ClientSessionFactory.java
===================================================================
--- trunk/src/main/org/hornetq/core/client/ClientSessionFactory.java 2009-12-01 08:55:10
UTC (rev 8476)
+++ trunk/src/main/org/hornetq/core/client/ClientSessionFactory.java 2009-12-01 09:01:01
UTC (rev 8477)
@@ -181,4 +181,8 @@
void close();
ClientSessionFactory copy();
+
+ /**
+ * Sets the group ID configured on this session factory.
+ *
+ * @param groupID the group ID to use.
+ * NOTE(review): null presumably means "no factory-level group" - confirm.
+ */
+ void setGroupID(String groupID);
+
+ /**
+ * Returns the group ID configured on this session factory.
+ *
+ * @return the configured group ID
+ */
+ String getGroupID();
}
Modified: trunk/src/main/org/hornetq/core/client/impl/ClientProducerImpl.java
===================================================================
--- trunk/src/main/org/hornetq/core/client/impl/ClientProducerImpl.java 2009-12-01
08:55:10 UTC (rev 8476)
+++ trunk/src/main/org/hornetq/core/client/impl/ClientProducerImpl.java 2009-12-01
09:01:01 UTC (rev 8477)
@@ -83,6 +83,7 @@
final boolean blockOnNonPersistentSend,
final boolean blockOnPersistentSend,
final boolean autoGroup,
+ final SimpleString groupID,
final int minLargeMessageSize,
final Channel channel)
{
@@ -104,7 +105,7 @@
}
else
{
- this.groupID = null;
+ this.groupID = groupID;
}
this.minLargeMessageSize = minLargeMessageSize;
Modified: trunk/src/main/org/hornetq/core/client/impl/ClientSessionFactoryImpl.java
===================================================================
--- trunk/src/main/org/hornetq/core/client/impl/ClientSessionFactoryImpl.java 2009-12-01
08:55:10 UTC (rev 8476)
+++ trunk/src/main/org/hornetq/core/client/impl/ClientSessionFactoryImpl.java 2009-12-01
09:01:01 UTC (rev 8477)
@@ -209,6 +209,8 @@
private static ScheduledExecutorService globalScheduledThreadPool;
+ private String groupID;
+
private static synchronized ExecutorService getGlobalThreadPool()
{
if (globalThreadPool == null)
@@ -381,6 +383,8 @@
cacheLargeMessagesClient = other.isCacheLargeMessagesClient();
initialMessagePacketSize = other.getInitialMessagePacketSize();
+
+ groupID = other.getGroupID();
}
public ClientSessionFactoryImpl()
@@ -980,7 +984,17 @@
{
return new ClientSessionFactoryImpl(this);
}
-
+
+ /**
+ * Stores the group ID on this factory. The stored value is forwarded as the
+ * last argument when a session is created and is copied by the copy constructor.
+ *
+ * @param groupID the group ID to store
+ */
+ public void setGroupID(final String groupID)
+ {
+ this.groupID = groupID;
+ }
+
+ /**
+ * Returns the group ID stored on this factory. The field has no initializer,
+ * so this is null until a value has been set or copied from another factory.
+ *
+ * @return the stored group ID, possibly null
+ */
+ public String getGroupID()
+ {
+ return groupID;
+ }
+
// DiscoveryListener implementation
--------------------------------------------------------
public synchronized void connectorsChanged()
@@ -1125,7 +1139,8 @@
consumerMaxRate,
blockOnNonPersistentSend,
blockOnPersistentSend,
- initialMessagePacketSize);
+ initialMessagePacketSize,
+ groupID);
return session;
}
Modified: trunk/src/main/org/hornetq/core/client/impl/ClientSessionImpl.java
===================================================================
--- trunk/src/main/org/hornetq/core/client/impl/ClientSessionImpl.java 2009-12-01 08:55:10
UTC (rev 8476)
+++ trunk/src/main/org/hornetq/core/client/impl/ClientSessionImpl.java 2009-12-01 09:01:01
UTC (rev 8477)
@@ -177,6 +177,8 @@
private volatile boolean workDone;
+ private final String groupID;
+
// Constructors
----------------------------------------------------------------------------
public ClientSessionImpl(final FailoverManager connectionManager,
@@ -200,6 +202,7 @@
final boolean cacheLargeMessageClient,
final int minLargeMessageSize,
final int initialMessagePacketSize,
+ final String groupID,
final RemotingConnection remotingConnection,
final int version,
final Channel channel,
@@ -253,6 +256,8 @@
this.initialMessagePacketSize = initialMessagePacketSize;
+ this.groupID = groupID;
+
producerCreditManager = new ClientProducerCreditManagerImpl(this,
producerWindowSize);
}
@@ -1390,6 +1395,7 @@
autoCommitSends &&
blockOnNonPersistentSend,
autoCommitSends &&
blockOnPersistentSend,
autoGroup,
+ groupID == null?null:new SimpleString(groupID),
minLargeMessageSize,
channel);
Modified: trunk/src/main/org/hornetq/core/client/impl/FailoverManager.java
===================================================================
--- trunk/src/main/org/hornetq/core/client/impl/FailoverManager.java 2009-12-01 08:55:10
UTC (rev 8476)
+++ trunk/src/main/org/hornetq/core/client/impl/FailoverManager.java 2009-12-01 09:01:01
UTC (rev 8477)
@@ -47,7 +47,8 @@
final int consumerMaxRate,
final boolean blockOnNonPersistentSend,
final boolean blockOnPersistentSend,
- final int initialMessagePacketSize) throws HornetQException;
+ final int initialMessagePacketSize,
+ final String groupID) throws HornetQException;
void removeSession(final ClientSessionInternal session);
Modified: trunk/src/main/org/hornetq/core/client/impl/FailoverManagerImpl.java
===================================================================
--- trunk/src/main/org/hornetq/core/client/impl/FailoverManagerImpl.java 2009-12-01
08:55:10 UTC (rev 8476)
+++ trunk/src/main/org/hornetq/core/client/impl/FailoverManagerImpl.java 2009-12-01
09:01:01 UTC (rev 8477)
@@ -311,7 +311,8 @@
final int consumerMaxRate,
final boolean blockOnNonPersistentSend,
final boolean blockOnPersistentSend,
- final int initialMessagePacketSize) throws HornetQException
+ final int initialMessagePacketSize,
+ final String groupID) throws HornetQException
{
synchronized (createSessionLock)
{
@@ -423,6 +424,7 @@
cacheLargeMessageClient,
minLargeMessageSize,
initialMessagePacketSize,
+ groupID,
theConnection,
response.getServerVersion(),
sessionChannel,
Modified: trunk/src/main/org/hornetq/jms/client/HornetQConnectionFactory.java
===================================================================
--- trunk/src/main/org/hornetq/jms/client/HornetQConnectionFactory.java 2009-12-01
08:55:10 UTC (rev 8476)
+++ trunk/src/main/org/hornetq/jms/client/HornetQConnectionFactory.java 2009-12-01
09:01:01 UTC (rev 8477)
@@ -557,6 +557,16 @@
return sessionFactory;
}
+ /**
+ * Sets the group ID by delegating to the underlying session factory.
+ *
+ * @param groupID the group ID to pass to the session factory
+ */
+ public void setGroupID(final String groupID)
+ {
+ sessionFactory.setGroupID(groupID);
+ }
+
+ /**
+ * Returns the group ID held by the underlying session factory.
+ *
+ * @return the session factory's group ID
+ */
+ public String getGroupID()
+ {
+ return sessionFactory.getGroupID();
+ }
+
public void close()
{
sessionFactory.close();
Modified: trunk/src/main/org/hornetq/jms/server/JMSServerManager.java
===================================================================
--- trunk/src/main/org/hornetq/jms/server/JMSServerManager.java 2009-12-01 08:55:10 UTC
(rev 8476)
+++ trunk/src/main/org/hornetq/jms/server/JMSServerManager.java 2009-12-01 09:01:01 UTC
(rev 8477)
@@ -165,6 +165,7 @@
long maxRetryInterval,
int reconnectAttempts,
boolean failoverOnServerShutdown,
+ String groupId,
List<String> jndiBindings) throws Exception;
void createConnectionFactory(String name,
@@ -199,6 +200,7 @@
long maxRetryInterval,
int reconnectAttempts,
boolean failoverOnServerShutdown,
+ String groupId,
List<String> jndiBindings) throws Exception;
/**
Modified:
trunk/src/main/org/hornetq/jms/server/config/ConnectionFactoryConfiguration.java
===================================================================
---
trunk/src/main/org/hornetq/jms/server/config/ConnectionFactoryConfiguration.java 2009-12-01
08:55:10 UTC (rev 8476)
+++
trunk/src/main/org/hornetq/jms/server/config/ConnectionFactoryConfiguration.java 2009-12-01
09:01:01 UTC (rev 8477)
@@ -159,4 +159,7 @@
void setFailoverOnServerShutdown(boolean failoverOnServerShutdown);
+ /**
+ * Returns the group ID configured for the connection factory.
+ *
+ * @return the configured group ID
+ */
+ String getGroupID();
+
+ /**
+ * Sets the group ID configured for the connection factory.
+ *
+ * @param groupID the group ID to configure
+ */
+ void setGroupID(String groupID);
}
Modified:
trunk/src/main/org/hornetq/jms/server/config/impl/ConnectionFactoryConfigurationImpl.java
===================================================================
---
trunk/src/main/org/hornetq/jms/server/config/impl/ConnectionFactoryConfigurationImpl.java 2009-12-01
08:55:10 UTC (rev 8476)
+++
trunk/src/main/org/hornetq/jms/server/config/impl/ConnectionFactoryConfigurationImpl.java 2009-12-01
09:01:01 UTC (rev 8477)
@@ -102,6 +102,8 @@
private boolean failoverOnServerShutdown =
ClientSessionFactoryImpl.DEFAULT_FAILOVER_ON_SERVER_SHUTDOWN;
+ private String groupID = null;
+
// Static --------------------------------------------------------
// Constructors --------------------------------------------------
@@ -471,6 +473,16 @@
this.failoverOnServerShutdown = failoverOnServerShutdown;
}
+ /**
+ * Returns the configured group ID; the field defaults to null.
+ *
+ * @return the group ID, or null if none has been set
+ */
+ public String getGroupID()
+ {
+ return groupID;
+ }
+
+ /**
+ * Stores the group ID in this configuration.
+ *
+ * @param groupID the group ID to store
+ */
+ public void setGroupID(String groupID)
+ {
+ this.groupID = groupID;
+ }
+
// Public --------------------------------------------------------
// Package protected ---------------------------------------------
Modified: trunk/src/main/org/hornetq/jms/server/impl/JMSServerDeployer.java
===================================================================
--- trunk/src/main/org/hornetq/jms/server/impl/JMSServerDeployer.java 2009-12-01 08:55:10
UTC (rev 8476)
+++ trunk/src/main/org/hornetq/jms/server/impl/JMSServerDeployer.java 2009-12-01 09:01:01
UTC (rev 8477)
@@ -154,7 +154,7 @@
int threadPoolMaxSize = getInteger(e, "thread-pool-max-size",
ClientSessionFactoryImpl.DEFAULT_THREAD_POOL_MAX_SIZE, MINUS_ONE_OR_GT_ZERO);
String connectionLoadBalancingPolicyClassName = getString(e,
"connection-load-balancing-policy-class-name",
ClientSessionFactoryImpl.DEFAULT_CONNECTION_LOAD_BALANCING_POLICY_CLASS_NAME,
Validators.NOT_NULL_OR_EMPTY);
long discoveryInitialWaitTimeout = getLong(e,
"discovery-initial-wait-timeout",
ClientSessionFactoryImpl.DEFAULT_DISCOVERY_INITIAL_WAIT_TIMEOUT, GT_ZERO);
-
+ String groupid = getString(e, "group-id", null, Validators.NO_CHECK);
List<String> jndiBindings = new ArrayList<String>();
List<Pair<TransportConfiguration, TransportConfiguration>>
connectorConfigs = new ArrayList<Pair<TransportConfiguration,
TransportConfiguration>>();
DiscoveryGroupConfiguration discoveryGroupConfiguration = null;
@@ -265,6 +265,7 @@
maxRetryInterval,
reconnectAttempts,
failoverOnServerShutdown,
+ groupid,
jndiBindings);
}
else
@@ -298,6 +299,7 @@
maxRetryInterval,
reconnectAttempts,
failoverOnServerShutdown,
+ groupid,
jndiBindings);
}
}
Modified: trunk/src/main/org/hornetq/jms/server/impl/JMSServerManagerImpl.java
===================================================================
--- trunk/src/main/org/hornetq/jms/server/impl/JMSServerManagerImpl.java 2009-12-01
08:55:10 UTC (rev 8476)
+++ trunk/src/main/org/hornetq/jms/server/impl/JMSServerManagerImpl.java 2009-12-01
09:01:01 UTC (rev 8477)
@@ -408,6 +408,7 @@
final long maxRetryInterval,
final int reconnectAttempts,
final boolean
failoverOnServerShutdown,
+ final String groupId,
final List<String>
jndiBindings) throws Exception
{
checkInitialised();
@@ -442,6 +443,7 @@
cf.setMaxRetryInterval(maxRetryInterval);
cf.setReconnectAttempts(reconnectAttempts);
cf.setFailoverOnServerShutdown(failoverOnServerShutdown);
+ cf.setGroupID(groupId);
}
bindConnectionFactory(cf, name, jndiBindings);
@@ -479,6 +481,7 @@
final long maxRetryInterval,
final int reconnectAttempts,
final boolean
failoverOnServerShutdown,
+ final String groupId,
final List<String>
jndiBindings) throws Exception
{
checkInitialised();
@@ -752,7 +755,7 @@
if (config == null)
{
return;
- }
+ }
if (config.getContext() != null)
{
@@ -796,6 +799,7 @@
config.getMaxRetryInterval(),
config.getReconnectAttempts(),
config.isFailoverOnServerShutdown(),
+ config.getGroupID(),
Arrays.asList(config.getBindings()));
}
else
@@ -829,6 +833,7 @@
config.getMaxRetryInterval(),
config.getReconnectAttempts(),
config.isFailoverOnServerShutdown(),
+ config.getGroupID(),
Arrays.asList(config.getBindings()));
}
}
Modified: trunk/src/main/org/hornetq/jms/server/management/JMSServerControl.java
===================================================================
--- trunk/src/main/org/hornetq/jms/server/management/JMSServerControl.java 2009-12-01
08:55:10 UTC (rev 8476)
+++ trunk/src/main/org/hornetq/jms/server/management/JMSServerControl.java 2009-12-01
09:01:01 UTC (rev 8477)
@@ -122,6 +122,7 @@
long maxRetryInterval,
int reconnectAttempts,
boolean failoverOnServerShutdown,
+ String groupID,
Object[] jndiBindings) throws Exception;
void createConnectionFactory(@Parameter(name = "name") String name,
@@ -156,6 +157,7 @@
@Parameter(name = "maxRetryInterval") long
maxRetryInterval,
@Parameter(name = "reconnectAttempts") int
reconnectAttempts,
@Parameter(name = "failoverOnServerShutdown")
boolean failoverOnServerShutdown,
+ @Parameter(name = "groupID") String groupID,
@Parameter(name = "jndiBindings", desc =
"comma-separated list of JNDI bindings") String jndiBindings) throws Exception;
void createConnectionFactory(String name,
@@ -203,6 +205,7 @@
long maxRetryInterval,
int reconnectAttempts,
boolean failoverOnServerShutdown,
+ String groupID,
Object[] jndiBindings) throws Exception;
@Operation(desc = "Create a JMS ConnectionFactory", impact = ACTION)
@@ -238,6 +241,7 @@
@Parameter(name = "maxRetryInterval") long
maxRetryInterval,
@Parameter(name = "reconnectAttempts") int
reconnectAttempts,
@Parameter(name = "failoverOnServerShutdown")
boolean failoverOnServerShutdown,
+ @Parameter(name = "groupID") String groupID,
@Parameter(name = "jndiBindings", desc =
"comma-separated list of JNDI bindings") String jndiBindings) throws Exception;
void createConnectionFactory(String name,
Modified: trunk/src/main/org/hornetq/jms/server/management/impl/JMSServerControlImpl.java
===================================================================
---
trunk/src/main/org/hornetq/jms/server/management/impl/JMSServerControlImpl.java 2009-12-01
08:55:10 UTC (rev 8476)
+++
trunk/src/main/org/hornetq/jms/server/management/impl/JMSServerControlImpl.java 2009-12-01
09:01:01 UTC (rev 8477)
@@ -249,6 +249,7 @@
final long maxRetryInterval,
final int reconnectAttempts,
final boolean failoverOnServerShutdown,
+ final String groupID,
final Object[] jndiBindings) throws Exception
{
List<Pair<TransportConfiguration, TransportConfiguration>> pairs =
convertToConnectorPairs(liveConnectorsTransportClassNames,
@@ -287,6 +288,7 @@
maxRetryInterval,
reconnectAttempts,
failoverOnServerShutdown,
+ groupID,
jndiBindingsList);
sendNotification(NotificationType.CONNECTION_FACTORY_CREATED, name);
@@ -324,6 +326,7 @@
final long maxRetryInterval,
final int reconnectAttempts,
final boolean failoverOnServerShutdown,
+ final String groupID,
final String jndiBindings) throws Exception
{
Object[] liveClassNames = toArray(liveTransportClassNames);
@@ -364,6 +367,7 @@
maxRetryInterval,
reconnectAttempts,
failoverOnServerShutdown,
+ groupID,
bindings);
}
@@ -423,6 +427,7 @@
final long maxRetryInterval,
final int reconnectAttempts,
final boolean failoverOnServerShutdown,
+ final String groupID,
final Object[] jndiBindings) throws Exception
{
List<String> jndiBindingsList = convert(jndiBindings);
@@ -459,6 +464,7 @@
maxRetryInterval,
reconnectAttempts,
failoverOnServerShutdown,
+ groupID,
jndiBindingsList);
sendNotification(NotificationType.CONNECTION_FACTORY_CREATED, name);
@@ -496,6 +502,7 @@
final long maxRetryInterval,
final int reconnectAttempts,
final boolean failoverOnServerShutdown,
+ final String groupID,
final String jndiBindings) throws Exception
{
Object[] bindings = toArray(jndiBindings);
@@ -532,6 +539,7 @@
maxRetryInterval,
reconnectAttempts,
failoverOnServerShutdown,
+ groupID,
bindings);
}
Modified: trunk/tests/jms-tests/src/org/hornetq/jms/tests/CTSMiscellaneousTest.java
===================================================================
--- trunk/tests/jms-tests/src/org/hornetq/jms/tests/CTSMiscellaneousTest.java 2009-12-01
08:55:10 UTC (rev 8476)
+++ trunk/tests/jms-tests/src/org/hornetq/jms/tests/CTSMiscellaneousTest.java 2009-12-01
09:01:01 UTC (rev 8477)
@@ -117,6 +117,7 @@
DEFAULT_MAX_RETRY_INTERVAL,
DEFAULT_RECONNECT_ATTEMPTS,
DEFAULT_FAILOVER_ON_SERVER_SHUTDOWN,
+ null,
jndiBindings);
cf =
(HornetQConnectionFactory)getInitialContext().lookup("/StrictTCKConnectionFactory");
Modified: trunk/tests/jms-tests/src/org/hornetq/jms/tests/JMSTestCase.java
===================================================================
--- trunk/tests/jms-tests/src/org/hornetq/jms/tests/JMSTestCase.java 2009-12-01 08:55:10
UTC (rev 8476)
+++ trunk/tests/jms-tests/src/org/hornetq/jms/tests/JMSTestCase.java 2009-12-01 09:01:01
UTC (rev 8477)
@@ -113,6 +113,7 @@
DEFAULT_MAX_RETRY_INTERVAL,
DEFAULT_RECONNECT_ATTEMPTS,
DEFAULT_FAILOVER_ON_SERVER_SHUTDOWN,
+ null,
jndiBindings);
cf =
(HornetQConnectionFactory)getInitialContext().lookup("/testsuitecf");
Modified:
trunk/tests/jms-tests/src/org/hornetq/jms/tests/tools/container/LocalTestServer.java
===================================================================
---
trunk/tests/jms-tests/src/org/hornetq/jms/tests/tools/container/LocalTestServer.java 2009-12-01
08:55:10 UTC (rev 8476)
+++
trunk/tests/jms-tests/src/org/hornetq/jms/tests/tools/container/LocalTestServer.java 2009-12-01
09:01:01 UTC (rev 8477)
@@ -342,6 +342,7 @@
DEFAULT_MAX_RETRY_INTERVAL,
DEFAULT_RECONNECT_ATTEMPTS,
DEFAULT_FAILOVER_ON_SERVER_SHUTDOWN,
+ null,
jndiBindings);
}
Added:
trunk/tests/src/org/hornetq/tests/integration/client/MessageGroupingConnectionFactoryTest.java
===================================================================
---
trunk/tests/src/org/hornetq/tests/integration/client/MessageGroupingConnectionFactoryTest.java
(rev 0)
+++
trunk/tests/src/org/hornetq/tests/integration/client/MessageGroupingConnectionFactoryTest.java 2009-12-01
09:01:01 UTC (rev 8477)
@@ -0,0 +1,218 @@
+/*
+ * Copyright 2009 Red Hat, Inc.
+ * Red Hat licenses this file to you under the Apache License, version
+ * 2.0 (the "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+ * implied. See the License for the specific language governing
+ * permissions and limitations under the License.
+ */
+package org.hornetq.tests.integration.client;
+
+import org.hornetq.tests.util.UnitTestCase;
+import org.hornetq.core.logging.Logger;
+import org.hornetq.core.server.HornetQServer;
+import org.hornetq.core.server.HornetQ;
+import org.hornetq.core.client.*;
+import org.hornetq.core.client.impl.ClientSessionFactoryImpl;
+import org.hornetq.core.exception.HornetQException;
+import org.hornetq.core.config.impl.ConfigurationImpl;
+import org.hornetq.core.config.TransportConfiguration;
+import org.hornetq.core.message.impl.MessageImpl;
+import org.hornetq.utils.SimpleString;
+
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+import java.util.ArrayList;
+
+/**
+ * Checks message grouping when the group ID is configured once on the
+ * {@link ClientSessionFactory} rather than on each message.
+ * <p>
+ * {@link #setUp()} starts an in-VM server, creates a session from a factory whose
+ * group ID is "grp1" and creates the test queue. Each test attaches two consumers to
+ * that queue, sends messages without setting any group header itself, and expects the
+ * first consumer's handler to receive every message and the second to receive none.
+ *
+ * @author <a href="mailto:andy.taylor@jboss.org">Andy Taylor</a>
+ * Created Dec 1, 2009
+ */
+public class MessageGroupingConnectionFactoryTest extends UnitTestCase
+{
+   // Previously MessageGroupingTest.class (copy/paste slip); the logger must be named after this test.
+   private static final Logger log = Logger.getLogger(MessageGroupingConnectionFactoryTest.class);
+
+   private HornetQServer server;
+
+   private ClientSession clientSession;
+
+   private final SimpleString qName = new SimpleString("MessageGroupingTestQueue");
+
+   public void testBasicGroupingUsingConnection() throws Exception
+   {
+      doTestBasicGroupingUsingConnectionFactory(false);
+   }
+
+   public void testBasicGroupingUsingConnectionDirect() throws Exception
+   {
+      doTestBasicGroupingUsingConnectionFactory(true);
+   }
+
+   public void testBasicGroupingMultipleProducers() throws Exception
+   {
+      doTestBasicGroupingMultipleProducers(false);
+   }
+
+   public void testBasicGroupingMultipleProducersDirect() throws Exception
+   {
+      doTestBasicGroupingMultipleProducers(true);
+   }
+
+   /**
+    * Sends 100 messages through a single producer and verifies that only the first of
+    * two consumers receives them.
+    *
+    * @param directDelivery if true the session is started before the messages are sent,
+    *                       otherwise it is started after all of them have been sent
+    */
+   private void doTestBasicGroupingUsingConnectionFactory(boolean directDelivery) throws Exception
+   {
+      ClientProducer clientProducer = clientSession.createProducer(qName);
+      ClientConsumer consumer = clientSession.createConsumer(qName);
+      ClientConsumer consumer2 = clientSession.createConsumer(qName);
+      if (directDelivery)
+      {
+         clientSession.start();
+      }
+      int numMessages = 100;
+      for (int i = 0; i < numMessages; i++)
+      {
+         ClientMessage message = createTextMessage("m" + i, clientSession);
+         clientProducer.send(message);
+      }
+      if (!directDelivery)
+      {
+         clientSession.start();
+      }
+      assertAllDeliveredToFirstConsumer(consumer, consumer2, numMessages);
+   }
+
+   /**
+    * Sends each of 100 messages through three producers of the same session and verifies
+    * that all 300 deliveries reach only the first of two consumers.
+    *
+    * @param directDelivery if true the session is started before the messages are sent,
+    *                       otherwise it is started after all of them have been sent
+    */
+   private void doTestBasicGroupingMultipleProducers(boolean directDelivery) throws Exception
+   {
+      ClientProducer clientProducer = clientSession.createProducer(qName);
+      ClientProducer clientProducer2 = clientSession.createProducer(qName);
+      ClientProducer clientProducer3 = clientSession.createProducer(qName);
+      ClientConsumer consumer = clientSession.createConsumer(qName);
+      ClientConsumer consumer2 = clientSession.createConsumer(qName);
+      if (directDelivery)
+      {
+         clientSession.start();
+      }
+      int numMessages = 100;
+      for (int i = 0; i < numMessages; i++)
+      {
+         ClientMessage message = createTextMessage("m" + i, clientSession);
+         clientProducer.send(message);
+         clientProducer2.send(message);
+         clientProducer3.send(message);
+      }
+      if (!directDelivery)
+      {
+         clientSession.start();
+      }
+      assertAllDeliveredToFirstConsumer(consumer, consumer2, numMessages * 3);
+   }
+
+   /**
+    * Installs acknowledging handlers on both consumers, waits up to 10 seconds for
+    * {@code expectedMessages} deliveries, asserts they all went to {@code consumer},
+    * then closes both consumers.
+    */
+   private void assertAllDeliveredToFirstConsumer(ClientConsumer consumer,
+                                                  ClientConsumer consumer2,
+                                                  int expectedMessages) throws Exception
+   {
+      // Both handlers share one latch, so it opens once the expected total has arrived
+      // regardless of which consumer received each message.
+      CountDownLatch latch = new CountDownLatch(expectedMessages);
+      DummyMessageHandler dummyMessageHandler = new DummyMessageHandler(latch, true);
+      consumer.setMessageHandler(dummyMessageHandler);
+      DummyMessageHandler dummyMessageHandler2 = new DummyMessageHandler(latch, true);
+      consumer2.setMessageHandler(dummyMessageHandler2);
+      assertTrue(latch.await(10, TimeUnit.SECONDS));
+      assertEquals(expectedMessages, dummyMessageHandler.list.size());
+      assertEquals(0, dummyMessageHandler2.list.size());
+      consumer.close();
+      consumer2.close();
+   }
+
+   /**
+    * Closes the session and stops the server, ignoring failures from either step so that
+    * the remaining cleanup still runs.
+    */
+   @Override
+   protected void tearDown() throws Exception
+   {
+      if (clientSession != null)
+      {
+         try
+         {
+            clientSession.close();
+         }
+         catch (HornetQException e1)
+         {
+            // best-effort cleanup: the server still has to be stopped below
+         }
+      }
+      if (server != null && server.isStarted())
+      {
+         try
+         {
+            server.stop();
+         }
+         catch (Exception e1)
+         {
+            // best-effort cleanup: nothing more can be done if the stop fails
+         }
+      }
+      server = null;
+      clientSession = null;
+
+      super.tearDown();
+   }
+
+   /**
+    * Starts an in-VM server with security disabled, then creates the session (from a
+    * factory whose group ID is "grp1") and the queue used by the tests.
+    */
+   @Override
+   protected void setUp() throws Exception
+   {
+      super.setUp();
+
+      ConfigurationImpl configuration = new ConfigurationImpl();
+      configuration.setSecurityEnabled(false);
+      TransportConfiguration transportConfig = new TransportConfiguration(INVM_ACCEPTOR_FACTORY);
+      configuration.getAcceptorConfigurations().add(transportConfig);
+      server = HornetQ.newHornetQServer(configuration, false);
+      // start the server
+      server.start();
+
+      // then we create a client as normal
+      ClientSessionFactory sessionFactory = new ClientSessionFactoryImpl(new TransportConfiguration(INVM_CONNECTOR_FACTORY));
+      // the group ID is set on the factory only; the tests never set it on a message
+      sessionFactory.setGroupID("grp1");
+      clientSession = sessionFactory.createSession(false, true, true);
+      clientSession.createQueue(qName, qName, null, false);
+   }
+
+   /**
+    * Message handler that records every message it receives, optionally acknowledges it,
+    * and counts down a latch once per message.
+    */
+   private static class DummyMessageHandler implements MessageHandler
+   {
+      final ArrayList<ClientMessage> list = new ArrayList<ClientMessage>();
+
+      private CountDownLatch latch;
+
+      private final boolean acknowledge;
+
+      public DummyMessageHandler(CountDownLatch latch, boolean acknowledge)
+      {
+         this.latch = latch;
+         this.acknowledge = acknowledge;
+      }
+
+      public void onMessage(ClientMessage message)
+      {
+         list.add(message);
+         if (acknowledge)
+         {
+            try
+            {
+               message.acknowledge();
+            }
+            catch (HornetQException e)
+            {
+               // ignore: the tests only check which handler received the message
+            }
+         }
+         latch.countDown();
+      }
+
+      /** Clears the recorded messages and switches to a new latch. */
+      public void reset(CountDownLatch latch)
+      {
+         list.clear();
+         this.latch = latch;
+      }
+   }
+}
Modified: trunk/tests/src/org/hornetq/tests/integration/client/MessageGroupingTest.java
===================================================================
---
trunk/tests/src/org/hornetq/tests/integration/client/MessageGroupingTest.java 2009-12-01
08:55:10 UTC (rev 8476)
+++
trunk/tests/src/org/hornetq/tests/integration/client/MessageGroupingTest.java 2009-12-01
09:01:01 UTC (rev 8477)
@@ -153,6 +153,39 @@
consumer2.close();
}
+   /**
+    * Sends 100 text messages, each explicitly tagged with group ID "grp1" through
+    * {@link MessageImpl#HDR_GROUP_ID}, to a queue with two consumers and asserts that the
+    * first consumer's handler receives every message and the second receives none.
+    * <p>
+    * NOTE(review): this hunk adds no caller for the method, and despite its name the group
+    * is set on each message rather than on the connection factory - confirm whether the
+    * method is still needed in this class.
+    *
+    * @param directDelivery if true the session is started before the messages are sent,
+    *                       otherwise it is started after all of them have been sent
+    */
+   private void doTestBasicGroupingUsingConnectionFactory(boolean directDelivery) throws Exception
+   {
+      ClientProducer clientProducer = clientSession.createProducer(qName);
+      ClientConsumer consumer = clientSession.createConsumer(qName);
+      ClientConsumer consumer2 = clientSession.createConsumer(qName);
+      if (directDelivery)
+      {
+         clientSession.start();
+      }
+      SimpleString groupId = new SimpleString("grp1");
+      int numMessages = 100;
+      for (int i = 0; i < numMessages; i++)
+      {
+         ClientMessage message = createTextMessage("m" + i, clientSession);
+         message.putStringProperty(MessageImpl.HDR_GROUP_ID, groupId);
+         clientProducer.send(message);
+      }
+      if (!directDelivery)
+      {
+         clientSession.start();
+      }
+      // Both handlers share one latch, so it opens once numMessages deliveries have
+      // happened regardless of which consumer received them.
+      CountDownLatch latch = new CountDownLatch(numMessages);
+      DummyMessageHandler dummyMessageHandler = new DummyMessageHandler(latch, true);
+      consumer.setMessageHandler(dummyMessageHandler);
+      DummyMessageHandler dummyMessageHandler2 = new DummyMessageHandler(latch, true);
+      consumer2.setMessageHandler(dummyMessageHandler2);
+      assertTrue(latch.await(10, TimeUnit.SECONDS));
+      assertEquals(numMessages, dummyMessageHandler.list.size());
+      assertEquals(0, dummyMessageHandler2.list.size());
+      consumer.close();
+      consumer2.close();
+   }
+
+
public void testMultipleGroupingConsumeHalf() throws Exception
{
ClientProducer clientProducer = clientSession.createProducer(qName);
Modified: trunk/tests/src/org/hornetq/tests/integration/jms/FloodServerTest.java
===================================================================
--- trunk/tests/src/org/hornetq/tests/integration/jms/FloodServerTest.java 2009-12-01
08:55:10 UTC (rev 8476)
+++ trunk/tests/src/org/hornetq/tests/integration/jms/FloodServerTest.java 2009-12-01
09:01:01 UTC (rev 8477)
@@ -175,6 +175,7 @@
1000,
reconnectAttempts,
failoverOnServerShutdown,
+ null,
jndiBindings);
}
Modified: trunk/tests/src/org/hornetq/tests/integration/jms/client/PreACKJMSTest.java
===================================================================
--- trunk/tests/src/org/hornetq/tests/integration/jms/client/PreACKJMSTest.java 2009-12-01
08:55:10 UTC (rev 8476)
+++ trunk/tests/src/org/hornetq/tests/integration/jms/client/PreACKJMSTest.java 2009-12-01
09:01:01 UTC (rev 8477)
@@ -248,6 +248,7 @@
DEFAULT_MAX_RETRY_INTERVAL,
reconnectAttempts,
failoverOnServerShutdown,
+ null,
jndiBindings);
}
Modified:
trunk/tests/src/org/hornetq/tests/integration/jms/client/ReSendLargeMessageTest.java
===================================================================
---
trunk/tests/src/org/hornetq/tests/integration/jms/client/ReSendLargeMessageTest.java 2009-12-01
08:55:10 UTC (rev 8476)
+++
trunk/tests/src/org/hornetq/tests/integration/jms/client/ReSendLargeMessageTest.java 2009-12-01
09:01:01 UTC (rev 8477)
@@ -287,6 +287,7 @@
DEFAULT_MAX_RETRY_INTERVAL,
reconnectAttempts,
failoverOnServerShutdown,
+ null,
jndiBindings);
}
Modified:
trunk/tests/src/org/hornetq/tests/integration/jms/client/SessionClosedOnRemotingConnectionFailureTest.java
===================================================================
---
trunk/tests/src/org/hornetq/tests/integration/jms/client/SessionClosedOnRemotingConnectionFailureTest.java 2009-12-01
08:55:10 UTC (rev 8476)
+++
trunk/tests/src/org/hornetq/tests/integration/jms/client/SessionClosedOnRemotingConnectionFailureTest.java 2009-12-01
09:01:01 UTC (rev 8477)
@@ -118,6 +118,7 @@
DEFAULT_MAX_RETRY_INTERVAL,
0,
false,
+ null,
jndiBindings);
Modified: trunk/tests/src/org/hornetq/tests/integration/jms/client/TextMessageTest.java
===================================================================
---
trunk/tests/src/org/hornetq/tests/integration/jms/client/TextMessageTest.java 2009-12-01
08:55:10 UTC (rev 8476)
+++
trunk/tests/src/org/hornetq/tests/integration/jms/client/TextMessageTest.java 2009-12-01
09:01:01 UTC (rev 8477)
@@ -275,6 +275,7 @@
DEFAULT_MAX_RETRY_INTERVAL,
reconnectAttempts,
failoverOnServerShutdown,
+ null,
jndiBindings);
}
Modified:
trunk/tests/src/org/hornetq/tests/integration/jms/divert/DivertAndACKClientTest.java
===================================================================
---
trunk/tests/src/org/hornetq/tests/integration/jms/divert/DivertAndACKClientTest.java 2009-12-01
08:55:10 UTC (rev 8476)
+++
trunk/tests/src/org/hornetq/tests/integration/jms/divert/DivertAndACKClientTest.java 2009-12-01
09:01:01 UTC (rev 8477)
@@ -187,6 +187,7 @@
DEFAULT_MAX_RETRY_INTERVAL,
reconnectAttempts,
failoverOnServerShutdown,
+ null,
jndiBindings);
}
Modified:
trunk/tests/src/org/hornetq/tests/integration/jms/server/management/JMSServerControlTest.java
===================================================================
---
trunk/tests/src/org/hornetq/tests/integration/jms/server/management/JMSServerControlTest.java 2009-12-01
08:55:10 UTC (rev 8476)
+++
trunk/tests/src/org/hornetq/tests/integration/jms/server/management/JMSServerControlTest.java 2009-12-01
09:01:01 UTC (rev 8477)
@@ -453,6 +453,7 @@
ClientSessionFactoryImpl.DEFAULT_MAX_RETRY_INTERVAL,
ClientSessionFactoryImpl.DEFAULT_RECONNECT_ATTEMPTS,
ClientSessionFactoryImpl.DEFAULT_FAILOVER_ON_SERVER_SHUTDOWN,
+ null,
bindings);
}
});
@@ -499,6 +500,7 @@
ClientSessionFactoryImpl.DEFAULT_MAX_RETRY_INTERVAL,
ClientSessionFactoryImpl.DEFAULT_RECONNECT_ATTEMPTS,
ClientSessionFactoryImpl.DEFAULT_FAILOVER_ON_SERVER_SHUTDOWN,
+ null,
jndiBindings);
}
});
Modified:
trunk/tests/src/org/hornetq/tests/integration/jms/server/management/JMSServerControlUsingJMSTest.java
===================================================================
---
trunk/tests/src/org/hornetq/tests/integration/jms/server/management/JMSServerControlUsingJMSTest.java 2009-12-01
08:55:10 UTC (rev 8476)
+++
trunk/tests/src/org/hornetq/tests/integration/jms/server/management/JMSServerControlUsingJMSTest.java 2009-12-01
09:01:01 UTC (rev 8477)
@@ -124,6 +124,7 @@
long maxRetryInterval,
int reconnectAttempts,
boolean failoverOnServerShutdown,
+ String groupId,
Object[] jndiBindings) throws Exception
{
proxy.invokeOperation("createConnectionFactory",
@@ -158,6 +159,7 @@
maxRetryInterval,
reconnectAttempts,
failoverOnServerShutdown,
+ groupId,
jndiBindings);
}
@@ -193,6 +195,7 @@
long maxRetryInterval,
int reconnectAttempts,
boolean failoverOnServerShutdown,
+ String groupId,
String jndiBindings) throws Exception
{
proxy.invokeOperation("createConnectionFactory",
@@ -227,6 +230,7 @@
maxRetryInterval,
reconnectAttempts,
failoverOnServerShutdown,
+ groupId,
jndiBindings);
}
@@ -410,6 +414,7 @@
long maxRetryInterval,
int reconnectAttempts,
boolean failoverOnServerShutdown,
+ String groupId,
Object[] jndiBindings) throws Exception
{
proxy.invokeOperation("createConnectionFactory",
@@ -445,6 +450,7 @@
maxRetryInterval,
reconnectAttempts,
failoverOnServerShutdown,
+ groupId,
jndiBindings);
}
@@ -481,6 +487,7 @@
long maxRetryInterval,
int reconnectAttempts,
boolean failoverOnServerShutdown,
+ String groupId,
String jndiBindings) throws Exception
{
proxy.invokeOperation("createConnectionFactory",
@@ -516,6 +523,7 @@
maxRetryInterval,
reconnectAttempts,
failoverOnServerShutdown,
+ groupId,
jndiBindings);
}
Modified: trunk/tests/src/org/hornetq/tests/util/JMSTestBase.java
===================================================================
--- trunk/tests/src/org/hornetq/tests/util/JMSTestBase.java 2009-12-01 08:55:10 UTC (rev
8476)
+++ trunk/tests/src/org/hornetq/tests/util/JMSTestBase.java 2009-12-01 09:01:01 UTC (rev
8477)
@@ -226,6 +226,7 @@
DEFAULT_MAX_RETRY_INTERVAL,
reconnectAttempts,
failoverOnServerShutdown,
+ null,
jndiBindings);
}