[jboss-cvs] JBoss Messaging SVN: r6446 - in trunk: examples/jms and 14 other directories.
jboss-cvs-commits at lists.jboss.org
jboss-cvs-commits at lists.jboss.org
Thu Apr 16 09:09:51 EDT 2009
Author: timfox
Date: 2009-04-16 09:09:50 -0400 (Thu, 16 Apr 2009)
New Revision: 6446
Added:
trunk/examples/jms/divert/
trunk/examples/jms/divert/build.xml
trunk/examples/jms/divert/readme.html
trunk/examples/jms/divert/server0/
trunk/examples/jms/divert/server0/client-jndi.properties
trunk/examples/jms/divert/server0/jbm-configuration.xml
trunk/examples/jms/divert/server0/jbm-jms.xml
trunk/examples/jms/divert/server0/jbm-queues.xml
trunk/examples/jms/divert/server0/jbm-security.xml
trunk/examples/jms/divert/server0/jbm-standalone-beans.xml
trunk/examples/jms/divert/server1/
trunk/examples/jms/divert/server1/client-jndi.properties
trunk/examples/jms/divert/server1/jbm-configuration.xml
trunk/examples/jms/divert/server1/jbm-jms.xml
trunk/examples/jms/divert/server1/jbm-queues.xml
trunk/examples/jms/divert/server1/jbm-security.xml
trunk/examples/jms/divert/server1/jbm-standalone-beans.xml
trunk/examples/jms/divert/src/
trunk/examples/jms/divert/src/org/
trunk/examples/jms/divert/src/org/jboss/
trunk/examples/jms/divert/src/org/jboss/jms/
trunk/examples/jms/divert/src/org/jboss/jms/example/
trunk/examples/jms/divert/src/org/jboss/jms/example/DivertExample.java
Modified:
trunk/.classpath
trunk/examples/jms/bridge/server0/jbm-configuration.xml
trunk/src/main/org/jboss/messaging/core/config/cluster/DivertConfiguration.java
trunk/src/main/org/jboss/messaging/core/postoffice/impl/BindingsImpl.java
trunk/src/main/org/jboss/messaging/core/postoffice/impl/LocalQueueBinding.java
trunk/src/main/org/jboss/messaging/core/server/cluster/impl/BridgeImpl.java
trunk/src/main/org/jboss/messaging/core/server/cluster/impl/RemoteQueueBindingImpl.java
trunk/src/main/org/jboss/messaging/core/server/impl/MessagingServerImpl.java
trunk/src/main/org/jboss/messaging/core/server/impl/QueueImpl.java
trunk/tests/src/org/jboss/messaging/tests/integration/divert/DivertTest.java
Log:
divert example
Modified: trunk/.classpath
===================================================================
--- trunk/.classpath 2009-04-16 09:37:24 UTC (rev 6445)
+++ trunk/.classpath 2009-04-16 13:09:50 UTC (rev 6446)
@@ -1,6 +1,7 @@
<?xml version="1.0" encoding="UTF-8"?>
<classpath>
<classpathentry excluding="**/.svn/**/*" kind="src" path="src/main"/>
+ <classpathentry kind="src" path="examples/jms/divert/src"/>
<classpathentry kind="src" path="examples/jms/message-group/src"/>
<classpathentry kind="src" path="examples/jms/ssl-enabled/src"/>
<classpathentry kind="src" path="build/src"/>
Modified: trunk/examples/jms/bridge/server0/jbm-configuration.xml
===================================================================
--- trunk/examples/jms/bridge/server0/jbm-configuration.xml 2009-04-16 09:37:24 UTC (rev 6445)
+++ trunk/examples/jms/bridge/server0/jbm-configuration.xml 2009-04-16 13:09:50 UTC (rev 6446)
@@ -19,7 +19,7 @@
</connector>
</connectors>
- <!-- Acceptors -->
+ <!-- Acceptors -->
<acceptors>
<acceptor name="netty-acceptor">
<factory-class>org.jboss.messaging.integration.transports.netty.NettyAcceptorFactory</factory-class>
@@ -27,10 +27,12 @@
</acceptor>
</acceptors>
- <queues>
- <queue name="jms.queue.sausage-factory" address="jms.queue.sausage-factory"/>
- </queues>
-
+ <!-- We need to create a core queue for the JMS queue explicitly because the bridge will be deployed
+ before the JMS queue is deployed, so the first time, it otherwise won't find the queue -->
+ <queues>
+ <queue name="jms.queue.sausage-factory" address="jms.queue.sausage-factory"/>
+ </queues>
+
<!-- We set-up a bridge that forwards from a queue on this node to an address on another node.
We specify a filter with the bridge, and a transformer too. The filter and transformer are optional -->
<bridges>
Property changes on: trunk/examples/jms/divert
___________________________________________________________________
Name: svn:ignore
+ build
logs
Added: trunk/examples/jms/divert/build.xml
===================================================================
--- trunk/examples/jms/divert/build.xml (rev 0)
+++ trunk/examples/jms/divert/build.xml 2009-04-16 13:09:50 UTC (rev 6446)
@@ -0,0 +1,49 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!DOCTYPE project [
+ <!ENTITY libraries SYSTEM "../../../thirdparty/libraries.ent">
+ ]>
+
+<!-- =========================================================================================== -->
+<!-- -->
+<!-- JBoss, Home of Professional Open Source -->
+<!-- Copyright 2005, JBoss Inc., and individual contributors as indicated -->
+<!-- by the @authors tag. See the copyright.txt in the distribution for a -->
+<!-- full listing of individual contributors. -->
+<!-- -->
+<!-- This is free software; you can redistribute it and/or modify it -->
+<!-- under the terms of the GNU Lesser General Public License as -->
+<!-- published by the Free Software Foundation; either version 2.1 of -->
+<!-- the License, or (at your option) any later version. -->
+<!-- -->
+<!-- This software is distributed in the hope that it will be useful, -->
+<!-- but WITHOUT ANY WARRANTY; without even the implied warranty of -->
+<!-- MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU -->
+<!-- Lesser General Public License for more details. -->
+<!-- -->
+<!-- You should have received a copy of the GNU Lesser General Public -->
+<!-- License along with this software; if not, write to the Free -->
+<!-- Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA -->
+<!-- 02110-1301 USA, or see the FSF site: http://www.fsf.org. -->
+<!-- -->
+<!-- =========================================================================================== -->
+
+
+<project default="run" name="JBoss Messaging JMS Divert Example">
+
+ <import file="../common/build.xml"/>
+
+ <target name="run">
+ <antcall target="runExample">
+ <param name="example.classname" value="org.jboss.jms.example.DivertExample"/>
+ <param name="jbm.example.beans.file" value="server0 server1"/>
+ </antcall>
+ </target>
+
+ <target name="runRemote">
+ <antcall target="runExample">
+ <param name="example.classname" value="org.jboss.jms.example.DivertExample"/>
+ <param name="jbm.example.runServer" value="false"/>
+ </antcall>
+ </target>
+
+</project>
\ No newline at end of file
Added: trunk/examples/jms/divert/readme.html
===================================================================
--- trunk/examples/jms/divert/readme.html (rev 0)
+++ trunk/examples/jms/divert/readme.html 2009-04-16 13:09:50 UTC (rev 6446)
@@ -0,0 +1,92 @@
+<html>
+ <head>
+ <title>JBoss Messaging Divert Example</title>
+ <link rel="stylesheet" type="text/css" href="../common/common.css">
+ </head>
+ <body>
+ <h1>JBoss Messaging Divert Example</h1>
+ <br>
+ <p>This example shows you how to send and receive a message to a JMS Queue with JBoss Messaging.</p>
+ <p>Queues are a standard part of JMS, please consult the JMS 1.1 specification for full details.</p>
+ <p>A Queue is used to send messages point to point, from a producer to a consumer. The queue guarantees message ordering between these 2 points.</p>
+ <br>
+ <h2>Example step-by-step</h2>
+ <p><i>To run the example, simply type <code>ant</code> from this directory</i></p>
+ <br>
+ <ol>
+ <li>First we need to get an initial context so we can look-up the JMS connection factory and destination objects from JNDI. This initial context will get it's properties from the <code>client-jndi.properties</code> file in the directory <code>../common/config</code></li>
+ <pre>
+ <code>InitialContext initialContext = getContext();</code>
+ </pre>
+
+ <li>We look-up the JMS queue object from JNDI</li>
+ <pre>
+ <code>Queue queue = (Queue) initialContext.lookup("/queue/exampleQueue");</code>
+ </pre>
+
+ <li>We look-up the JMS connection factory object from JNDI</li>
+ <pre>
+ <code>ConnectionFactory cf = (ConnectionFactory) initialContext.lookup("/ConnectionFactory");</code>
+ </pre>
+
+ <li>We create a JMS connection</li>
+ <pre>
+ <code>connection = cf.createConnection();</code>
+ </pre>
+
+ <li>We create a JMS session. The session is created as non transacted and will auto acknowledge messages.</li>
+ <pre>
+ <code>Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);</code>
+ </pre>
+
+ <li>We create a JMS message producer on the session. This will be used to send the messages.</li>
+ <pre>
+ <code>MessageProducer messageProducer = session.createProducer(topic);</code>
+ </pre>
+
+ <li>We create a JMS text message that we are going to send.</li>
+ <pre>
+ <code>TextMessage message = session.createTextMessage("This is a text message");</code>
+ </pre>
+
+ <li>We send message to the queue</li>
+ <pre>
+ <code>messageProducer.send(message);</code>
+ </pre>
+
+ <li>We create a JMS Message Consumer to receive the message.</li>
+ <pre>
+ <code>MessageConsumer messageConsumer = session.createConsumer(queue);</code>
+ </pre>
+
+ <li>We start the connection. In order for delivery to occur on any consumers or subscribers on a connection, the connection must be started</li>
+ <pre>
+ <code>connection.start();</code>
+ </pre>
+
+ <li>The message arrives at the consumer. In this case we use a timeout of 5000 milliseconds but we could use a blocking 'receive()'</li>
+ <pre>
+ <code>TextMessage messageReceived = (TextMessage) messageConsumer.receive(5000);</code>
+ </pre>
+
+ <li>And finally, <b>always</b> remember to close your JMS connections and resources after use, in a <code>finally</code> block. Closing a JMS connection will automatically close all of its sessions, consumers, producer and browser objects</li>
+
+ <pre>
+ <code>finally
+ {
+ if (initialContext != null)
+ {
+ initialContext.close();
+ }
+ if (connection != null)
+ {
+ connection.close();
+ }
+ }</code>
+ </pre>
+
+
+
+ </ol>
+ </body>
+</html>
\ No newline at end of file
Property changes on: trunk/examples/jms/divert/server0
___________________________________________________________________
Name: svn:ignore
+ data
logs
Added: trunk/examples/jms/divert/server0/client-jndi.properties
===================================================================
--- trunk/examples/jms/divert/server0/client-jndi.properties (rev 0)
+++ trunk/examples/jms/divert/server0/client-jndi.properties 2009-04-16 13:09:50 UTC (rev 6446)
@@ -0,0 +1,3 @@
+java.naming.factory.initial=org.jnp.interfaces.NamingContextFactory
+java.naming.provider.url=jnp://localhost:1099
+java.naming.factory.url.pkgs=org.jboss.naming:org.jnp.interfaces
Added: trunk/examples/jms/divert/server0/jbm-configuration.xml
===================================================================
--- trunk/examples/jms/divert/server0/jbm-configuration.xml (rev 0)
+++ trunk/examples/jms/divert/server0/jbm-configuration.xml 2009-04-16 13:09:50 UTC (rev 6446)
@@ -0,0 +1,69 @@
+<deployment xmlns="urn:jboss:messaging"
+ xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+ xsi:schemaLocation="urn:jboss:messaging ../schemas/jbm-configuration.xsd">
+ <configuration>
+
+ <!-- We need to make it clustered otherwise the bridge won't deploy -->
+ <clustered>true</clustered>
+
+ <!-- Connectors -->
+
+ <connectors>
+ <connector name="netty-connector">
+ <factory-class>org.jboss.messaging.integration.transports.netty.NettyConnectorFactory</factory-class>
+ <param key="jbm.remoting.netty.port" value="5445" type="Integer"/>
+ </connector>
+
+ <!-- This connector corresponds to the New York server -->
+
+ <connector name="newyork-connector">
+ <factory-class>org.jboss.messaging.integration.transports.netty.NettyConnectorFactory</factory-class>
+ <param key="jbm.remoting.netty.port" value="5446" type="Integer"/>
+ </connector>
+ </connectors>
+
+ <!-- Acceptors -->
+
+ <acceptors>
+ <acceptor name="netty-acceptor">
+ <factory-class>org.jboss.messaging.integration.transports.netty.NettyAcceptorFactory</factory-class>
+ <param key="jbm.remoting.netty.port" value="5445" type="Integer"/>
+ </acceptor>
+ </acceptors>
+
+ <!-- Divert configuration -->
+
+ <diverts>
+ <divert name="order-divert">
+ <address>jms.queue.orders</address>
+ <forwarding-address>jms.topic.spyTopic</forwarding-address>
+ <exclusive>false</exclusive>
+ </divert>
+
+ <divert name="prices-divert">
+ <address>jms.topic.priceUpdates</address>
+ <forwarding-address>jms.queue.priceForwarding</forwarding-address>
+ <filter string="office='New York'"/>
+ <transformer-class-name>org.jboss.jms.example.AddForwardingTimeTransformer</transformer-class-name>
+ <exclusive>true</exclusive>
+ </divert>
+ </diverts>
+
+ <!-- Bridge configuration -->
+
+ <bridges>
+ <bridge name="price-forward-bridge">
+ <queue-name>jms.queue.priceForwarding</queue-name>
+ <forwarding-address>jms.topic.newYorkPriceUpdates</forwarding-address>
+ <reconnect-attempts>-1</reconnect-attempts>
+ <connector-ref connector-name="newyork-connector"/>
+ </bridge>
+ </bridges>
+
+ <!-- Other config -->
+
+ <journal-min-files>2</journal-min-files>
+
+ </configuration>
+
+</deployment>
Added: trunk/examples/jms/divert/server0/jbm-jms.xml
===================================================================
--- trunk/examples/jms/divert/server0/jbm-jms.xml (rev 0)
+++ trunk/examples/jms/divert/server0/jbm-jms.xml 2009-04-16 13:09:50 UTC (rev 6446)
@@ -0,0 +1,34 @@
+<deployment xmlns="urn:jboss:messaging"
+ xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+ xsi:schemaLocation="urn:jboss:messaging ../schemas/jbm-jms.xsd ">
+
+ <!-- the connection factory used by the example -->
+
+ <connection-factory name="ConnectionFactory">
+ <connector-ref connector-name="netty-connector"/>
+ <entry name="ConnectionFactory"/>
+ </connection-factory>
+
+ <!-- Destinations used by the example -->
+
+ <!-- The order queue -->
+ <queue name="orders">
+ <entry name="/queue/orders"/>
+ </queue>
+
+ <!-- The queue that prices are forwarded to before being bridged to the New York server -->
+ <queue name="priceForwarding">
+ <entry name="/queue/priceForwarding"/>
+ </queue>
+
+ <!-- The topic for price updates -->
+ <topic name="priceUpdates">
+ <entry name="/topic/priceUpdates"/>
+ </topic>
+
+ <!-- The spy topic for snooping on orders -->
+ <topic name="spyTopic">
+ <entry name="/topic/spyTopic"/>
+ </topic>
+
+</deployment>
\ No newline at end of file
Added: trunk/examples/jms/divert/server0/jbm-queues.xml
===================================================================
--- trunk/examples/jms/divert/server0/jbm-queues.xml (rev 0)
+++ trunk/examples/jms/divert/server0/jbm-queues.xml 2009-04-16 13:09:50 UTC (rev 6446)
@@ -0,0 +1,15 @@
+<settings xmlns="urn:jboss:messaging"
+ xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+ xsi:schemaLocation="urn:jboss:messaging ../schemas/jbm-queues.xsd ">
+
+ <!--security for example -->
+ <security match="jms.#">
+ <permission type="createDurableQueue" roles="guest"/>
+ <permission type="deleteDurableQueue" roles="guest"/>
+ <permission type="createTempQueue" roles="guest"/>
+ <permission type="deleteTempQueue" roles="guest"/>
+ <permission type="consume" roles="guest"/>
+ <permission type="send" roles="guest"/>
+ </security>
+
+</settings>
Added: trunk/examples/jms/divert/server0/jbm-security.xml
===================================================================
--- trunk/examples/jms/divert/server0/jbm-security.xml (rev 0)
+++ trunk/examples/jms/divert/server0/jbm-security.xml 2009-04-16 13:09:50 UTC (rev 6446)
@@ -0,0 +1,7 @@
+<deployment xmlns="urn:jboss:messaging" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+ xsi:schemaLocation="urn:jboss:messaging ../schemas/jbm-security.xsd ">
+ <!-- the default user. this is used where username is null-->
+ <defaultuser name="guest" password="guest">
+ <role name="guest"/>
+ </defaultuser>
+</deployment>
\ No newline at end of file
Added: trunk/examples/jms/divert/server0/jbm-standalone-beans.xml
===================================================================
--- trunk/examples/jms/divert/server0/jbm-standalone-beans.xml (rev 0)
+++ trunk/examples/jms/divert/server0/jbm-standalone-beans.xml 2009-04-16 13:09:50 UTC (rev 6446)
@@ -0,0 +1,57 @@
+<?xml version="1.0" encoding="UTF-8"?>
+
+<deployment xmlns="urn:jboss:bean-deployer:2.0">
+
+ <bean name="Naming" class="org.jnp.server.NamingBeanImpl"/>
+
+ <!-- JNDI server. Disable this if you don't want JNDI -->
+ <bean name="JNDIServer" class="org.jnp.server.Main">
+ <property name="namingInfo">
+ <inject bean="Naming"/>
+ </property>
+ <property name="port">1099</property>
+ <property name="bindAddress">localhost</property>
+ <property name="rmiPort">1098</property>
+ <property name="rmiBindAddress">localhost</property>
+ </bean>
+
+ <!-- MBean server -->
+ <bean name="MBeanServer" class="javax.management.MBeanServer">
+ <constructor factoryClass="java.lang.management.ManagementFactory"
+ factoryMethod="getPlatformMBeanServer"/>
+ </bean>
+
+ <!-- The core configuration -->
+ <bean name="Configuration" class="org.jboss.messaging.core.config.impl.FileConfiguration"/>
+
+ <!-- The security manager -->
+ <bean name="JBMSecurityManager" class="org.jboss.messaging.core.security.impl.JBMSecurityManagerImpl">
+ <start ignored="true"/>
+ <stop ignored="true"/>
+ </bean>
+
+ <!-- The core server -->
+ <bean name="MessagingServer" class="org.jboss.messaging.core.server.impl.MessagingServerImpl">
+ <constructor>
+ <parameter>
+ <inject bean="Configuration"/>
+ </parameter>
+ <parameter>
+ <inject bean="MBeanServer"/>
+ </parameter>
+ <parameter>
+ <inject bean="JBMSecurityManager"/>
+ </parameter>
+ </constructor>
+ </bean>
+
+ <!-- The JMS server -->
+ <bean name="JMSServerManager" class="org.jboss.messaging.jms.server.impl.JMSServerManagerImpl">
+ <constructor>
+ <parameter>
+ <inject bean="MessagingServer"/>
+ </parameter>
+ </constructor>
+ </bean>
+
+</deployment>
Property changes on: trunk/examples/jms/divert/server1
___________________________________________________________________
Name: svn:ignore
+ data
logs
Added: trunk/examples/jms/divert/server1/client-jndi.properties
===================================================================
--- trunk/examples/jms/divert/server1/client-jndi.properties (rev 0)
+++ trunk/examples/jms/divert/server1/client-jndi.properties 2009-04-16 13:09:50 UTC (rev 6446)
@@ -0,0 +1,3 @@
+java.naming.factory.initial=org.jnp.interfaces.NamingContextFactory
+java.naming.provider.url=jnp://localhost:2099
+java.naming.factory.url.pkgs=org.jboss.naming:org.jnp.interfaces
Added: trunk/examples/jms/divert/server1/jbm-configuration.xml
===================================================================
--- trunk/examples/jms/divert/server1/jbm-configuration.xml (rev 0)
+++ trunk/examples/jms/divert/server1/jbm-configuration.xml 2009-04-16 13:09:50 UTC (rev 6446)
@@ -0,0 +1,30 @@
+<deployment xmlns="urn:jboss:messaging"
+ xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+ xsi:schemaLocation="urn:jboss:messaging ../schemas/jbm-configuration.xsd">
+ <configuration>
+
+ <!-- Connectors -->
+
+ <connectors>
+ <connector name="netty-connector">
+ <factory-class>org.jboss.messaging.integration.transports.netty.NettyConnectorFactory</factory-class>
+ <param key="jbm.remoting.netty.port" value="5446" type="Integer"/>
+ </connector>
+ </connectors>
+
+ <!-- Acceptors -->
+
+ <acceptors>
+ <acceptor name="netty-acceptor">
+ <factory-class>org.jboss.messaging.integration.transports.netty.NettyAcceptorFactory</factory-class>
+ <param key="jbm.remoting.netty.port" value="5446" type="Integer"/>
+ </acceptor>
+ </acceptors>
+
+ <!-- Other config -->
+
+ <journal-min-files>2</journal-min-files>
+
+ </configuration>
+
+</deployment>
Added: trunk/examples/jms/divert/server1/jbm-jms.xml
===================================================================
--- trunk/examples/jms/divert/server1/jbm-jms.xml (rev 0)
+++ trunk/examples/jms/divert/server1/jbm-jms.xml 2009-04-16 13:09:50 UTC (rev 6446)
@@ -0,0 +1,20 @@
+<deployment xmlns="urn:jboss:messaging"
+ xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+ xsi:schemaLocation="urn:jboss:messaging ../schemas/jbm-jms.xsd ">
+
+ <!-- the connection factory used by the example -->
+
+ <connection-factory name="ConnectionFactory">
+ <connector-ref connector-name="netty-connector"/>
+ <entry name="ConnectionFactory"/>
+ </connection-factory>
+
+ <!-- Destinations used by the example -->
+
+ <!-- The topic for New York price updates -->
+
+ <topic name="newYorkPriceUpdates">
+ <entry name="/topic/newYorkPriceUpdates"/>
+ </topic>
+
+</deployment>
\ No newline at end of file
Added: trunk/examples/jms/divert/server1/jbm-queues.xml
===================================================================
--- trunk/examples/jms/divert/server1/jbm-queues.xml (rev 0)
+++ trunk/examples/jms/divert/server1/jbm-queues.xml 2009-04-16 13:09:50 UTC (rev 6446)
@@ -0,0 +1,15 @@
+<settings xmlns="urn:jboss:messaging"
+ xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+ xsi:schemaLocation="urn:jboss:messaging ../schemas/jbm-queues.xsd ">
+
+ <!--security for example queue-->
+ <security match="jms.#">
+ <permission type="createDurableQueue" roles="guest"/>
+ <permission type="deleteDurableQueue" roles="guest"/>
+ <permission type="createTempQueue" roles="guest"/>
+ <permission type="deleteTempQueue" roles="guest"/>
+ <permission type="consume" roles="guest"/>
+ <permission type="send" roles="guest"/>
+ </security>
+
+</settings>
Added: trunk/examples/jms/divert/server1/jbm-security.xml
===================================================================
--- trunk/examples/jms/divert/server1/jbm-security.xml (rev 0)
+++ trunk/examples/jms/divert/server1/jbm-security.xml 2009-04-16 13:09:50 UTC (rev 6446)
@@ -0,0 +1,7 @@
+<deployment xmlns="urn:jboss:messaging" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+ xsi:schemaLocation="urn:jboss:messaging ../schemas/jbm-security.xsd ">
+ <!-- the default user. this is used where username is null-->
+ <defaultuser name="guest" password="guest">
+ <role name="guest"/>
+ </defaultuser>
+</deployment>
\ No newline at end of file
Added: trunk/examples/jms/divert/server1/jbm-standalone-beans.xml
===================================================================
--- trunk/examples/jms/divert/server1/jbm-standalone-beans.xml (rev 0)
+++ trunk/examples/jms/divert/server1/jbm-standalone-beans.xml 2009-04-16 13:09:50 UTC (rev 6446)
@@ -0,0 +1,57 @@
+<?xml version="1.0" encoding="UTF-8"?>
+
+<deployment xmlns="urn:jboss:bean-deployer:2.0">
+
+ <bean name="Naming" class="org.jnp.server.NamingBeanImpl"/>
+
+ <!-- JNDI server. Disable this if you don't want JNDI -->
+ <bean name="JNDIServer" class="org.jnp.server.Main">
+ <property name="namingInfo">
+ <inject bean="Naming"/>
+ </property>
+ <property name="port">2099</property>
+ <property name="bindAddress">localhost</property>
+ <property name="rmiPort">2098</property>
+ <property name="rmiBindAddress">localhost</property>
+ </bean>
+
+ <!-- MBean server -->
+ <bean name="MBeanServer" class="javax.management.MBeanServer">
+ <constructor factoryClass="java.lang.management.ManagementFactory"
+ factoryMethod="getPlatformMBeanServer"/>
+ </bean>
+
+ <!-- The core configuration -->
+ <bean name="Configuration" class="org.jboss.messaging.core.config.impl.FileConfiguration"/>
+
+ <!-- The security manager -->
+ <bean name="JBMSecurityManager" class="org.jboss.messaging.core.security.impl.JBMSecurityManagerImpl">
+ <start ignored="true"/>
+ <stop ignored="true"/>
+ </bean>
+
+ <!-- The core server -->
+ <bean name="MessagingServer" class="org.jboss.messaging.core.server.impl.MessagingServerImpl">
+ <constructor>
+ <parameter>
+ <inject bean="Configuration"/>
+ </parameter>
+ <parameter>
+ <inject bean="MBeanServer"/>
+ </parameter>
+ <parameter>
+ <inject bean="JBMSecurityManager"/>
+ </parameter>
+ </constructor>
+ </bean>
+
+ <!-- The JMS server -->
+ <bean name="JMSServerManager" class="org.jboss.messaging.jms.server.impl.JMSServerManagerImpl">
+ <constructor>
+ <parameter>
+ <inject bean="MessagingServer"/>
+ </parameter>
+ </constructor>
+ </bean>
+
+</deployment>
\ No newline at end of file
Added: trunk/examples/jms/divert/src/org/jboss/jms/example/DivertExample.java
===================================================================
--- trunk/examples/jms/divert/src/org/jboss/jms/example/DivertExample.java (rev 0)
+++ trunk/examples/jms/divert/src/org/jboss/jms/example/DivertExample.java 2009-04-16 13:09:50 UTC (rev 6446)
@@ -0,0 +1,246 @@
+/*
+ * JBoss, Home of Professional Open Source
+ * Copyright 2005-2008, Red Hat Middleware LLC, and individual contributors
+ * by the @authors tag. See the copyright.txt in the distribution for a
+ * full listing of individual contributors.
+ *
+ * This is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU Lesser General Public License as
+ * published by the Free Software Foundation; either version 2.1 of
+ * the License, or (at your option) any later version.
+ *
+ * This software is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this software; if not, write to the Free
+ * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
+ * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
+ */
+package org.jboss.jms.example;
+
+import javax.jms.Connection;
+import javax.jms.ConnectionFactory;
+import javax.jms.Message;
+import javax.jms.MessageConsumer;
+import javax.jms.MessageProducer;
+import javax.jms.Queue;
+import javax.jms.Session;
+import javax.jms.TextMessage;
+import javax.jms.Topic;
+import javax.naming.InitialContext;
+
+/**
+ * This examples demonstrates the use of JBoss Messaging "Diverts" to transparently divert or copy messages
+ * from one address to another.
+ *
+ * Please see the readme.html for more information.
+ *
+ * @author <a href="tim.fox at jboss.com">Tim Fox</a>
+ */
+public class DivertExample extends JMSExample
+{
+ public static void main(String[] args)
+ {
+ new DivertExample().run(args);
+ }
+
+ public boolean runExample() throws Exception
+ {
+ Connection connection0 = null;
+
+ Connection connection1 = null;
+
+ InitialContext initialContext0 = null;
+
+ InitialContext initialContext1 = null;
+ try
+ {
+ // Step 1. Create an initial context to perform the JNDI lookup on the London server
+ initialContext0 = getContext(0);
+
+ // Step 2. Look-up the queue orderQueue on the London server - this is the queue any orders are sent to
+ Queue orderQueue0 = (Queue)initialContext0.lookup("/queue/orders");
+
+ // Step 3. Look-up the topic priceUpdates on the London server- this is the topic that any price updates are sent to
+ Topic priceUpdates0 = (Topic)initialContext0.lookup("/topic/priceUpdates");
+
+ // Step 4. Look-up the spy topic on the London server- this is what we will use to snoop on any orders
+ Topic spyTopic0 = (Topic)initialContext0.lookup("/topic/spyTopic");
+
+ // Step 6. Create an initial context to perform the JNDI lookup on the New York server
+ initialContext1 = getContext(1);
+
+ // Step 7. Look-up the topic newYorkPriceUpdates on the New York server - any price updates sent to priceUpdates on the London server will
+ // be diverted to the queue priceForward on the London server, and a bridge will consume from that queue and forward
+ // them to the address newYorkPriceUpdates on the New York server where they will be distributed to the topic subscribers on
+ // the New York server
+ Topic newYorkPriceUpdates = (Topic)initialContext1.lookup("/topic/newYorkPriceUpdates");
+
+ // Step 8. Perform a lookup on the Connection Factory on the London server
+ ConnectionFactory cf0 = (ConnectionFactory)initialContext0.lookup("/ConnectionFactory");
+
+ // Step 9. Perform a lookup on the Connection Factory on the New York server
+ ConnectionFactory cf1 = (ConnectionFactory)initialContext1.lookup("/ConnectionFactory");
+
+ // Step 10. Create a JMS Connection on the London server
+ connection0 = cf0.createConnection();
+
+ // Step 11. Create a JMS Connection on the New York server
+ connection1 = cf1.createConnection();
+
+ // Step 12. Create a JMS Session on the London server
+ Session session0 = connection0.createSession(false, Session.AUTO_ACKNOWLEDGE);
+
+ // Step 13. Create a JMS Session on the New York server
+ Session session1 = connection1.createSession(false, Session.AUTO_ACKNOWLEDGE);
+
+ // Step 14. Create a JMS MessageProducer orderProducer that sends to the queue orderQueue on the London server
+ MessageProducer orderProducer = session0.createProducer(orderQueue0);
+
+ // Step 15. Create a JMS MessageProducer priceProducer that sends to the topic priceUpdates on the London server
+ MessageProducer priceProducer = session0.createProducer(priceUpdates0);
+
+ // Step 15. Create a JMS subscriber which subscribes to the spyTopic on the London server
+ MessageConsumer spySubscriberA = session0.createConsumer(spyTopic0);
+
+ // Step 16. Create another JMS subscriber which also subscribes to the spyTopic on the London server
+ MessageConsumer spySubscriberB = session0.createConsumer(spyTopic0);
+
+ // Step 17. Create a JMS MessageConsumer which consumes orders from the order queue on the London server
+ MessageConsumer orderConsumer = session0.createConsumer(orderQueue0);
+
+ // Step 18. Create a JMS subscriber which subscribes to the priceUpdates topic on the London server
+ MessageConsumer priceUpdatesSubscriber0 = session0.createConsumer(priceUpdates0);
+
+ // Step 19. Create a JMS subscriber which subscribes to the newYorkPriceUpdates topic on the New York server
+ MessageConsumer newYorkPriceUpdatesSubscriberA = session1.createConsumer(newYorkPriceUpdates);
+
+ // Step 20. Create another JMS subscriber which also subscribes to the newYorkPriceUpdates topic on the New York server
+ MessageConsumer newYorkPriceUpdatesSubscriberB = session1.createConsumer(newYorkPriceUpdates);
+
+ // Step 21. Start the connections
+
+ connection0.start();
+
+ connection1.start();
+
+ // Step 22. Create an order message
+ TextMessage orderMessage = session0.createTextMessage("This is an order");
+
+ // Step 23. Send the order message to the order queue on the London server
+ orderProducer.send(orderMessage);
+
+ System.out.println("Sent message: " + orderMessage.getText());
+
+ // Step 24. The order message is consumed by the orderConsumer on the London server
+ TextMessage receivedOrder = (TextMessage)orderConsumer.receive(5000);
+
+ System.out.println("Received order: " + receivedOrder.getText());
+
+ // Step 25. A copy of the order is also received by the spyTopic subscribers on the London server
+ TextMessage spiedOrder1 = (TextMessage)spySubscriberA.receive(5000);
+
+ System.out.println("Snooped on order: " + spiedOrder1.getText());
+
+ TextMessage spiedOrder2 = (TextMessage)spySubscriberB.receive(5000);
+
+ System.out.println("Snooped on order: " + spiedOrder2.getText());
+
+ // Step 26. Create a price update message, destined for London
+ TextMessage priceUpdateMessageLondon = session0.createTextMessage("This is a price update for London");
+
+ priceUpdateMessageLondon.setStringProperty("office", "London");
+
+ priceProducer.send(priceUpdateMessageLondon);
+
+ // Step 27. The price update *should* be received by the local subscriber since we only divert messages
+ // where office = New York
+ TextMessage receivedUpdate = (TextMessage)priceUpdatesSubscriber0.receive(2000);
+
+ System.out.println("Received price update locally: " + receivedUpdate.getText());
+
+ // Step 28. The price update *should not* be received in New York
+
+ TextMessage priceUpdate1 = (TextMessage)newYorkPriceUpdatesSubscriberA.receive(1000);
+
+ if (priceUpdate1 != null)
+ {
+ return false;
+ }
+
+ System.out.println("Did not received price update in New York, look it's: " + priceUpdate1);
+
+ TextMessage priceUpdate2 = (TextMessage)newYorkPriceUpdatesSubscriberB.receive(1000);
+
+ if (priceUpdate2 != null)
+ {
+ return false;
+ }
+
+ System.out.println("Did not received price update in New York, look it's: " + priceUpdate2);
+
+ // Step 29. Create a price update message, destined for New York
+
+ TextMessage priceUpdateMessageNewYork = session0.createTextMessage("This is a price update for New York");
+
+ priceUpdateMessageNewYork.setStringProperty("office", "New York");
+
+ // Step 30. Send the price update message to the priceUpdates topic on the London server
+ priceProducer.send(priceUpdateMessageNewYork);
+
+ // Step 31. The price update *should not* be received by the local subscriber to the priceUpdates topic
+ // since it has been *exclusively* diverted to the priceForward queue, because it has a header saying
+ // it is destined for the New York office
+ Message message = priceUpdatesSubscriber0.receive(1000);
+
+ if (message != null)
+ {
+ return false;
+ }
+
+ System.out.println("Didn't receive local price update, look, it's: " + message);
+
+ // Step 32. The remote subscribers on server 1 *should* receive a copy of the price update since
+ // it has been diverted to a local priceForward queue which has a bridge consuming from it and which
+ // forwards it to the same address on server 1.
+ // We notice how the forwarded messages have had a special header added by our custom transformer that
+ // we told the divert to use
+
+ priceUpdate1 = (TextMessage)newYorkPriceUpdatesSubscriberA.receive(5000);
+
+ System.out.println("Received forwarded price update on server 1: " + priceUpdate1.getText());
+ System.out.println("Time of forward: " + priceUpdate1.getLongProperty("time_of_forward"));
+
+ priceUpdate2 = (TextMessage)newYorkPriceUpdatesSubscriberB.receive(5000);
+
+ System.out.println("Received forwarded price update on server 2: " + priceUpdate2.getText());
+ System.out.println("Time of forward: " + priceUpdate2.getLongProperty("time_of_forward"));
+
+ return true;
+ }
+ finally
+ {
+ // Step 12. Be sure to close our resources!
+ if (initialContext0 != null)
+ {
+ initialContext0.close();
+ }
+ if (initialContext1 != null)
+ {
+ initialContext1.close();
+ }
+ if (connection0 != null)
+ {
+ connection0.close();
+ }
+ if (connection1 != null)
+ {
+ connection1.close();
+ }
+ }
+ }
+
+}
Modified: trunk/src/main/org/jboss/messaging/core/config/cluster/DivertConfiguration.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/config/cluster/DivertConfiguration.java 2009-04-16 09:37:24 UTC (rev 6445)
+++ trunk/src/main/org/jboss/messaging/core/config/cluster/DivertConfiguration.java 2009-04-16 13:09:50 UTC (rev 6446)
@@ -24,6 +24,9 @@
import java.io.Serializable;
+import org.jboss.messaging.core.logging.Logger;
+import org.jboss.messaging.utils.UUIDGenerator;
+
/**
* A DivertConfiguration
*
@@ -36,7 +39,10 @@
public class DivertConfiguration implements Serializable
{
private static final long serialVersionUID = 6910543740464269629L;
+
+ private static final Logger log = Logger.getLogger(DivertConfiguration.class);
+
private final String name;
private final String routingName;
@@ -60,11 +66,18 @@
final String transformerClassName)
{
this.name = name;
- this.routingName = routingName;
+ if (routingName == null)
+ {
+ this.routingName = UUIDGenerator.getInstance().generateStringUUID();
+ }
+ else
+ {
+ this.routingName = routingName;
+ }
this.address = address;
this.forwardingAddress = forwardingAddress;
this.exclusive = exclusive;
- this.filterString = filterString;
+ this.filterString = filterString;
this.transformerClassName = transformerClassName;
}
Modified: trunk/src/main/org/jboss/messaging/core/postoffice/impl/BindingsImpl.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/postoffice/impl/BindingsImpl.java 2009-04-16 09:37:24 UTC (rev 6445)
+++ trunk/src/main/org/jboss/messaging/core/postoffice/impl/BindingsImpl.java 2009-04-16 13:09:50 UTC (rev 6446)
@@ -138,7 +138,7 @@
ByteBuffer buff = ByteBuffer.wrap(ids);
Set<Bindable> chosen = new HashSet<Bindable>();
-
+
while (buff.hasRemaining())
{
int bindingID = buff.getInt();
@@ -259,17 +259,25 @@
public void route(final ServerMessage message, final Transaction tx) throws Exception
{
+ boolean routed = false;
+
if (!exclusiveBindings.isEmpty())
{
for (Binding binding : exclusiveBindings)
{
- binding.getBindable().route(message, tx);
+ if (binding.getFilter() == null || binding.getFilter().match(message))
+ {
+ binding.getBindable().route(message, tx);
+
+ routed = true;
+ }
}
}
- else
+
+ if (!routed)
{
if (message.getProperty(MessageImpl.HDR_FROM_CLUSTER) != null)
- {
+ {
routeFromCluster(message, tx);
}
else
@@ -281,7 +289,7 @@
SimpleString routingName = entry.getKey();
List<Binding> bindings = entry.getValue();
-
+
if (bindings == null)
{
// The value can become null if it's concurrently removed while we're iterating - this is expected
Modified: trunk/src/main/org/jboss/messaging/core/postoffice/impl/LocalQueueBinding.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/postoffice/impl/LocalQueueBinding.java 2009-04-16 09:37:24 UTC (rev 6445)
+++ trunk/src/main/org/jboss/messaging/core/postoffice/impl/LocalQueueBinding.java 2009-04-16 13:09:50 UTC (rev 6446)
@@ -161,7 +161,7 @@
}
public void willRoute(final ServerMessage message)
- {
+ {
}
public boolean isQueueBinding()
Modified: trunk/src/main/org/jboss/messaging/core/server/cluster/impl/BridgeImpl.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/server/cluster/impl/BridgeImpl.java 2009-04-16 09:37:24 UTC (rev 6445)
+++ trunk/src/main/org/jboss/messaging/core/server/cluster/impl/BridgeImpl.java 2009-04-16 13:09:50 UTC (rev 6446)
@@ -296,7 +296,7 @@
if (activated)
{
executor.execute(new CreateObjectsRunnable());
- }
+ }
}
private void cancelRefs() throws Exception
@@ -441,7 +441,7 @@
// Consumer implementation ---------------------------------------
public HandleStatus handle(final MessageReference ref) throws Exception
- {
+ {
if (filter != null && !filter.match(ref.getMessage()))
{
return HandleStatus.NO_MATCH;
@@ -704,7 +704,7 @@
active = true;
queue.deliverAsync(executor);
-
+
return true;
}
catch (Exception e)
Modified: trunk/src/main/org/jboss/messaging/core/server/cluster/impl/RemoteQueueBindingImpl.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/server/cluster/impl/RemoteQueueBindingImpl.java 2009-04-16 09:37:24 UTC (rev 6445)
+++ trunk/src/main/org/jboss/messaging/core/server/cluster/impl/RemoteQueueBindingImpl.java 2009-04-16 13:09:50 UTC (rev 6446)
@@ -38,7 +38,6 @@
import org.jboss.messaging.core.server.ServerMessage;
import org.jboss.messaging.core.server.cluster.RemoteQueueBinding;
import org.jboss.messaging.utils.SimpleString;
-import org.jboss.messaging.utils.UUIDGenerator;
/**
* A RemoteQueueBindingImpl
Modified: trunk/src/main/org/jboss/messaging/core/server/impl/MessagingServerImpl.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/server/impl/MessagingServerImpl.java 2009-04-16 09:37:24 UTC (rev 6445)
+++ trunk/src/main/org/jboss/messaging/core/server/impl/MessagingServerImpl.java 2009-04-16 13:09:50 UTC (rev 6446)
@@ -479,11 +479,11 @@
// Once we ready we can start the remoting service so we can start accepting connections
remotingService.start();
+
+ // Deploy any pre-defined queues - must be done *after* deploymentManager has started
+ deployQueues();
}
- // Deploy any pre-defined queues
- deployQueues();
-
if (clusterManager != null)
{
clusterManager.start();
@@ -897,6 +897,9 @@
{
deploymentManager.start();
}
+
+ //Queues must be deployed *after* deploymentManager has started
+ deployQueues();
log.info("Backup server is now operational");
}
@@ -1171,7 +1174,7 @@
Filter filter = null;
if (config.getFilterString() != null)
- {
+ {
filter = new FilterImpl(new SimpleString(config.getFilterString()));
}
Modified: trunk/src/main/org/jboss/messaging/core/server/impl/QueueImpl.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/server/impl/QueueImpl.java 2009-04-16 09:37:24 UTC (rev 6445)
+++ trunk/src/main/org/jboss/messaging/core/server/impl/QueueImpl.java 2009-04-16 13:09:50 UTC (rev 6446)
@@ -359,7 +359,7 @@
}
public void addLast(final MessageReference ref)
- {
+ {
add(ref, false);
}
Modified: trunk/tests/src/org/jboss/messaging/tests/integration/divert/DivertTest.java
===================================================================
--- trunk/tests/src/org/jboss/messaging/tests/integration/divert/DivertTest.java 2009-04-16 09:37:24 UTC (rev 6445)
+++ trunk/tests/src/org/jboss/messaging/tests/integration/divert/DivertTest.java 2009-04-16 13:09:50 UTC (rev 6446)
@@ -20,7 +20,6 @@
* 02110-1301 USA, or see the FSF site: http://www.fsf.org.
*/
-
package org.jboss.messaging.tests.integration.divert;
import java.util.ArrayList;
@@ -57,35 +56,41 @@
public void testSingleNonExclusiveDivert() throws Exception
{
Configuration conf = createDefaultConfig();
-
+
conf.setClustered(true);
-
+
final String testAddress = "testAddress";
-
+
final String forwardAddress = "forwardAddress";
-
- DivertConfiguration divertConf = new DivertConfiguration("divert1", "divert1", testAddress, forwardAddress, false, null, null);
-
+
+ DivertConfiguration divertConf = new DivertConfiguration("divert1",
+ "divert1",
+ testAddress,
+ forwardAddress,
+ false,
+ null,
+ null);
+
List<DivertConfiguration> divertConfs = new ArrayList<DivertConfiguration>();
-
+
divertConfs.add(divertConf);
-
+
conf.setDivertConfigurations(divertConfs);
-
+
MessagingServer messagingService = Messaging.newMessagingServer(conf, false);
-
+
messagingService.start();
-
+
ClientSessionFactory sf = new ClientSessionFactoryImpl(new TransportConfiguration("org.jboss.messaging.core.remoting.impl.invm.InVMConnectorFactory"));
ClientSession session = sf.createSession(false, true, true);
-
+
final SimpleString queueName1 = new SimpleString("queue1");
-
+
final SimpleString queueName2 = new SimpleString("queue2");
-
+
session.createQueue(new SimpleString(forwardAddress), queueName1, null, false);
-
+
session.createQueue(new SimpleString(testAddress), queueName2, null, false);
session.start();
@@ -93,95 +98,101 @@
ClientProducer producer = session.createProducer(new SimpleString(testAddress));
ClientConsumer consumer1 = session.createConsumer(queueName1);
-
+
ClientConsumer consumer2 = session.createConsumer(queueName2);
-
+
final int numMessages = 10;
-
+
final SimpleString propKey = new SimpleString("testkey");
-
+
for (int i = 0; i < numMessages; i++)
{
ClientMessage message = session.createClientMessage(false);
-
+
message.putIntProperty(propKey, i);
-
+
producer.send(message);
}
-
+
for (int i = 0; i < numMessages; i++)
{
ClientMessage message = consumer1.receive(200);
-
+
assertNotNull(message);
-
+
assertEquals((Integer)i, (Integer)message.getProperty(propKey));
-
+
message.acknowledge();
}
-
+
assertNull(consumer1.receive(200));
-
+
for (int i = 0; i < numMessages; i++)
{
ClientMessage message = consumer2.receive(200);
-
+
assertNotNull(message);
-
+
assertEquals((Integer)i, (Integer)message.getProperty(propKey));
-
+
message.acknowledge();
}
-
+
assertNull(consumer2.receive(200));
-
+
session.close();
-
+
sf.close();
-
+
messagingService.stop();
}
-
+
public void testSingleNonExclusiveDivert2() throws Exception
{
Configuration conf = createDefaultConfig();
-
+
conf.setClustered(true);
-
+
final String testAddress = "testAddress";
-
+
final String forwardAddress = "forwardAddress";
-
- DivertConfiguration divertConf = new DivertConfiguration("divert1", "divert1", testAddress, forwardAddress, false, null, null);
-
+
+ DivertConfiguration divertConf = new DivertConfiguration("divert1",
+ "divert1",
+ testAddress,
+ forwardAddress,
+ false,
+ null,
+ null);
+
List<DivertConfiguration> divertConfs = new ArrayList<DivertConfiguration>();
-
+
divertConfs.add(divertConf);
-
+
conf.setDivertConfigurations(divertConfs);
-
+
MessagingServer messagingService = Messaging.newMessagingServer(conf, false);
-
+
messagingService.start();
-
+
ClientSessionFactory sf = new ClientSessionFactoryImpl(new TransportConfiguration("org.jboss.messaging.core.remoting.impl.invm.InVMConnectorFactory"));
ClientSession session = sf.createSession(false, true, true);
-
+
final SimpleString queueName1 = new SimpleString("queue1");
-
+
final SimpleString queueName2 = new SimpleString("queue2");
-
+
final SimpleString queueName3 = new SimpleString("queue3");
-
+
final SimpleString queueName4 = new SimpleString("queue4");
-
+
session.createQueue(new SimpleString(forwardAddress), queueName1, null, false);
-
+
session.createQueue(new SimpleString(testAddress), queueName2, null, false);
-
+
session.createQueue(new SimpleString(testAddress), queueName3, null, false);
-
+
session.createQueue(new SimpleString(testAddress), queueName4, null, false);
session.start();
@@ -189,190 +200,202 @@
ClientProducer producer = session.createProducer(new SimpleString(testAddress));
ClientConsumer consumer1 = session.createConsumer(queueName1);
-
+
ClientConsumer consumer2 = session.createConsumer(queueName2);
-
+
ClientConsumer consumer3 = session.createConsumer(queueName3);
-
+
ClientConsumer consumer4 = session.createConsumer(queueName4);
-
+
final int numMessages = 10;
-
+
final SimpleString propKey = new SimpleString("testkey");
-
+
for (int i = 0; i < numMessages; i++)
{
ClientMessage message = session.createClientMessage(false);
-
+
message.putIntProperty(propKey, i);
-
+
producer.send(message);
}
-
+
for (int i = 0; i < numMessages; i++)
{
ClientMessage message = consumer1.receive(200);
-
+
assertNotNull(message);
-
+
assertEquals((Integer)i, (Integer)message.getProperty(propKey));
-
+
message.acknowledge();
}
-
+
assertNull(consumer1.receive(200));
-
+
for (int i = 0; i < numMessages; i++)
{
ClientMessage message = consumer2.receive(200);
-
+
assertNotNull(message);
-
+
assertEquals((Integer)i, (Integer)message.getProperty(propKey));
-
+
message.acknowledge();
}
-
+
assertNull(consumer2.receive(200));
-
+
for (int i = 0; i < numMessages; i++)
{
ClientMessage message = consumer3.receive(200);
-
+
assertNotNull(message);
-
+
assertEquals((Integer)i, (Integer)message.getProperty(propKey));
-
+
message.acknowledge();
}
-
+
assertNull(consumer3.receive(200));
-
+
for (int i = 0; i < numMessages; i++)
{
ClientMessage message = consumer4.receive(200);
-
+
assertNotNull(message);
-
+
assertEquals((Integer)i, (Integer)message.getProperty(propKey));
-
+
message.acknowledge();
}
-
+
assertNull(consumer4.receive(200));
-
+
session.close();
-
+
sf.close();
-
+
messagingService.stop();
}
-
+
public void testSingleNonExclusiveDivert3() throws Exception
{
Configuration conf = createDefaultConfig();
-
+
conf.setClustered(true);
-
+
final String testAddress = "testAddress";
-
+
final String forwardAddress = "forwardAddress";
-
- DivertConfiguration divertConf = new DivertConfiguration("divert1", "divert1", testAddress, forwardAddress, false, null, null);
-
+
+ DivertConfiguration divertConf = new DivertConfiguration("divert1",
+ "divert1",
+ testAddress,
+ forwardAddress,
+ false,
+ null,
+ null);
+
List<DivertConfiguration> divertConfs = new ArrayList<DivertConfiguration>();
-
+
divertConfs.add(divertConf);
-
+
conf.setDivertConfigurations(divertConfs);
-
+
MessagingServer messagingService = Messaging.newMessagingServer(conf, false);
-
+
messagingService.start();
-
+
ClientSessionFactory sf = new ClientSessionFactoryImpl(new TransportConfiguration("org.jboss.messaging.core.remoting.impl.invm.InVMConnectorFactory"));
ClientSession session = sf.createSession(false, true, true);
-
+
final SimpleString queueName1 = new SimpleString("queue1");
-
+
session.createQueue(new SimpleString(forwardAddress), queueName1, null, false);
-
+
session.start();
ClientProducer producer = session.createProducer(new SimpleString(testAddress));
ClientConsumer consumer1 = session.createConsumer(queueName1);
-
+
final int numMessages = 10;
-
+
final SimpleString propKey = new SimpleString("testkey");
-
+
for (int i = 0; i < numMessages; i++)
{
ClientMessage message = session.createClientMessage(false);
-
+
message.putIntProperty(propKey, i);
-
+
producer.send(message);
}
-
+
for (int i = 0; i < numMessages; i++)
{
ClientMessage message = consumer1.receive(200);
-
+
assertNotNull(message);
-
+
assertEquals((Integer)i, (Integer)message.getProperty(propKey));
-
+
message.acknowledge();
}
-
+
assertNull(consumer1.receive(200));
-
+
session.close();
-
+
sf.close();
-
+
messagingService.stop();
}
-
+
public void testSingleExclusiveDivert() throws Exception
{
Configuration conf = createDefaultConfig();
-
+
conf.setClustered(true);
-
+
final String testAddress = "testAddress";
-
+
final String forwardAddress = "forwardAddress";
-
- DivertConfiguration divertConf = new DivertConfiguration("divert1", "divert1", testAddress, forwardAddress, true, null, null);
-
+
+ DivertConfiguration divertConf = new DivertConfiguration("divert1",
+ "divert1",
+ testAddress,
+ forwardAddress,
+ true,
+ null,
+ null);
+
List<DivertConfiguration> divertConfs = new ArrayList<DivertConfiguration>();
-
+
divertConfs.add(divertConf);
-
+
conf.setDivertConfigurations(divertConfs);
-
+
MessagingServer messagingService = Messaging.newMessagingServer(conf, false);
-
+
messagingService.start();
-
+
ClientSessionFactory sf = new ClientSessionFactoryImpl(new TransportConfiguration("org.jboss.messaging.core.remoting.impl.invm.InVMConnectorFactory"));
ClientSession session = sf.createSession(false, true, true);
-
+
final SimpleString queueName1 = new SimpleString("queue1");
-
+
final SimpleString queueName2 = new SimpleString("queue2");
-
+
final SimpleString queueName3 = new SimpleString("queue3");
-
+
final SimpleString queueName4 = new SimpleString("queue4");
-
+
session.createQueue(new SimpleString(forwardAddress), queueName1, null, false);
-
+
session.createQueue(new SimpleString(testAddress), queueName2, null, false);
session.createQueue(new SimpleString(testAddress), queueName3, null, false);
session.createQueue(new SimpleString(testAddress), queueName4, null, false);
@@ -382,100 +405,118 @@
ClientProducer producer = session.createProducer(new SimpleString(testAddress));
ClientConsumer consumer1 = session.createConsumer(queueName1);
-
+
ClientConsumer consumer2 = session.createConsumer(queueName2);
-
+
ClientConsumer consumer3 = session.createConsumer(queueName3);
-
+
ClientConsumer consumer4 = session.createConsumer(queueName4);
-
+
final int numMessages = 10;
-
+
final SimpleString propKey = new SimpleString("testkey");
-
+
for (int i = 0; i < numMessages; i++)
{
ClientMessage message = session.createClientMessage(false);
-
+
message.putIntProperty(propKey, i);
-
+
producer.send(message);
}
-
+
for (int i = 0; i < numMessages; i++)
{
ClientMessage message = consumer1.receive(200);
-
+
assertNotNull(message);
-
+
assertEquals((Integer)i, (Integer)message.getProperty(propKey));
-
+
message.acknowledge();
}
-
+
assertNull(consumer1.receive(200));
-
+
assertNull(consumer2.receive(200));
-
+
assertNull(consumer3.receive(200));
-
+
assertNull(consumer4.receive(200));
-
+
session.close();
-
+
sf.close();
-
+
messagingService.stop();
}
-
+
public void testMultipleNonExclusiveDivert() throws Exception
{
Configuration conf = createDefaultConfig();
-
+
conf.setClustered(true);
-
+
final String testAddress = "testAddress";
-
+
final String forwardAddress1 = "forwardAddress1";
final String forwardAddress2 = "forwardAddress2";
final String forwardAddress3 = "forwardAddress3";
-
- DivertConfiguration divertConf1 = new DivertConfiguration("divert1", "divert1", testAddress, forwardAddress1, false, null, null);
-
- DivertConfiguration divertConf2 = new DivertConfiguration("divert2", "divert2", testAddress, forwardAddress2, false, null, null);
-
- DivertConfiguration divertConf3 = new DivertConfiguration("divert3", "divert3", testAddress, forwardAddress3, false, null, null);
-
+
+ DivertConfiguration divertConf1 = new DivertConfiguration("divert1",
+ "divert1",
+ testAddress,
+ forwardAddress1,
+ false,
+ null,
+ null);
+
+ DivertConfiguration divertConf2 = new DivertConfiguration("divert2",
+ "divert2",
+ testAddress,
+ forwardAddress2,
+ false,
+ null,
+ null);
+
+ DivertConfiguration divertConf3 = new DivertConfiguration("divert3",
+ "divert3",
+ testAddress,
+ forwardAddress3,
+ false,
+ null,
+ null);
+
List<DivertConfiguration> divertConfs = new ArrayList<DivertConfiguration>();
-
+
divertConfs.add(divertConf1);
divertConfs.add(divertConf2);
divertConfs.add(divertConf3);
-
+
conf.setDivertConfigurations(divertConfs);
-
+
MessagingServer messagingService = Messaging.newMessagingServer(conf, false);
-
+
messagingService.start();
-
+
ClientSessionFactory sf = new ClientSessionFactoryImpl(new TransportConfiguration("org.jboss.messaging.core.remoting.impl.invm.InVMConnectorFactory"));
ClientSession session = sf.createSession(false, true, true);
-
+
final SimpleString queueName1 = new SimpleString("queue1");
-
+
final SimpleString queueName2 = new SimpleString("queue2");
-
+
final SimpleString queueName3 = new SimpleString("queue3");
-
+
final SimpleString queueName4 = new SimpleString("queue4");
-
+
session.createQueue(new SimpleString(forwardAddress1), queueName1, null, false);
-
+
session.createQueue(new SimpleString(forwardAddress2), queueName2, null, false);
-
+
session.createQueue(new SimpleString(forwardAddress3), queueName3, null, false);
-
+
session.createQueue(new SimpleString(testAddress), queueName4, null, false);
session.start();
@@ -483,133 +524,151 @@
ClientProducer producer = session.createProducer(new SimpleString(testAddress));
ClientConsumer consumer1 = session.createConsumer(queueName1);
-
+
ClientConsumer consumer2 = session.createConsumer(queueName2);
-
+
ClientConsumer consumer3 = session.createConsumer(queueName3);
-
+
ClientConsumer consumer4 = session.createConsumer(queueName4);
-
+
final int numMessages = 10;
-
+
final SimpleString propKey = new SimpleString("testkey");
-
+
for (int i = 0; i < numMessages; i++)
{
ClientMessage message = session.createClientMessage(false);
-
+
message.putIntProperty(propKey, i);
-
+
producer.send(message);
}
-
+
for (int i = 0; i < numMessages; i++)
{
ClientMessage message = consumer1.receive(200);
-
+
assertNotNull(message);
-
+
assertEquals((Integer)i, (Integer)message.getProperty(propKey));
-
+
message.acknowledge();
}
-
+
assertNull(consumer1.receive(200));
-
+
for (int i = 0; i < numMessages; i++)
{
ClientMessage message = consumer2.receive(200);
-
+
assertNotNull(message);
-
+
assertEquals((Integer)i, (Integer)message.getProperty(propKey));
-
+
message.acknowledge();
}
-
+
assertNull(consumer2.receive(200));
-
+
for (int i = 0; i < numMessages; i++)
{
ClientMessage message = consumer3.receive(200);
-
+
assertNotNull(message);
-
+
assertEquals((Integer)i, (Integer)message.getProperty(propKey));
-
+
message.acknowledge();
}
-
+
assertNull(consumer3.receive(200));
-
+
for (int i = 0; i < numMessages; i++)
{
ClientMessage message = consumer4.receive(200);
-
+
assertNotNull(message);
-
+
assertEquals((Integer)i, (Integer)message.getProperty(propKey));
-
+
message.acknowledge();
}
-
+
assertNull(consumer4.receive(200));
-
+
session.close();
-
+
sf.close();
-
+
messagingService.stop();
}
-
+
public void testMultipleExclusiveDivert() throws Exception
{
Configuration conf = createDefaultConfig();
-
+
conf.setClustered(true);
-
+
final String testAddress = "testAddress";
-
+
final String forwardAddress1 = "forwardAddress1";
final String forwardAddress2 = "forwardAddress2";
final String forwardAddress3 = "forwardAddress3";
-
- DivertConfiguration divertConf1 = new DivertConfiguration("divert1", "divert1", testAddress, forwardAddress1, true, null, null);
-
- DivertConfiguration divertConf2 = new DivertConfiguration("divert2", "divert2", testAddress, forwardAddress2, true, null, null);
-
- DivertConfiguration divertConf3 = new DivertConfiguration("divert3", "divert3", testAddress, forwardAddress3, true, null, null);
-
+
+ DivertConfiguration divertConf1 = new DivertConfiguration("divert1",
+ "divert1",
+ testAddress,
+ forwardAddress1,
+ true,
+ null,
+ null);
+
+ DivertConfiguration divertConf2 = new DivertConfiguration("divert2",
+ "divert2",
+ testAddress,
+ forwardAddress2,
+ true,
+ null,
+ null);
+
+ DivertConfiguration divertConf3 = new DivertConfiguration("divert3",
+ "divert3",
+ testAddress,
+ forwardAddress3,
+ true,
+ null,
+ null);
+
List<DivertConfiguration> divertConfs = new ArrayList<DivertConfiguration>();
-
+
divertConfs.add(divertConf1);
divertConfs.add(divertConf2);
divertConfs.add(divertConf3);
-
+
conf.setDivertConfigurations(divertConfs);
-
+
MessagingServer messagingService = Messaging.newMessagingServer(conf, false);
-
+
messagingService.start();
-
+
ClientSessionFactory sf = new ClientSessionFactoryImpl(new TransportConfiguration("org.jboss.messaging.core.remoting.impl.invm.InVMConnectorFactory"));
ClientSession session = sf.createSession(false, true, true);
-
+
final SimpleString queueName1 = new SimpleString("queue1");
-
+
final SimpleString queueName2 = new SimpleString("queue2");
-
+
final SimpleString queueName3 = new SimpleString("queue3");
-
+
final SimpleString queueName4 = new SimpleString("queue4");
-
+
session.createQueue(new SimpleString(forwardAddress1), queueName1, null, false);
-
+
session.createQueue(new SimpleString(forwardAddress2), queueName2, null, false);
-
+
session.createQueue(new SimpleString(forwardAddress3), queueName3, null, false);
-
+
session.createQueue(new SimpleString(testAddress), queueName4, null, false);
session.start();
@@ -617,122 +676,140 @@
ClientProducer producer = session.createProducer(new SimpleString(testAddress));
ClientConsumer consumer1 = session.createConsumer(queueName1);
-
+
ClientConsumer consumer2 = session.createConsumer(queueName2);
-
+
ClientConsumer consumer3 = session.createConsumer(queueName3);
-
+
ClientConsumer consumer4 = session.createConsumer(queueName4);
-
+
final int numMessages = 10;
-
+
final SimpleString propKey = new SimpleString("testkey");
-
+
for (int i = 0; i < numMessages; i++)
{
ClientMessage message = session.createClientMessage(false);
-
+
message.putIntProperty(propKey, i);
-
+
producer.send(message);
}
-
+
for (int i = 0; i < numMessages; i++)
{
ClientMessage message = consumer1.receive(200);
-
+
assertNotNull(message);
-
+
assertEquals((Integer)i, (Integer)message.getProperty(propKey));
-
+
message.acknowledge();
}
-
+
assertNull(consumer1.receive(200));
-
+
for (int i = 0; i < numMessages; i++)
{
ClientMessage message = consumer2.receive(200);
-
+
assertNotNull(message);
-
+
assertEquals((Integer)i, (Integer)message.getProperty(propKey));
-
+
message.acknowledge();
}
-
+
assertNull(consumer2.receive(200));
-
+
for (int i = 0; i < numMessages; i++)
{
ClientMessage message = consumer3.receive(200);
-
+
assertNotNull(message);
-
+
assertEquals((Integer)i, (Integer)message.getProperty(propKey));
-
+
message.acknowledge();
}
-
+
assertNull(consumer3.receive(200));
-
- assertNull(consumer4.receive(200));
-
+
+ assertNull(consumer4.receive(200));
+
session.close();
-
+
sf.close();
-
+
messagingService.stop();
}
-
+
public void testMixExclusiveAndNonExclusiveDiverts() throws Exception
{
Configuration conf = createDefaultConfig();
-
+
conf.setClustered(true);
-
+
final String testAddress = "testAddress";
-
+
final String forwardAddress1 = "forwardAddress1";
final String forwardAddress2 = "forwardAddress2";
final String forwardAddress3 = "forwardAddress3";
-
- DivertConfiguration divertConf1 = new DivertConfiguration("divert1", "divert1", testAddress, forwardAddress1, true, null, null);
-
- DivertConfiguration divertConf2 = new DivertConfiguration("divert2", "divert2", testAddress, forwardAddress2, true, null, null);
-
- DivertConfiguration divertConf3 = new DivertConfiguration("divert3", "divert3", testAddress, forwardAddress3, false, null, null);
-
+
+ DivertConfiguration divertConf1 = new DivertConfiguration("divert1",
+ "divert1",
+ testAddress,
+ forwardAddress1,
+ true,
+ null,
+ null);
+
+ DivertConfiguration divertConf2 = new DivertConfiguration("divert2",
+ "divert2",
+ testAddress,
+ forwardAddress2,
+ true,
+ null,
+ null);
+
+ DivertConfiguration divertConf3 = new DivertConfiguration("divert3",
+ "divert3",
+ testAddress,
+ forwardAddress3,
+ false,
+ null,
+ null);
+
List<DivertConfiguration> divertConfs = new ArrayList<DivertConfiguration>();
-
+
divertConfs.add(divertConf1);
divertConfs.add(divertConf2);
divertConfs.add(divertConf3);
-
+
conf.setDivertConfigurations(divertConfs);
-
+
MessagingServer messagingService = Messaging.newMessagingServer(conf, false);
-
+
messagingService.start();
-
+
ClientSessionFactory sf = new ClientSessionFactoryImpl(new TransportConfiguration("org.jboss.messaging.core.remoting.impl.invm.InVMConnectorFactory"));
ClientSession session = sf.createSession(false, true, true);
-
+
final SimpleString queueName1 = new SimpleString("queue1");
-
+
final SimpleString queueName2 = new SimpleString("queue2");
-
+
final SimpleString queueName3 = new SimpleString("queue3");
-
+
final SimpleString queueName4 = new SimpleString("queue4");
-
+
session.createQueue(new SimpleString(forwardAddress1), queueName1, null, false);
-
+
session.createQueue(new SimpleString(forwardAddress2), queueName2, null, false);
-
+
session.createQueue(new SimpleString(forwardAddress3), queueName3, null, false);
-
+
session.createQueue(new SimpleString(testAddress), queueName4, null, false);
session.start();
@@ -740,327 +817,550 @@
ClientProducer producer = session.createProducer(new SimpleString(testAddress));
ClientConsumer consumer1 = session.createConsumer(queueName1);
-
+
ClientConsumer consumer2 = session.createConsumer(queueName2);
-
+
ClientConsumer consumer3 = session.createConsumer(queueName3);
-
+
ClientConsumer consumer4 = session.createConsumer(queueName4);
-
+
final int numMessages = 10;
-
+
final SimpleString propKey = new SimpleString("testkey");
-
+
for (int i = 0; i < numMessages; i++)
{
ClientMessage message = session.createClientMessage(false);
-
+
message.putIntProperty(propKey, i);
-
+
producer.send(message);
}
-
+
for (int i = 0; i < numMessages; i++)
{
ClientMessage message = consumer1.receive(200);
-
+
assertNotNull(message);
-
+
assertEquals((Integer)i, (Integer)message.getProperty(propKey));
-
+
message.acknowledge();
}
-
+
assertNull(consumer1.receive(200));
-
+
for (int i = 0; i < numMessages; i++)
{
ClientMessage message = consumer2.receive(200);
-
+
assertNotNull(message);
-
+
assertEquals((Integer)i, (Integer)message.getProperty(propKey));
-
+
message.acknowledge();
}
-
+
assertNull(consumer2.receive(200));
-
+
assertNull(consumer3.receive(200));
-
- assertNull(consumer4.receive(200));
-
+
+ assertNull(consumer4.receive(200));
+
session.close();
-
+
sf.close();
-
+
messagingService.stop();
}
-
+
+ // If no exclusive diverts match then non exclusive ones should be called
+ public void testSingleExclusiveNonMatchingAndNonExclusiveDiverts() throws Exception
+ {
+ Configuration conf = createDefaultConfig();
+
+ conf.setClustered(true);
+
+ final String testAddress = "testAddress";
+
+ final String forwardAddress1 = "forwardAddress1";
+ final String forwardAddress2 = "forwardAddress2";
+ final String forwardAddress3 = "forwardAddress3";
+
+ final String filter = "animal='antelope'";
+
+ DivertConfiguration divertConf1 = new DivertConfiguration("divert1",
+ "divert1",
+ testAddress,
+ forwardAddress1,
+ true,
+ filter,
+ null);
+
+ DivertConfiguration divertConf2 = new DivertConfiguration("divert2",
+ "divert2",
+ testAddress,
+ forwardAddress2,
+ false,
+ null,
+ null);
+
+ DivertConfiguration divertConf3 = new DivertConfiguration("divert3",
+ "divert3",
+ testAddress,
+ forwardAddress3,
+ false,
+ null,
+ null);
+
+ List<DivertConfiguration> divertConfs = new ArrayList<DivertConfiguration>();
+
+ divertConfs.add(divertConf1);
+ divertConfs.add(divertConf2);
+ divertConfs.add(divertConf3);
+
+ conf.setDivertConfigurations(divertConfs);
+
+ MessagingServer messagingService = Messaging.newMessagingServer(conf, false);
+
+ messagingService.start();
+
+ ClientSessionFactory sf = new ClientSessionFactoryImpl(new TransportConfiguration("org.jboss.messaging.core.remoting.impl.invm.InVMConnectorFactory"));
+
+ ClientSession session = sf.createSession(false, true, true);
+
+ final SimpleString queueName1 = new SimpleString("queue1");
+
+ final SimpleString queueName2 = new SimpleString("queue2");
+
+ final SimpleString queueName3 = new SimpleString("queue3");
+
+ final SimpleString queueName4 = new SimpleString("queue4");
+
+ session.createQueue(new SimpleString(forwardAddress1), queueName1, null, false);
+
+ session.createQueue(new SimpleString(forwardAddress2), queueName2, null, false);
+
+ session.createQueue(new SimpleString(forwardAddress3), queueName3, null, false);
+
+ session.createQueue(new SimpleString(testAddress), queueName4, null, false);
+
+ session.start();
+
+ ClientProducer producer = session.createProducer(new SimpleString(testAddress));
+
+ ClientConsumer consumer1 = session.createConsumer(queueName1);
+
+ ClientConsumer consumer2 = session.createConsumer(queueName2);
+
+ ClientConsumer consumer3 = session.createConsumer(queueName3);
+
+ ClientConsumer consumer4 = session.createConsumer(queueName4);
+
+ final int numMessages = 10;
+
+ final SimpleString propKey = new SimpleString("testkey");
+
+ for (int i = 0; i < numMessages; i++)
+ {
+ ClientMessage message = session.createClientMessage(false);
+
+ message.putStringProperty(new SimpleString("animal"), new SimpleString("giraffe"));
+
+ message.putIntProperty(propKey, i);
+
+ producer.send(message);
+ }
+
+ // for (int i = 0; i < numMessages; i++)
+ // {
+ // ClientMessage message = consumer1.receive(200);
+ //
+ // assertNotNull(message);
+ //
+ // assertEquals((Integer)i, (Integer)message.getProperty(propKey));
+ //
+ // message.acknowledge();
+ // }
+
+ assertNull(consumer1.receive(200));
+
+ for (int i = 0; i < numMessages; i++)
+ {
+ ClientMessage message = consumer2.receive(200);
+
+ assertNotNull(message);
+
+ assertEquals((Integer)i, (Integer)message.getProperty(propKey));
+
+ message.acknowledge();
+ }
+
+ assertNull(consumer2.receive(200));
+
+ for (int i = 0; i < numMessages; i++)
+ {
+ ClientMessage message = consumer3.receive(200);
+
+ assertNotNull(message);
+
+ assertEquals((Integer)i, (Integer)message.getProperty(propKey));
+
+ message.acknowledge();
+ }
+
+ assertNull(consumer3.receive(200));
+
+ for (int i = 0; i < numMessages; i++)
+ {
+ ClientMessage message = consumer4.receive(200);
+
+ assertNotNull(message);
+
+ assertEquals((Integer)i, (Integer)message.getProperty(propKey));
+
+ message.acknowledge();
+ }
+
+ assertNull(consumer4.receive(200));
+
+ for (int i = 0; i < numMessages; i++)
+ {
+ ClientMessage message = session.createClientMessage(false);
+
+ message.putStringProperty(new SimpleString("animal"), new SimpleString("antelope"));
+
+ message.putIntProperty(propKey, i);
+
+ producer.send(message);
+ }
+
+ for (int i = 0; i < numMessages; i++)
+ {
+ ClientMessage message = consumer1.receive(200);
+
+ assertNotNull(message);
+
+ assertEquals((Integer)i, (Integer)message.getProperty(propKey));
+
+ message.acknowledge();
+ }
+
+ assertNull(consumer1.receive(200));
+
+ assertNull(consumer2.receive(200));
+
+ assertNull(consumer3.receive(200));
+
+ assertNull(consumer4.receive(200));
+
+ session.close();
+
+ sf.close();
+
+ messagingService.stop();
+ }
+
public void testRoundRobinDiverts() throws Exception
{
Configuration conf = createDefaultConfig();
-
+
conf.setClustered(true);
-
+
final String testAddress = "testAddress";
-
+
final String forwardAddress1 = "forwardAddress1";
final String forwardAddress2 = "forwardAddress2";
final String forwardAddress3 = "forwardAddress3";
-
- DivertConfiguration divertConf1 = new DivertConfiguration("divert1", "thename", testAddress, forwardAddress1, false, null, null);
-
- DivertConfiguration divertConf2 = new DivertConfiguration("divert2", "thename", testAddress, forwardAddress2, false, null, null);
-
- DivertConfiguration divertConf3 = new DivertConfiguration("divert3", "thename", testAddress, forwardAddress3, false, null, null);
-
+
+ DivertConfiguration divertConf1 = new DivertConfiguration("divert1",
+ "thename",
+ testAddress,
+ forwardAddress1,
+ false,
+ null,
+ null);
+
+ DivertConfiguration divertConf2 = new DivertConfiguration("divert2",
+ "thename",
+ testAddress,
+ forwardAddress2,
+ false,
+ null,
+ null);
+
+ DivertConfiguration divertConf3 = new DivertConfiguration("divert3",
+ "thename",
+ testAddress,
+ forwardAddress3,
+ false,
+ null,
+ null);
+
List<DivertConfiguration> divertConfs = new ArrayList<DivertConfiguration>();
-
+
divertConfs.add(divertConf1);
divertConfs.add(divertConf2);
divertConfs.add(divertConf3);
-
+
conf.setDivertConfigurations(divertConfs);
-
+
MessagingServer messagingService = Messaging.newMessagingServer(conf, false);
-
+
messagingService.start();
-
+
ClientSessionFactory sf = new ClientSessionFactoryImpl(new TransportConfiguration("org.jboss.messaging.core.remoting.impl.invm.InVMConnectorFactory"));
ClientSession session = sf.createSession(false, true, true);
-
+
final SimpleString queueName1 = new SimpleString("queue1");
-
+
final SimpleString queueName2 = new SimpleString("queue2");
-
+
final SimpleString queueName3 = new SimpleString("queue3");
-
+
final SimpleString queueName4 = new SimpleString("queue4");
-
+
session.createQueue(new SimpleString(forwardAddress1), queueName1, null, false);
-
+
session.createQueue(new SimpleString(forwardAddress2), queueName2, null, false);
-
+
session.createQueue(new SimpleString(forwardAddress3), queueName3, null, false);
-
+
session.createQueue(new SimpleString(testAddress), queueName4, null, false);
-
+
session.start();
ClientProducer producer = session.createProducer(new SimpleString(testAddress));
ClientConsumer consumer1 = session.createConsumer(queueName1);
-
+
ClientConsumer consumer2 = session.createConsumer(queueName2);
-
+
ClientConsumer consumer3 = session.createConsumer(queueName3);
-
+
ClientConsumer consumer4 = session.createConsumer(queueName4);
-
+
final int numMessages = 10;
-
+
final SimpleString propKey = new SimpleString("testkey");
-
+
for (int i = 0; i < numMessages; i++)
{
ClientMessage message = session.createClientMessage(false);
-
+
message.putIntProperty(propKey, i);
-
+
producer.send(message);
}
-
- for (int i = 0; i < numMessages; )
- {
+
+ for (int i = 0; i < numMessages;)
+ {
ClientMessage message = consumer1.receive(200);
-
+
assertNotNull(message);
-
+
assertEquals((Integer)i, (Integer)message.getProperty(propKey));
-
+
message.acknowledge();
-
+
i++;
-
+
if (i == numMessages)
{
break;
}
-
+
message = consumer2.receive(200);
-
+
assertNotNull(message);
-
+
assertEquals((Integer)i, (Integer)message.getProperty(propKey));
-
+
message.acknowledge();
-
+
i++;
-
+
if (i == numMessages)
{
break;
}
-
+
message = consumer3.receive(200);
-
+
assertNotNull(message);
-
+
assertEquals((Integer)i, (Integer)message.getProperty(propKey));
-
+
message.acknowledge();
-
- i++;
+
+ i++;
}
-
+
assertNull(consumer1.receive(200));
assertNull(consumer2.receive(200));
- assertNull(consumer3.receive(200));
-
+ assertNull(consumer3.receive(200));
+
for (int i = 0; i < numMessages; i++)
{
ClientMessage message = consumer4.receive(200);
-
+
assertNotNull(message);
-
+
assertEquals((Integer)i, (Integer)message.getProperty(propKey));
-
+
message.acknowledge();
}
-
+
assertNull(consumer4.receive(200));
-
+
session.close();
-
+
sf.close();
-
+
messagingService.stop();
}
-
+
public void testDeployDivertsSameUniqueName() throws Exception
{
Configuration conf = createDefaultConfig();
-
+
conf.setClustered(true);
-
+
final String testAddress = "testAddress";
-
+
final String forwardAddress1 = "forwardAddress1";
final String forwardAddress2 = "forwardAddress2";
final String forwardAddress3 = "forwardAddress3";
-
- DivertConfiguration divertConf1 = new DivertConfiguration("divert1", "thename1", testAddress, forwardAddress1, false, null, null);
-
- DivertConfiguration divertConf2 = new DivertConfiguration("divert1", "thename2", testAddress, forwardAddress2, false, null, null);
-
- DivertConfiguration divertConf3 = new DivertConfiguration("divert2", "thename3", testAddress, forwardAddress3, false, null, null);
-
+
+ DivertConfiguration divertConf1 = new DivertConfiguration("divert1",
+ "thename1",
+ testAddress,
+ forwardAddress1,
+ false,
+ null,
+ null);
+
+ DivertConfiguration divertConf2 = new DivertConfiguration("divert1",
+ "thename2",
+ testAddress,
+ forwardAddress2,
+ false,
+ null,
+ null);
+
+ DivertConfiguration divertConf3 = new DivertConfiguration("divert2",
+ "thename3",
+ testAddress,
+ forwardAddress3,
+ false,
+ null,
+ null);
+
List<DivertConfiguration> divertConfs = new ArrayList<DivertConfiguration>();
-
+
divertConfs.add(divertConf1);
divertConfs.add(divertConf2);
divertConfs.add(divertConf3);
-
+
conf.setDivertConfigurations(divertConfs);
-
+
MessagingServer messagingService = Messaging.newMessagingServer(conf, false);
-
+
messagingService.start();
-
- //Only the first and third should be deployed
-
+
+ // Only the first and third should be deployed
+
ClientSessionFactory sf = new ClientSessionFactoryImpl(new TransportConfiguration("org.jboss.messaging.core.remoting.impl.invm.InVMConnectorFactory"));
ClientSession session = sf.createSession(false, true, true);
-
+
final SimpleString queueName1 = new SimpleString("queue1");
-
+
final SimpleString queueName2 = new SimpleString("queue2");
-
+
final SimpleString queueName3 = new SimpleString("queue3");
-
+
final SimpleString queueName4 = new SimpleString("queue4");
-
+
session.createQueue(new SimpleString(forwardAddress1), queueName1, null, false);
-
+
session.createQueue(new SimpleString(forwardAddress2), queueName2, null, false);
-
+
session.createQueue(new SimpleString(forwardAddress3), queueName3, null, false);
-
+
session.createQueue(new SimpleString(testAddress), queueName4, null, false);
-
+
session.start();
ClientProducer producer = session.createProducer(new SimpleString(testAddress));
ClientConsumer consumer1 = session.createConsumer(queueName1);
-
+
ClientConsumer consumer2 = session.createConsumer(queueName2);
-
+
ClientConsumer consumer3 = session.createConsumer(queueName3);
-
+
ClientConsumer consumer4 = session.createConsumer(queueName4);
-
+
final int numMessages = 10;
-
+
final SimpleString propKey = new SimpleString("testkey");
-
+
for (int i = 0; i < numMessages; i++)
{
ClientMessage message = session.createClientMessage(false);
-
+
message.putIntProperty(propKey, i);
-
+
producer.send(message);
}
-
+
for (int i = 0; i < numMessages; i++)
{
ClientMessage message = consumer1.receive(200);
-
+
assertNotNull(message);
-
+
assertEquals((Integer)i, (Integer)message.getProperty(propKey));
-
+
message.acknowledge();
}
-
+
assertNull(consumer1.receive(200));
-
+
assertNull(consumer2.receive(200));
-
+
for (int i = 0; i < numMessages; i++)
{
ClientMessage message = consumer3.receive(200);
-
+
assertNotNull(message);
-
+
assertEquals((Integer)i, (Integer)message.getProperty(propKey));
-
+
message.acknowledge();
}
-
+
assertNull(consumer3.receive(200));
-
+
for (int i = 0; i < numMessages; i++)
{
ClientMessage message = consumer4.receive(200);
-
+
assertNotNull(message);
-
+
assertEquals((Integer)i, (Integer)message.getProperty(propKey));
-
+
message.acknowledge();
}
-
+
assertNull(consumer4.receive(200));
-
+
session.close();
-
+
sf.close();
-
+
messagingService.stop();
}
More information about the jboss-cvs-commits
mailing list