[jboss-cvs] JBoss Messaging SVN: r5838 - in trunk: src/main/org/jboss/messaging/core/message/impl and 2 other directories.
jboss-cvs-commits at lists.jboss.org
jboss-cvs-commits at lists.jboss.org
Sat Feb 7 11:44:13 EST 2009
Author: timfox
Date: 2009-02-07 11:44:12 -0500 (Sat, 07 Feb 2009)
New Revision: 5838
Added:
trunk/tests/src/org/jboss/messaging/tests/integration/cluster/distribution/OneWayChainClusterTest.java
Modified:
trunk/src/main/org/jboss/messaging/core/message/Message.java
trunk/src/main/org/jboss/messaging/core/message/impl/MessageImpl.java
trunk/src/main/org/jboss/messaging/core/server/cluster/impl/ClusterConnectionImpl.java
trunk/src/main/org/jboss/messaging/core/server/cluster/impl/ClusterManagerImpl.java
trunk/tests/src/org/jboss/messaging/tests/integration/cluster/distribution/ClusterTestBase.java
trunk/tests/src/org/jboss/messaging/tests/integration/cluster/distribution/OnewayTwoNodeClusterTest.java
trunk/tests/src/org/jboss/messaging/tests/integration/cluster/distribution/SymmetricClusterTest.java
Log:
more clustering tests etc
Modified: trunk/src/main/org/jboss/messaging/core/message/Message.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/message/Message.java 2009-02-07 10:21:43 UTC (rev 5837)
+++ trunk/src/main/org/jboss/messaging/core/message/Message.java 2009-02-07 16:44:12 UTC (rev 5838)
@@ -95,6 +95,8 @@
// Properties
// ------------------------------------------------------------------
+ TypedProperties getProperties();
+
void putBooleanProperty(SimpleString key, boolean value);
void putByteProperty(SimpleString key, byte value);
Modified: trunk/src/main/org/jboss/messaging/core/message/impl/MessageImpl.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/message/impl/MessageImpl.java 2009-02-07 10:21:43 UTC (rev 5837)
+++ trunk/src/main/org/jboss/messaging/core/message/impl/MessageImpl.java 2009-02-07 16:44:12 UTC (rev 5838)
@@ -379,6 +379,11 @@
return properties.getPropertyNames();
}
+ public TypedProperties getProperties()
+ {
+ return this.properties;
+ }
+
// Body
// -------------------------------------------------------------------------------------
@@ -398,11 +403,6 @@
// Protected -----------------------------------------------------
- protected TypedProperties getProperties()
- {
- return this.properties;
- }
-
// Private -------------------------------------------------------
// Inner classes -------------------------------------------------
Modified: trunk/src/main/org/jboss/messaging/core/server/cluster/impl/ClusterConnectionImpl.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/server/cluster/impl/ClusterConnectionImpl.java 2009-02-07 10:21:43 UTC (rev 5837)
+++ trunk/src/main/org/jboss/messaging/core/server/cluster/impl/ClusterConnectionImpl.java 2009-02-07 16:44:12 UTC (rev 5838)
@@ -37,8 +37,9 @@
import org.jboss.messaging.core.cluster.DiscoveryGroup;
import org.jboss.messaging.core.cluster.DiscoveryListener;
import org.jboss.messaging.core.config.TransportConfiguration;
-import org.jboss.messaging.core.config.cluster.BridgeConfiguration;
import org.jboss.messaging.core.logging.Logger;
+import org.jboss.messaging.core.management.ManagementService;
+import org.jboss.messaging.core.management.Notification;
import org.jboss.messaging.core.management.NotificationType;
import org.jboss.messaging.core.persistence.StorageManager;
import org.jboss.messaging.core.postoffice.Binding;
@@ -74,6 +75,8 @@
private final StorageManager storageManager;
private final PostOffice postOffice;
+
+ private final ManagementService managementService;
private final SimpleString name;
@@ -119,6 +122,7 @@
final ExecutorFactory executorFactory,
final StorageManager storageManager,
final PostOffice postOffice,
+ final ManagementService managementService,
final ScheduledExecutorService scheduledExecutor,
final QueueFactory queueFactory,
final List<Pair<TransportConfiguration, TransportConfiguration>> connectors,
@@ -146,6 +150,8 @@
this.storageManager = storageManager;
this.postOffice = postOffice;
+
+ this.managementService = managementService;
this.discoveryGroup = null;
@@ -174,6 +180,7 @@
final ExecutorFactory executorFactory,
final StorageManager storageManager,
final PostOffice postOffice,
+ final ManagementService managementService,
final ScheduledExecutorService scheduledExecutor,
final QueueFactory queueFactory,
final DiscoveryGroup discoveryGroup,
@@ -197,6 +204,8 @@
this.storageManager = storageManager;
this.postOffice = postOffice;
+
+ this.managementService = managementService;
this.scheduledExecutor = scheduledExecutor;
@@ -526,6 +535,18 @@
bindings.put(clusterName, binding);
+ if (postOffice.getBinding(clusterName) != null)
+ {
+ //Sanity check - this means the binding has already been added via another bridge, probably max hops is too high
+ //or there are multiple cluster connections for the same address
+
+ log.warn("Remoting queue binding " + clusterName + " has already been bound in the post office. Most likely cause for this is you have a loop " +
+ "in your cluster due to cluster max-hops being too large or you have multiple cluster connections to the same nodes using overlapping addresses");
+
+ return;
+ }
+
+
postOffice.addBinding(binding);
Bindings theBindings = postOffice.getBindingsForAddress(queueAddress);
@@ -574,6 +595,11 @@
binding.addConsumer(filterString);
+ //Need to propagate the consumer add
+ Notification notification = new Notification(ntype, message.getProperties());
+
+ managementService.sendNotification(notification);
+
break;
}
case NotificationType.CONSUMER_CLOSED_INDEX:
@@ -596,6 +622,11 @@
binding.removeConsumer(filterString);
+ //Need to propagate the consumer close
+ Notification notification = new Notification(ntype, message.getProperties());
+
+ managementService.sendNotification(notification);
+
break;
}
}
Modified: trunk/src/main/org/jboss/messaging/core/server/cluster/impl/ClusterManagerImpl.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/server/cluster/impl/ClusterManagerImpl.java 2009-02-07 10:21:43 UTC (rev 5837)
+++ trunk/src/main/org/jboss/messaging/core/server/cluster/impl/ClusterManagerImpl.java 2009-02-07 16:44:12 UTC (rev 5838)
@@ -477,6 +477,7 @@
executorFactory,
storageManager,
postOffice,
+ managementService,
scheduledExecutor,
queueFactory,
connectors,
@@ -504,6 +505,7 @@
executorFactory,
storageManager,
postOffice,
+ managementService,
scheduledExecutor,
queueFactory,
dg,
Modified: trunk/tests/src/org/jboss/messaging/tests/integration/cluster/distribution/ClusterTestBase.java
===================================================================
--- trunk/tests/src/org/jboss/messaging/tests/integration/cluster/distribution/ClusterTestBase.java 2009-02-07 10:21:43 UTC (rev 5837)
+++ trunk/tests/src/org/jboss/messaging/tests/integration/cluster/distribution/ClusterTestBase.java 2009-02-07 16:44:12 UTC (rev 5838)
@@ -160,7 +160,7 @@
}
}
- // log.info("binding count " + bindingCount + " consumer Count " + totConsumers);
+ // log.info(node + " binding count " + bindingCount + " consumer Count " + totConsumers);
if (bindingCount == count && totConsumers == consumerCount)
{
@@ -417,42 +417,48 @@
protected void verifyReceiveRoundRobinInSomeOrder(int numMessages, int... consumerIDs) throws Exception
{
Map<Integer, Integer> countMap = new HashMap<Integer, Integer>();
-
+
Set<Integer> counts = new HashSet<Integer>();
- for (int i = 0; i < numMessages; i++)
+ for (int i = 0; i < consumerIDs.length; i++)
{
- for (int j = 0; j < consumerIDs.length; j++)
+ ConsumerHolder holder = consumers[consumerIDs[i]];
+
+ if (holder == null)
{
- ConsumerHolder holder = consumers[consumerIDs[j]];
+ throw new IllegalArgumentException("No consumer at " + consumerIDs[i]);
+ }
- if (holder == null)
+ ClientMessage message;
+ do
+ {
+ message = holder.consumer.receive(200);
+
+ if (message != null)
{
- throw new IllegalArgumentException("No consumer at " + consumerIDs[j]);
- }
+ int count = (Integer)message.getProperty(COUNT_PROP);
- ClientMessage message = holder.consumer.receive(500);
+ Integer prevCount = countMap.get(i);
- assertNotNull("consumer " + consumerIDs[j] + " did not receive message", message);
+ if (prevCount != null)
+ {
+ assertTrue(count == prevCount + consumerIDs.length);
+ }
- int count = (Integer)message.getProperty(COUNT_PROP);
+ assertFalse(counts.contains(count));
- Integer prevCount = countMap.get(j);
+ counts.add(count);
- if (prevCount != null)
- {
- assertTrue(count == prevCount + consumerIDs.length);
+ countMap.put(i, count);
}
-
- assertFalse(counts.contains(count));
-
- counts.add(count);
-
- countMap.put(j, count);
-
- i++;
}
+ while (message != null);
}
+
+ for (int i = 0; i < numMessages; i++)
+ {
+ assertTrue(counts.contains(i));
+ }
}
protected void verifyNotReceive(int... consumerIDs) throws Exception
Added: trunk/tests/src/org/jboss/messaging/tests/integration/cluster/distribution/OneWayChainClusterTest.java
===================================================================
--- trunk/tests/src/org/jboss/messaging/tests/integration/cluster/distribution/OneWayChainClusterTest.java (rev 0)
+++ trunk/tests/src/org/jboss/messaging/tests/integration/cluster/distribution/OneWayChainClusterTest.java 2009-02-07 16:44:12 UTC (rev 5838)
@@ -0,0 +1,105 @@
+/*
+ * JBoss, Home of Professional Open Source
+ * Copyright 2005-2009, Red Hat Middleware LLC, and individual contributors
+ * by the @authors tag. See the copyright.txt in the distribution for a
+ * full listing of individual contributors.
+ *
+ * This is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU Lesser General Public License as
+ * published by the Free Software Foundation; either version 2.1 of
+ * the License, or (at your option) any later version.
+ *
+ * This software is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this software; if not, write to the Free
+ * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
+ * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
+ */
+
+
+package org.jboss.messaging.tests.integration.cluster.distribution;
+
+import org.jboss.messaging.core.logging.Logger;
+
+/**
+ * A OneWayChainClusterTest
+ *
+ * @author <a href="mailto:tim.fox at jboss.com">Tim Fox</a>
+ *
+ * Created 7 Feb 2009 15:23:08
+ *
+ *
+ */
+public class OneWayChainClusterTest extends ClusterTestBase
+{
+ private static final Logger log = Logger.getLogger(OneWayChainClusterTest.class);
+
+ @Override
+ protected void setUp() throws Exception
+ {
+ super.setUp();
+
+ setupServer(0, isFileStorage(), isNetty());
+ setupServer(1, isFileStorage(), isNetty());
+ setupServer(2, isFileStorage(), isNetty());
+ setupServer(3, isFileStorage(), isNetty());
+ setupServer(4, isFileStorage(), isNetty());
+ }
+
+ @Override
+ protected void tearDown() throws Exception
+ {
+ closeAllConsumers();
+
+ closeAllSessionFactories();
+
+ stopServers(0, 1, 2, 3, 4);
+
+ super.tearDown();
+ }
+
+ protected boolean isNetty()
+ {
+ return false;
+ }
+
+ protected boolean isFileStorage()
+ {
+ return false;
+ }
+
+ public void testBasicRoundRobin() throws Exception
+ {
+ setupClusterConnection("cluster0-1", 0, 1, "queues", false, 4, isNetty());
+ setupClusterConnection("cluster1-2", 1, 2, "queues", false, 4, isNetty());
+ setupClusterConnection("cluster2-3", 2, 3, "queues", false, 4, isNetty());
+ setupClusterConnection("cluster3-4", 3, 4, "queues", false, 4, isNetty());
+
+ startServers(0, 1, 2, 3, 4);
+
+ setupSessionFactory(0, isNetty());
+ setupSessionFactory(4, isNetty());
+
+ createQueue(0, "queues.testaddress", "queue0", null, false);
+
+ createQueue(4, "queues.testaddress", "queue0", null, false);
+
+ addConsumer(0, 0, "queue0", null);
+
+ addConsumer(1, 4, "queue0", null);
+
+ waitForBindings(0, "queues.testaddress", 1, 1, true);
+ waitForBindings(0, "queues.testaddress", 1, 1, false);
+
+ send(0, "queues.testaddress", 10, false, null);
+ verifyReceiveRoundRobin(10, 0, 1);
+ verifyNotReceive(0, 1);
+ }
+
+
+
+}
Modified: trunk/tests/src/org/jboss/messaging/tests/integration/cluster/distribution/OnewayTwoNodeClusterTest.java
===================================================================
--- trunk/tests/src/org/jboss/messaging/tests/integration/cluster/distribution/OnewayTwoNodeClusterTest.java 2009-02-07 10:21:43 UTC (rev 5837)
+++ trunk/tests/src/org/jboss/messaging/tests/integration/cluster/distribution/OnewayTwoNodeClusterTest.java 2009-02-07 16:44:12 UTC (rev 5838)
@@ -375,6 +375,89 @@
verifyNotReceive(0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15);
}
+ public void testMixtureLoadBalancedAndNonLoadBalancedQueuesRemoveSomeQueuesAndConsumers() throws Exception
+ {
+ setupClusterConnection("cluster1", 0, 1, "queues", false, 1, isNetty());
+ startServers(1, 0);
+
+ setupSessionFactory(0, isNetty());
+ setupSessionFactory(1, isNetty());
+
+ createQueue(0, "queues.testaddress", "queue0", null, false);
+ createQueue(0, "queues.testaddress", "queue1", null, false);
+ createQueue(0, "queues.testaddress", "queue2", null, false);
+ createQueue(0, "queues.testaddress", "queue3", null, false);
+ createQueue(0, "queues.testaddress", "queue4", null, false);
+
+
+ createQueue(1, "queues.testaddress", "queue5", null, false);
+ createQueue(1, "queues.testaddress", "queue6", null, false);
+ createQueue(1, "queues.testaddress", "queue7", null, false);
+ createQueue(1, "queues.testaddress", "queue8", null, false);
+ createQueue(1, "queues.testaddress", "queue9", null, false);
+
+ createQueue(0, "queues.testaddress", "queue10", null, false);
+ createQueue(1, "queues.testaddress", "queue10", null, false);
+
+ createQueue(0, "queues.testaddress", "queue11", null, false);
+ createQueue(1, "queues.testaddress", "queue11", null, false);
+
+ createQueue(0, "queues.testaddress", "queue12", null, false);
+ createQueue(1, "queues.testaddress", "queue12", null, false);
+
+
+ addConsumer(0, 0, "queue0", null);
+ addConsumer(1, 0, "queue1", null);
+ addConsumer(2, 0, "queue2", null);
+ addConsumer(3, 0, "queue3", null);
+ addConsumer(4, 0, "queue4", null);
+
+ addConsumer(5, 1, "queue5", null);
+ addConsumer(6, 1, "queue6", null);
+ addConsumer(7, 1, "queue7", null);
+ addConsumer(8, 1, "queue8", null);
+ addConsumer(9, 1, "queue9", null);
+
+ addConsumer(10, 0, "queue10", null);
+ addConsumer(11, 1, "queue10", null);
+
+ addConsumer(12, 0, "queue11", null);
+ addConsumer(13, 1, "queue11", null);
+
+ addConsumer(14, 0, "queue12", null);
+ addConsumer(15, 1, "queue12", null);
+
+ waitForBindings(0, "queues.testaddress", 8, 8, true);
+ waitForBindings(0, "queues.testaddress", 8, 8, false);
+
+ send(0, "queues.testaddress", 10, false, null);
+
+ verifyReceiveAll(10, 0, 1, 2, 3, 4, 5, 6, 7, 8, 9);
+
+ verifyReceiveRoundRobin(10, 10, 11);
+ verifyReceiveRoundRobin(10, 12, 13);
+ verifyReceiveRoundRobin(10, 14, 15);
+
+ verifyNotReceive(0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15);
+
+ removeConsumer(10);
+ removeConsumer(13);
+ removeConsumer(14);
+
+ deleteQueue(0, "queue10");
+ deleteQueue(1, "queue11");
+ deleteQueue(0, "queue12");
+
+ waitForBindings(0, "queues.testaddress", 6, 6, true);
+ waitForBindings(0, "queues.testaddress", 7, 7, false);
+
+ send(0, "queues.testaddress", 10, false, null);
+
+ verifyReceiveAll(10, 0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 11, 12, 15);
+
+ verifyNotReceive(0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 11, 12, 15);
+ }
+
public void testMixtureLoadBalancedAndNonLoadBalancedQueuesAddQueuesOnTargetBeforeStartSource() throws Exception
{
setupClusterConnection("cluster1", 0, 1, "queues", false, 1, isNetty());
@@ -441,6 +524,8 @@
verifyNotReceive(0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15);
}
+
+
public void testMixtureLoadBalancedAndNonLoadBalancedQueuesAddQueuesOnSourceBeforeStartTarget() throws Exception
{
setupClusterConnection("cluster1", 0, 1, "queues", false, 1, isNetty());
@@ -601,6 +686,10 @@
verifyReceiveAll(10, 1, 3, 5, 7, 9, 10, 11);
verifyNotReceive(0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11);
+
+ send(0, "queues.testaddress", 10, false, null);
+ verifyReceiveAll(10, 10, 11);
+ verifyNotReceive(0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11);
}
public void testRoundRobinMultipleQueuesWithFilters() throws Exception
@@ -663,6 +752,9 @@
verifyReceiveAll(10, 3, 4);
verifyNotReceive(0, 1, 2, 3, 4, 5, 6, 7, 8, 9);
+ send(0, "queues.testaddress", 10, false, null);
+ verifyReceiveRoundRobin(10, 8, 9);
+ verifyNotReceive(0, 1, 2, 3, 4, 5, 6, 7, 8, 9);
}
public void testRouteWhenNoConsumersFalseNonBalancedQueues() throws Exception
@@ -1070,6 +1162,10 @@
verifyReceiveAll(10, 3, 4);
verifyNotReceive(0, 1, 2, 3, 4, 5, 6, 7, 8, 9);
+ send(0, "queues.testaddress", 10, false, null);
+ verifyReceiveRoundRobin(10, 8, 9);
+ verifyNotReceive(0, 1, 2, 3, 4, 5, 6, 7, 8, 9);
+
}
public void testMultipleClusterConnections() throws Exception
@@ -1143,87 +1239,5 @@
verifyNotReceive(0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11);
}
-
-//
-// public void testDurableAndRestart()
-// {
-// }
-//
-// public void testWithNetty()
-// {
-// }
-// public void testNetty() throws Exception
-// {
-// //this.stopServers(0, 1);
-//
-// super.clearServer(0, 1);
-//
-// this.setupServer(0, true, true);
-//
-// //setupClusterConnection("cluster1", 0, 1, "queues", false, true);
-// startServers(0);
-//
-// log.info("started servers");
-//
-// setupSessionFactory(0, true);
-//
-//
-// createQueue(0, "queues.testaddress", "queue0", null, false);
-// //createQueue(1, "queues.testaddress", "queue0", null, false);
-//
-// addConsumer(0, 0, "queue0", null);
-//
-//
-// waitForBindings(0, "queues.testaddress", 1, 1, true);
-//
-// send(0, "queues.testaddress", 10, false, null);
-//
-// verifyReceiveAll(0);
-// }
-//
-// public void testRoundRobinMultipleQueuesNetty() throws Exception
-// {
-//
-//
-// //setupClusterConnection("cluster1", 0, 1, "queues", false, true);
-// startServers(1, 0);
-//
-// log.info("started servers");
-//
-// setupSessionFactory(0, true);
-// setupSessionFactory(1, true);
-//
-// createQueue(0, "queues.testaddress", "queue0", null, false);
-// createQueue(1, "queues.testaddress", "queue0", null, false);
-//
-// createQueue(0, "queues.testaddress", "queue1", null, false);
-// createQueue(1, "queues.testaddress", "queue1", null, false);
-//
-// createQueue(0, "queues.testaddress", "queue2", null, false);
-// createQueue(1, "queues.testaddress", "queue2", null, false);
-//
-// addConsumer(0, 0, "queue0", null);
-// addConsumer(1, 1, "queue0", null);
-//
-// addConsumer(2, 0, "queue1", null);
-// addConsumer(3, 1, "queue1", null);
-//
-// addConsumer(4, 0, "queue2", null);
-// addConsumer(5, 1, "queue2", null);
-//
-// waitForBindings(0, "queues.testaddress", 3, 3, true);
-// waitForBindings(0, "queues.testaddress", 3, 3, false);
-//
-// send(0, "queues.testaddress", 10, false, null);
-//
-// verifyReceiveRoundRobin(10, 0, 1);
-//
-// verifyReceiveRoundRobin(10, 2, 3);
-//
-// verifyReceiveRoundRobin(10, 4, 5);
-//
-// verifyNotReceive(0, 1, 2, 3, 4, 5);
-// }
-
}
Modified: trunk/tests/src/org/jboss/messaging/tests/integration/cluster/distribution/SymmetricClusterTest.java
===================================================================
--- trunk/tests/src/org/jboss/messaging/tests/integration/cluster/distribution/SymmetricClusterTest.java 2009-02-07 10:21:43 UTC (rev 5837)
+++ trunk/tests/src/org/jboss/messaging/tests/integration/cluster/distribution/SymmetricClusterTest.java 2009-02-07 16:44:12 UTC (rev 5838)
@@ -20,13 +20,14 @@
* 02110-1301 USA, or see the FSF site: http://www.fsf.org.
*/
-
package org.jboss.messaging.tests.integration.cluster.distribution;
import org.jboss.messaging.core.logging.Logger;
/**
* A SymmetricClusterTest
+ *
+ * Most of the cases are covered in OneWayTwoNodeClusterTest - we don't duplicate them all here
*
* @author <a href="mailto:tim.fox at jboss.com">Tim Fox</a>
*
@@ -42,102 +43,1452 @@
protected void setUp() throws Exception
{
super.setUp();
-
+
setupServer(0, isFileStorage(), isNetty());
- setupServer(1, isFileStorage(), isNetty());
+ setupServer(1, isFileStorage(), isNetty());
setupServer(2, isFileStorage(), isNetty());
- setupServer(3, isFileStorage(), isNetty());
- setupServer(4, isFileStorage(), isNetty());
+ setupServer(3, isFileStorage(), isNetty());
+ setupServer(4, isFileStorage(), isNetty());
}
@Override
protected void tearDown() throws Exception
{
closeAllConsumers();
-
+
closeAllSessionFactories();
-
+
stopServers(0, 1, 2, 3, 4);
-
+
super.tearDown();
}
-
+
protected boolean isNetty()
{
return false;
}
-
+
protected boolean isFileStorage()
{
return false;
}
+
+ public void testBasicRoundRobin() throws Exception
+ {
+ setupCluster();
+
+ startServers(0, 1, 2, 3, 4);
+
+ setupSessionFactory(0, isNetty());
+ setupSessionFactory(1, isNetty());
+ setupSessionFactory(2, isNetty());
+ setupSessionFactory(3, isNetty());
+ setupSessionFactory(4, isNetty());
+
+ createQueue(0, "queues.testaddress", "queue0", null, false);
+ createQueue(1, "queues.testaddress", "queue0", null, false);
+ createQueue(2, "queues.testaddress", "queue0", null, false);
+ createQueue(3, "queues.testaddress", "queue0", null, false);
+ createQueue(4, "queues.testaddress", "queue0", null, false);
+
+ addConsumer(0, 0, "queue0", null);
+ addConsumer(1, 1, "queue0", null);
+ addConsumer(2, 2, "queue0", null);
+ addConsumer(3, 3, "queue0", null);
+ addConsumer(4, 4, "queue0", null);
+
+ waitForBindings(0, "queues.testaddress", 1, 1, true);
+ waitForBindings(1, "queues.testaddress", 1, 1, true);
+ waitForBindings(2, "queues.testaddress", 1, 1, true);
+ waitForBindings(3, "queues.testaddress", 1, 1, true);
+ waitForBindings(4, "queues.testaddress", 1, 1, true);
+
+ waitForBindings(0, "queues.testaddress", 4, 4, false);
+ waitForBindings(1, "queues.testaddress", 4, 4, false);
+ waitForBindings(2, "queues.testaddress", 4, 4, false);
+ waitForBindings(3, "queues.testaddress", 4, 4, false);
+ waitForBindings(4, "queues.testaddress", 4, 4, false);
+
+ send(0, "queues.testaddress", 10, false, null);
+
+ verifyReceiveRoundRobinInSomeOrder(10, 0, 1, 2, 3, 4);
+
+ verifyNotReceive(0, 1, 2, 3, 4);
+ }
- public void testRoundRobin() throws Exception
+
+ public void testRoundRobinMultipleQueues() throws Exception
{
- setupClusterConnection("cluster0-1", 0, 1, "queues", false, 1, isNetty());
- setupClusterConnection("cluster0-2", 0, 2, "queues", false, 1, isNetty());
- setupClusterConnection("cluster0-3", 0, 3, "queues", false, 1, isNetty());
- setupClusterConnection("cluster0-4", 0, 4, "queues", false, 1, isNetty());
+ setupCluster();
+
+ startServers(0, 1, 2, 3, 4);
+
+ setupSessionFactory(0, isNetty());
+ setupSessionFactory(1, isNetty());
+ setupSessionFactory(2, isNetty());
+ setupSessionFactory(3, isNetty());
+ setupSessionFactory(4, isNetty());
+
+ createQueue(0, "queues.testaddress", "queue0", null, false);
+ createQueue(1, "queues.testaddress", "queue0", null, false);
+ createQueue(2, "queues.testaddress", "queue0", null, false);
+ createQueue(3, "queues.testaddress", "queue0", null, false);
+ createQueue(4, "queues.testaddress", "queue0", null, false);
+
+ createQueue(0, "queues.testaddress", "queue1", null, false);
+ createQueue(1, "queues.testaddress", "queue1", null, false);
+ createQueue(2, "queues.testaddress", "queue1", null, false);
+ createQueue(3, "queues.testaddress", "queue1", null, false);
+ createQueue(4, "queues.testaddress", "queue1", null, false);
+
+ createQueue(0, "queues.testaddress", "queue2", null, false);
+ createQueue(1, "queues.testaddress", "queue2", null, false);
+ createQueue(2, "queues.testaddress", "queue2", null, false);
+ createQueue(3, "queues.testaddress", "queue2", null, false);
+ createQueue(4, "queues.testaddress", "queue2", null, false);
+
+ addConsumer(0, 0, "queue0", null);
+ addConsumer(1, 1, "queue0", null);
+ addConsumer(2, 2, "queue0", null);
+ addConsumer(3, 3, "queue0", null);
+ addConsumer(4, 4, "queue0", null);
+
+ addConsumer(5, 0, "queue1", null);
+ addConsumer(6, 1, "queue1", null);
+ addConsumer(7, 2, "queue1", null);
+ addConsumer(8, 3, "queue1", null);
+ addConsumer(9, 4, "queue1", null);
+
+ addConsumer(10, 0, "queue2", null);
+ addConsumer(11, 1, "queue2", null);
+ addConsumer(12, 2, "queue2", null);
+ addConsumer(13, 3, "queue2", null);
+ addConsumer(14, 4, "queue2", null);
+
+ waitForBindings(0, "queues.testaddress", 3, 3, true);
+ waitForBindings(1, "queues.testaddress", 3, 3, true);
+ waitForBindings(2, "queues.testaddress", 3, 3, true);
+ waitForBindings(3, "queues.testaddress", 3, 3, true);
+ waitForBindings(4, "queues.testaddress", 3, 3, true);
+
+ waitForBindings(0, "queues.testaddress", 12, 12, false);
+ waitForBindings(1, "queues.testaddress", 12, 12, false);
+ waitForBindings(2, "queues.testaddress", 12, 12, false);
+ waitForBindings(3, "queues.testaddress", 12, 12, false);
+ waitForBindings(4, "queues.testaddress", 12, 12, false);
+
+ send(0, "queues.testaddress", 10, false, null);
+
+ verifyReceiveRoundRobinInSomeOrder(10, 0, 1, 2, 3, 4);
+ verifyReceiveRoundRobinInSomeOrder(10, 5, 6, 7, 8, 9);
+ verifyReceiveRoundRobinInSomeOrder(10, 10, 11, 12, 13, 14);
+
+ verifyNotReceive(0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14);
+ }
+
+ public void testMultipleNonLoadBalancedQueues() throws Exception
+ {
+ setupCluster();
+
+ startServers(0, 1, 2, 3, 4);
+
+ setupSessionFactory(0, isNetty());
+ setupSessionFactory(1, isNetty());
+ setupSessionFactory(2, isNetty());
+ setupSessionFactory(3, isNetty());
+ setupSessionFactory(4, isNetty());
+
+ createQueue(0, "queues.testaddress", "queue0", null, false);
+ createQueue(1, "queues.testaddress", "queue1", null, false);
+ createQueue(2, "queues.testaddress", "queue2", null, false);
+ createQueue(3, "queues.testaddress", "queue3", null, false);
+ createQueue(4, "queues.testaddress", "queue4", null, false);
+
+ createQueue(0, "queues.testaddress", "queue5", null, false);
+ createQueue(1, "queues.testaddress", "queue6", null, false);
+ createQueue(2, "queues.testaddress", "queue7", null, false);
+ createQueue(3, "queues.testaddress", "queue8", null, false);
+ createQueue(4, "queues.testaddress", "queue9", null, false);
+
+ createQueue(0, "queues.testaddress", "queue10", null, false);
+ createQueue(1, "queues.testaddress", "queue11", null, false);
+ createQueue(2, "queues.testaddress", "queue12", null, false);
+ createQueue(3, "queues.testaddress", "queue13", null, false);
+ createQueue(4, "queues.testaddress", "queue14", null, false);
+
+ addConsumer(0, 0, "queue0", null);
+ addConsumer(1, 1, "queue1", null);
+ addConsumer(2, 2, "queue2", null);
+ addConsumer(3, 3, "queue3", null);
+ addConsumer(4, 4, "queue4", null);
+
+ addConsumer(5, 0, "queue5", null);
+ addConsumer(6, 1, "queue6", null);
+ addConsumer(7, 2, "queue7", null);
+ addConsumer(8, 3, "queue8", null);
+ addConsumer(9, 4, "queue9", null);
+
+ addConsumer(10, 0, "queue10", null);
+ addConsumer(11, 1, "queue11", null);
+ addConsumer(12, 2, "queue12", null);
+ addConsumer(13, 3, "queue13", null);
+ addConsumer(14, 4, "queue14", null);
+
+ waitForBindings(0, "queues.testaddress", 3, 3, true);
+ waitForBindings(1, "queues.testaddress", 3, 3, true);
+ waitForBindings(2, "queues.testaddress", 3, 3, true);
+ waitForBindings(3, "queues.testaddress", 3, 3, true);
+ waitForBindings(4, "queues.testaddress", 3, 3, true);
+
+ waitForBindings(0, "queues.testaddress", 12, 12, false);
+ waitForBindings(1, "queues.testaddress", 12, 12, false);
+ waitForBindings(2, "queues.testaddress", 12, 12, false);
+ waitForBindings(3, "queues.testaddress", 12, 12, false);
+ waitForBindings(4, "queues.testaddress", 12, 12, false);
+
+ send(0, "queues.testaddress", 10, false, null);
+
+ verifyReceiveAll(10, 0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14);
+
+ verifyNotReceive(0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14);
+ }
+
+ public void testMixtureLoadBalancedAndNonLoadBalancedQueues() throws Exception
+ {
+ setupCluster();
+
+ startServers(0, 1, 2, 3, 4);
+
+ setupSessionFactory(0, isNetty());
+ setupSessionFactory(1, isNetty());
+ setupSessionFactory(2, isNetty());
+ setupSessionFactory(3, isNetty());
+ setupSessionFactory(4, isNetty());
+
+ createQueue(0, "queues.testaddress", "queue0", null, false);
+ createQueue(1, "queues.testaddress", "queue1", null, false);
+ createQueue(2, "queues.testaddress", "queue2", null, false);
+ createQueue(3, "queues.testaddress", "queue3", null, false);
+ createQueue(4, "queues.testaddress", "queue4", null, false);
+
+ createQueue(0, "queues.testaddress", "queue5", null, false);
+ createQueue(1, "queues.testaddress", "queue6", null, false);
+ createQueue(2, "queues.testaddress", "queue7", null, false);
+ createQueue(3, "queues.testaddress", "queue8", null, false);
+ createQueue(4, "queues.testaddress", "queue9", null, false);
+
+ createQueue(0, "queues.testaddress", "queue10", null, false);
+ createQueue(1, "queues.testaddress", "queue11", null, false);
+ createQueue(2, "queues.testaddress", "queue12", null, false);
+ createQueue(3, "queues.testaddress", "queue13", null, false);
+ createQueue(4, "queues.testaddress", "queue14", null, false);
- setupClusterConnection("cluster1-0", 1, 0, "queues", false, 1, isNetty());
- setupClusterConnection("cluster1-2", 1, 2, "queues", false, 1, isNetty());
- setupClusterConnection("cluster1-3", 1, 3, "queues", false, 1, isNetty());
- setupClusterConnection("cluster1-4", 1, 4, "queues", false, 1, isNetty());
+ createQueue(0, "queues.testaddress", "queue15", null, false);
+ createQueue(1, "queues.testaddress", "queue15", null, false);
+ createQueue(2, "queues.testaddress", "queue15", null, false);
+ createQueue(3, "queues.testaddress", "queue15", null, false);
+ createQueue(4, "queues.testaddress", "queue15", null, false);
- setupClusterConnection("cluster2-0", 2, 0, "queues", false, 1, isNetty());
- setupClusterConnection("cluster2-1", 2, 1, "queues", false, 1, isNetty());
- setupClusterConnection("cluster2-3", 2, 3, "queues", false, 1, isNetty());
- setupClusterConnection("cluster2-4", 2, 4, "queues", false, 1, isNetty());
+ createQueue(2, "queues.testaddress", "queue16", null, false);
+ createQueue(3, "queues.testaddress", "queue16", null, false);
+ createQueue(4, "queues.testaddress", "queue16", null, false);
- setupClusterConnection("cluster3-0", 3, 0, "queues", false, 1, isNetty());
- setupClusterConnection("cluster3-1", 3, 1, "queues", false, 1, isNetty());
- setupClusterConnection("cluster3-2", 3, 2, "queues", false, 1, isNetty());
- setupClusterConnection("cluster3-4", 3, 4, "queues", false, 1, isNetty());
+ createQueue(0, "queues.testaddress", "queue17", null, false);
+ createQueue(1, "queues.testaddress", "queue17", null, false);
+ createQueue(4, "queues.testaddress", "queue17", null, false);
- setupClusterConnection("cluster4-0", 4, 0, "queues", false, 1, isNetty());
- setupClusterConnection("cluster4-1", 4, 1, "queues", false, 1, isNetty());
- setupClusterConnection("cluster4-2", 4, 2, "queues", false, 1, isNetty());
- setupClusterConnection("cluster4-3", 4, 3, "queues", false, 1, isNetty());
-
+ createQueue(3, "queues.testaddress", "queue18", null, false);
+ createQueue(4, "queues.testaddress", "queue18", null, false);
+
+
+ addConsumer(0, 0, "queue0", null);
+ addConsumer(1, 1, "queue1", null);
+ addConsumer(2, 2, "queue2", null);
+ addConsumer(3, 3, "queue3", null);
+ addConsumer(4, 4, "queue4", null);
+
+ addConsumer(5, 0, "queue5", null);
+ addConsumer(6, 1, "queue6", null);
+ addConsumer(7, 2, "queue7", null);
+ addConsumer(8, 3, "queue8", null);
+ addConsumer(9, 4, "queue9", null);
+
+ addConsumer(10, 0, "queue10", null);
+ addConsumer(11, 1, "queue11", null);
+ addConsumer(12, 2, "queue12", null);
+ addConsumer(13, 3, "queue13", null);
+ addConsumer(14, 4, "queue14", null);
+
+ addConsumer(15, 0, "queue15", null);
+ addConsumer(16, 1, "queue15", null);
+ addConsumer(17, 2, "queue15", null);
+ addConsumer(18, 3, "queue15", null);
+ addConsumer(19, 4, "queue15", null);
+
+ addConsumer(20, 2, "queue16", null);
+ addConsumer(21, 3, "queue16", null);
+ addConsumer(22, 4, "queue16", null);
+
+ addConsumer(23, 0, "queue17", null);
+ addConsumer(24, 1, "queue17", null);
+ addConsumer(25, 4, "queue17", null);
+
+ addConsumer(26, 3, "queue18", null);
+ addConsumer(27, 4, "queue18", null);
+
+
+ waitForBindings(0, "queues.testaddress", 5, 5, true);
+ waitForBindings(1, "queues.testaddress", 5, 5, true);
+ waitForBindings(2, "queues.testaddress", 5, 5, true);
+ waitForBindings(3, "queues.testaddress", 6, 6, true);
+ waitForBindings(4, "queues.testaddress", 7, 7, true);
+
+ waitForBindings(0, "queues.testaddress", 23, 23, false);
+ waitForBindings(1, "queues.testaddress", 23, 23, false);
+ waitForBindings(2, "queues.testaddress", 23, 23, false);
+ waitForBindings(3, "queues.testaddress", 22, 22, false);
+ waitForBindings(4, "queues.testaddress", 21, 21, false);
+
+ send(0, "queues.testaddress", 10, false, null);
+
+ verifyReceiveAll(10, 0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14);
+
+ verifyReceiveRoundRobinInSomeOrder(10, 15, 16, 17, 18, 19);
+
+ verifyReceiveRoundRobinInSomeOrder(10, 20, 21, 22);
+
+ verifyReceiveRoundRobinInSomeOrder(10, 23, 24, 25);
+
+ verifyReceiveRoundRobinInSomeOrder(10, 26, 27);
+
+ verifyNotReceive(0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16, 17, 18, 19, 20, 21, 22, 23, 24, 25, 26, 27);
+ }
+
+ public void testMixtureLoadBalancedAndNonLoadBalancedQueuesRemoveSomeQueuesAndConsumers() throws Exception
+ {
+ setupCluster();
+
startServers(0, 1, 2, 3, 4);
-
+
setupSessionFactory(0, isNetty());
setupSessionFactory(1, isNetty());
setupSessionFactory(2, isNetty());
setupSessionFactory(3, isNetty());
setupSessionFactory(4, isNetty());
+
+ createQueue(0, "queues.testaddress", "queue0", null, false);
+ createQueue(1, "queues.testaddress", "queue1", null, false);
+ createQueue(2, "queues.testaddress", "queue2", null, false);
+ createQueue(3, "queues.testaddress", "queue3", null, false);
+ createQueue(4, "queues.testaddress", "queue4", null, false);
+
+ createQueue(0, "queues.testaddress", "queue5", null, false);
+ createQueue(1, "queues.testaddress", "queue6", null, false);
+ createQueue(2, "queues.testaddress", "queue7", null, false);
+ createQueue(3, "queues.testaddress", "queue8", null, false);
+ createQueue(4, "queues.testaddress", "queue9", null, false);
+
+ createQueue(0, "queues.testaddress", "queue10", null, false);
+ createQueue(1, "queues.testaddress", "queue11", null, false);
+ createQueue(2, "queues.testaddress", "queue12", null, false);
+ createQueue(3, "queues.testaddress", "queue13", null, false);
+ createQueue(4, "queues.testaddress", "queue14", null, false);
+ createQueue(0, "queues.testaddress", "queue15", null, false);
+ createQueue(1, "queues.testaddress", "queue15", null, false);
+ createQueue(2, "queues.testaddress", "queue15", null, false);
+ createQueue(3, "queues.testaddress", "queue15", null, false);
+ createQueue(4, "queues.testaddress", "queue15", null, false);
+
+ createQueue(2, "queues.testaddress", "queue16", null, false);
+ createQueue(3, "queues.testaddress", "queue16", null, false);
+ createQueue(4, "queues.testaddress", "queue16", null, false);
+
+ createQueue(0, "queues.testaddress", "queue17", null, false);
+ createQueue(1, "queues.testaddress", "queue17", null, false);
+ createQueue(4, "queues.testaddress", "queue17", null, false);
+
+ createQueue(3, "queues.testaddress", "queue18", null, false);
+ createQueue(4, "queues.testaddress", "queue18", null, false);
+
+
+ addConsumer(0, 0, "queue0", null);
+ addConsumer(1, 1, "queue1", null);
+ addConsumer(2, 2, "queue2", null);
+ addConsumer(3, 3, "queue3", null);
+ addConsumer(4, 4, "queue4", null);
+
+ addConsumer(5, 0, "queue5", null);
+ addConsumer(6, 1, "queue6", null);
+ addConsumer(7, 2, "queue7", null);
+ addConsumer(8, 3, "queue8", null);
+ addConsumer(9, 4, "queue9", null);
+
+ addConsumer(10, 0, "queue10", null);
+ addConsumer(11, 1, "queue11", null);
+ addConsumer(12, 2, "queue12", null);
+ addConsumer(13, 3, "queue13", null);
+ addConsumer(14, 4, "queue14", null);
+
+ addConsumer(15, 0, "queue15", null);
+ addConsumer(16, 1, "queue15", null);
+ addConsumer(17, 2, "queue15", null);
+ addConsumer(18, 3, "queue15", null);
+ addConsumer(19, 4, "queue15", null);
+
+ addConsumer(20, 2, "queue16", null);
+ addConsumer(21, 3, "queue16", null);
+ addConsumer(22, 4, "queue16", null);
+
+ addConsumer(23, 0, "queue17", null);
+ addConsumer(24, 1, "queue17", null);
+ addConsumer(25, 4, "queue17", null);
+
+ addConsumer(26, 3, "queue18", null);
+ addConsumer(27, 4, "queue18", null);
+
+
+ waitForBindings(0, "queues.testaddress", 5, 5, true);
+ waitForBindings(1, "queues.testaddress", 5, 5, true);
+ waitForBindings(2, "queues.testaddress", 5, 5, true);
+ waitForBindings(3, "queues.testaddress", 6, 6, true);
+ waitForBindings(4, "queues.testaddress", 7, 7, true);
+
+ waitForBindings(0, "queues.testaddress", 23, 23, false);
+ waitForBindings(1, "queues.testaddress", 23, 23, false);
+ waitForBindings(2, "queues.testaddress", 23, 23, false);
+ waitForBindings(3, "queues.testaddress", 22, 22, false);
+ waitForBindings(4, "queues.testaddress", 21, 21, false);
+
+ send(0, "queues.testaddress", 10, false, null);
+
+ verifyReceiveAll(10, 0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14);
+
+ verifyReceiveRoundRobinInSomeOrder(10, 15, 16, 17, 18, 19);
+
+ verifyReceiveRoundRobinInSomeOrder(10, 20, 21, 22);
+
+ verifyReceiveRoundRobinInSomeOrder(10, 23, 24, 25);
+
+ verifyReceiveRoundRobinInSomeOrder(10, 26, 27);
+
+ verifyNotReceive(0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16, 17, 18, 19, 20, 21, 22, 23, 24, 25, 26, 27);
+
+ removeConsumer(16);
+ removeConsumer(18);
+ removeConsumer(21);
+ removeConsumer(22);
+ removeConsumer(26);
+
+ deleteQueue(1, "queue15");
+ deleteQueue(3, "queue15");
+
+ deleteQueue(3, "queue16");
+ deleteQueue(4, "queue16");
+
+ deleteQueue(3, "queue18");
+
+ waitForBindings(0, "queues.testaddress", 5, 5, true);
+ waitForBindings(1, "queues.testaddress", 4, 4, true);
+ waitForBindings(2, "queues.testaddress", 5, 5, true);
+ waitForBindings(3, "queues.testaddress", 3, 3, true);
+ waitForBindings(4, "queues.testaddress", 6, 6, true);
+
+ waitForBindings(0, "queues.testaddress", 18, 18, false);
+ waitForBindings(1, "queues.testaddress", 19, 19, false);
+ waitForBindings(2, "queues.testaddress", 18, 18, false);
+ waitForBindings(3, "queues.testaddress", 20, 20, false);
+ waitForBindings(4, "queues.testaddress", 17, 17, false);
+
+ send(0, "queues.testaddress", 10, false, null);
+
+ verifyReceiveAll(10, 0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 20, 27);
+
+ verifyReceiveRoundRobinInSomeOrder(10, 15, 17, 19);
+
+ //this.checkReceive(23, 24, 25);
+
+ verifyReceiveRoundRobinInSomeOrder(10, 23, 24, 25);
+
+ verifyNotReceive(0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 17, 19, 20, 23, 24, 25, 27);
+
+ }
+
+ public void testMixtureLoadBalancedAndNonLoadBalancedQueuesAddQueuesAndConsumersBeforAllServersAreStarted() throws Exception
+ {
+ setupCluster();
+
+ startServers(0);
+
+ setupSessionFactory(0, isNetty());
+
createQueue(0, "queues.testaddress", "queue0", null, false);
+ createQueue(0, "queues.testaddress", "queue5", null, false);
+ createQueue(0, "queues.testaddress", "queue10", null, false);
+ createQueue(0, "queues.testaddress", "queue15", null, false);
+ createQueue(0, "queues.testaddress", "queue17", null, false);
+
+ addConsumer(0, 0, "queue0", null);
+ addConsumer(5, 0, "queue5", null);
+
+ startServers(1);
+ setupSessionFactory(1, isNetty());
+
+ createQueue(1, "queues.testaddress", "queue1", null, false);
+ createQueue(1, "queues.testaddress", "queue6", null, false);
+ createQueue(1, "queues.testaddress", "queue11", null, false);
+ createQueue(1, "queues.testaddress", "queue15", null, false);
+ createQueue(1, "queues.testaddress", "queue17", null, false);
+
+ addConsumer(1, 1, "queue1", null);
+ addConsumer(6, 1, "queue6", null);
+ addConsumer(11, 1, "queue11", null);
+ addConsumer(16, 1, "queue15", null);
+
+ startServers(2);
+ setupSessionFactory(2, isNetty());
+
+ createQueue(2, "queues.testaddress", "queue2", null, false);
+ createQueue(2, "queues.testaddress", "queue7", null, false);
+ createQueue(2, "queues.testaddress", "queue12", null, false);
+ createQueue(2, "queues.testaddress", "queue15", null, false);
+ createQueue(2, "queues.testaddress", "queue16", null, false);
+
+ addConsumer(2, 2, "queue2", null);
+ addConsumer(7, 2, "queue7", null);
+ addConsumer(12, 2, "queue12", null);
+ addConsumer(17, 2, "queue15", null);
+
+ startServers(3);
+ setupSessionFactory(3, isNetty());
+
+ createQueue(3, "queues.testaddress", "queue3", null, false);
+ createQueue(3, "queues.testaddress", "queue8", null, false);
+ createQueue(3, "queues.testaddress", "queue13", null, false);
+ createQueue(3, "queues.testaddress", "queue15", null, false);
+ createQueue(3, "queues.testaddress", "queue16", null, false);
+ createQueue(3, "queues.testaddress", "queue18", null, false);
+
+ addConsumer(3, 3, "queue3", null);
+ addConsumer(8, 3, "queue8", null);
+ addConsumer(13, 3, "queue13", null);
+ addConsumer(18, 3, "queue15", null);
+
+ startServers(4);
+ setupSessionFactory(4, isNetty());
+
+ createQueue(4, "queues.testaddress", "queue4", null, false);
+ createQueue(4, "queues.testaddress", "queue9", null, false);
+ createQueue(4, "queues.testaddress", "queue14", null, false);
+ createQueue(4, "queues.testaddress", "queue15", null, false);
+ createQueue(4, "queues.testaddress", "queue16", null, false);
+ createQueue(4, "queues.testaddress", "queue17", null, false);
+ createQueue(4, "queues.testaddress", "queue18", null, false);
+
+ addConsumer(4, 4, "queue4", null);
+ addConsumer(9, 4, "queue9", null);
+ addConsumer(10, 0, "queue10", null);
+ addConsumer(14, 4, "queue14", null);
+
+ addConsumer(15, 0, "queue15", null);
+ addConsumer(19, 4, "queue15", null);
+
+ addConsumer(20, 2, "queue16", null);
+ addConsumer(21, 3, "queue16", null);
+ addConsumer(22, 4, "queue16", null);
+
+ addConsumer(23, 0, "queue17", null);
+ addConsumer(24, 1, "queue17", null);
+ addConsumer(25, 4, "queue17", null);
+
+ addConsumer(26, 3, "queue18", null);
+ addConsumer(27, 4, "queue18", null);
+
+
+ waitForBindings(0, "queues.testaddress", 5, 5, true);
+ waitForBindings(1, "queues.testaddress", 5, 5, true);
+ waitForBindings(2, "queues.testaddress", 5, 5, true);
+ waitForBindings(3, "queues.testaddress", 6, 6, true);
+ waitForBindings(4, "queues.testaddress", 7, 7, true);
+
+ waitForBindings(0, "queues.testaddress", 23, 23, false);
+ waitForBindings(1, "queues.testaddress", 23, 23, false);
+ waitForBindings(2, "queues.testaddress", 23, 23, false);
+ waitForBindings(3, "queues.testaddress", 22, 22, false);
+ waitForBindings(4, "queues.testaddress", 21, 21, false);
+
+ send(0, "queues.testaddress", 10, false, null);
+
+ verifyReceiveAll(10, 0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14);
+
+ verifyReceiveRoundRobinInSomeOrder(10, 15, 16, 17, 18, 19);
+
+ verifyReceiveRoundRobinInSomeOrder(10, 20, 21, 22);
+
+ verifyReceiveRoundRobinInSomeOrder(10, 23, 24, 25);
+
+ verifyReceiveRoundRobinInSomeOrder(10, 26, 27);
+
+ verifyNotReceive(0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16, 17, 18, 19, 20, 21, 22, 23, 24, 25, 26, 27);
+ }
+
+ public void testMixtureLoadBalancedAndNonLoadBalancedQueuesWithFilters() throws Exception
+ {
+ setupCluster();
+
+ startServers(0, 1, 2, 3, 4);
+
+ setupSessionFactory(0, isNetty());
+ setupSessionFactory(1, isNetty());
+ setupSessionFactory(2, isNetty());
+ setupSessionFactory(3, isNetty());
+ setupSessionFactory(4, isNetty());
+
+ String filter1 = "haggis";
+ String filter2 = "scotch-egg";
+
+ createQueue(0, "queues.testaddress", "queue0", null, false);
+ createQueue(1, "queues.testaddress", "queue1", filter1, false);
+ createQueue(2, "queues.testaddress", "queue2", null, false);
+ createQueue(3, "queues.testaddress", "queue3", filter2, false);
+ createQueue(4, "queues.testaddress", "queue4", null, false);
+
+ createQueue(0, "queues.testaddress", "queue5", filter1, false);
+ createQueue(1, "queues.testaddress", "queue6", null, false);
+ createQueue(2, "queues.testaddress", "queue7", filter2, false);
+ createQueue(3, "queues.testaddress", "queue8", null, false);
+ createQueue(4, "queues.testaddress", "queue9", filter1, false);
+
+ createQueue(0, "queues.testaddress", "queue10", null, false);
+ createQueue(1, "queues.testaddress", "queue11", filter2, false);
+ createQueue(2, "queues.testaddress", "queue12", null, false);
+ createQueue(3, "queues.testaddress", "queue13", null, false);
+ createQueue(4, "queues.testaddress", "queue14", filter1, false);
+
+ createQueue(0, "queues.testaddress", "queue15", filter1, false);
+ createQueue(1, "queues.testaddress", "queue15", filter1, false);
+ createQueue(2, "queues.testaddress", "queue15", null, false);
+ createQueue(3, "queues.testaddress", "queue15", filter2, false);
+ createQueue(4, "queues.testaddress", "queue15", filter2, false);
+
+ createQueue(2, "queues.testaddress", "queue16", filter1, false);
+ createQueue(3, "queues.testaddress", "queue16", null, false);
+ createQueue(4, "queues.testaddress", "queue16", null, false);
+
+ createQueue(0, "queues.testaddress", "queue17", null, false);
+ createQueue(1, "queues.testaddress", "queue17", null, false);
+ createQueue(4, "queues.testaddress", "queue17", null, false);
+
+ createQueue(3, "queues.testaddress", "queue18", filter2, false);
+ createQueue(4, "queues.testaddress", "queue18", null, false);
+
+
+ addConsumer(0, 0, "queue0", null);
+ addConsumer(1, 1, "queue1", null);
+ addConsumer(2, 2, "queue2", null);
+ addConsumer(3, 3, "queue3", null);
+ addConsumer(4, 4, "queue4", null);
+
+ addConsumer(5, 0, "queue5", null);
+ addConsumer(6, 1, "queue6", null);
+ addConsumer(7, 2, "queue7", null);
+ addConsumer(8, 3, "queue8", null);
+ addConsumer(9, 4, "queue9", null);
+
+ addConsumer(10, 0, "queue10", null);
+ addConsumer(11, 1, "queue11", null);
+ addConsumer(12, 2, "queue12", null);
+ addConsumer(13, 3, "queue13", null);
+ addConsumer(14, 4, "queue14", null);
+
+ addConsumer(15, 0, "queue15", null);
+ addConsumer(16, 1, "queue15", null);
+ addConsumer(17, 2, "queue15", null);
+ addConsumer(18, 3, "queue15", null);
+ addConsumer(19, 4, "queue15", null);
+
+ addConsumer(20, 2, "queue16", null);
+ addConsumer(21, 3, "queue16", null);
+ addConsumer(22, 4, "queue16", null);
+
+ addConsumer(23, 0, "queue17", null);
+ addConsumer(24, 1, "queue17", null);
+ addConsumer(25, 4, "queue17", null);
+
+ addConsumer(26, 3, "queue18", null);
+ addConsumer(27, 4, "queue18", null);
+
+
+ waitForBindings(0, "queues.testaddress", 5, 5, true);
+ waitForBindings(1, "queues.testaddress", 5, 5, true);
+ waitForBindings(2, "queues.testaddress", 5, 5, true);
+ waitForBindings(3, "queues.testaddress", 6, 6, true);
+ waitForBindings(4, "queues.testaddress", 7, 7, true);
+
+ waitForBindings(0, "queues.testaddress", 23, 23, false);
+ waitForBindings(1, "queues.testaddress", 23, 23, false);
+ waitForBindings(2, "queues.testaddress", 23, 23, false);
+ waitForBindings(3, "queues.testaddress", 22, 22, false);
+ waitForBindings(4, "queues.testaddress", 21, 21, false);
+
+ send(0, "queues.testaddress", 10, false, filter1);
+
+ verifyReceiveAll(10, 0, 1, 2, 4, 5, 6, 8, 9, 10, 12, 13, 14, 27);
+
+ verifyReceiveRoundRobinInSomeOrder(10, 15, 16, 17);
+
+ verifyReceiveRoundRobinInSomeOrder(10, 20, 21, 22);
+
+ verifyReceiveRoundRobinInSomeOrder(10, 23, 24, 25);
+
+ verifyNotReceive(0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16, 17, 18, 19, 20, 21, 22, 23, 24, 25, 26, 27);
+
+ send(0, "queues.testaddress", 10, false, filter2);
+
+ verifyReceiveAll(10, 0, 2, 3, 4, 6, 7, 8, 10, 11, 12, 13);
+
+ verifyReceiveRoundRobinInSomeOrder(10, 17, 18, 19);
+
+ verifyReceiveRoundRobinInSomeOrder(10, 21, 22);
+
+ verifyReceiveRoundRobinInSomeOrder(10, 23, 24, 25);
+
+ verifyReceiveRoundRobinInSomeOrder(10, 26, 27);
+
+ verifyNotReceive(0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16, 17, 18, 19, 20, 21, 22, 23, 24, 25, 26, 27);
+
+ send(0, "queues.testaddress", 10, false, null);
+
+ verifyReceiveAll(10, 0, 2, 4, 6, 8, 10, 12, 13, 17, 27);
+
+ verifyReceiveRoundRobinInSomeOrder(10, 21, 22);
+
+ verifyReceiveRoundRobinInSomeOrder(10, 23, 24, 25);
+
+ verifyNotReceive(0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16, 17, 18, 19, 20, 21, 22, 23, 24, 25, 26, 27);
+ }
+
+ public void testMixtureLoadBalancedAndNonLoadBalancedQueuesWithConsumersWithFilters() throws Exception
+ {
+ setupCluster();
+
+ startServers(0, 1, 2, 3, 4);
+
+ setupSessionFactory(0, isNetty());
+ setupSessionFactory(1, isNetty());
+ setupSessionFactory(2, isNetty());
+ setupSessionFactory(3, isNetty());
+ setupSessionFactory(4, isNetty());
+
+ String filter1 = "haggis";
+ String filter2 = "scotch-egg";
+
+ createQueue(0, "queues.testaddress", "queue0", null, false);
+ createQueue(1, "queues.testaddress", "queue1", null, false);
+ createQueue(2, "queues.testaddress", "queue2", null, false);
+ createQueue(3, "queues.testaddress", "queue3", null, false);
+ createQueue(4, "queues.testaddress", "queue4", null, false);
+
+ createQueue(0, "queues.testaddress", "queue5", null, false);
+ createQueue(1, "queues.testaddress", "queue6", null, false);
+ createQueue(2, "queues.testaddress", "queue7", null, false);
+ createQueue(3, "queues.testaddress", "queue8", null, false);
+ createQueue(4, "queues.testaddress", "queue9", null, false);
+
+ createQueue(0, "queues.testaddress", "queue10", null, false);
+ createQueue(1, "queues.testaddress", "queue11", null, false);
+ createQueue(2, "queues.testaddress", "queue12", null, false);
+ createQueue(3, "queues.testaddress", "queue13", null, false);
+ createQueue(4, "queues.testaddress", "queue14", null, false);
+
+ createQueue(0, "queues.testaddress", "queue15", null, false);
+ createQueue(1, "queues.testaddress", "queue15", null, false);
+ createQueue(2, "queues.testaddress", "queue15", null, false);
+ createQueue(3, "queues.testaddress", "queue15", null, false);
+ createQueue(4, "queues.testaddress", "queue15", null, false);
+
+ createQueue(2, "queues.testaddress", "queue16", null, false);
+ createQueue(3, "queues.testaddress", "queue16", null, false);
+ createQueue(4, "queues.testaddress", "queue16", null, false);
+
+ createQueue(0, "queues.testaddress", "queue17", null, false);
+ createQueue(1, "queues.testaddress", "queue17", null, false);
+ createQueue(4, "queues.testaddress", "queue17", null, false);
+
+ createQueue(3, "queues.testaddress", "queue18", filter2, false);
+ createQueue(4, "queues.testaddress", "queue18", null, false);
+
+
+ addConsumer(0, 0, "queue0", null);
+ addConsumer(1, 1, "queue1", filter1);
+ addConsumer(2, 2, "queue2", null);
+ addConsumer(3, 3, "queue3", filter2);
+ addConsumer(4, 4, "queue4", null);
+
+ addConsumer(5, 0, "queue5", filter1);
+ addConsumer(6, 1, "queue6", null);
+ addConsumer(7, 2, "queue7", filter2);
+ addConsumer(8, 3, "queue8", null);
+ addConsumer(9, 4, "queue9", filter1);
+
+ addConsumer(10, 0, "queue10", null);
+ addConsumer(11, 1, "queue11", filter2);
+ addConsumer(12, 2, "queue12", null);
+ addConsumer(13, 3, "queue13", null);
+ addConsumer(14, 4, "queue14", filter1);
+
+ addConsumer(15, 0, "queue15", filter1);
+ addConsumer(16, 1, "queue15", filter1);
+ addConsumer(17, 2, "queue15", null);
+ addConsumer(18, 3, "queue15", filter2);
+ addConsumer(19, 4, "queue15", filter2);
+
+ addConsumer(20, 2, "queue16", filter1);
+ addConsumer(21, 3, "queue16", null);
+ addConsumer(22, 4, "queue16", null);
+
+ addConsumer(23, 0, "queue17", null);
+ addConsumer(24, 1, "queue17", null);
+ addConsumer(25, 4, "queue17", null);
+
+ addConsumer(26, 3, "queue18", filter2);
+ addConsumer(27, 4, "queue18", null);
+
+
+ waitForBindings(0, "queues.testaddress", 5, 5, true);
+ waitForBindings(1, "queues.testaddress", 5, 5, true);
+ waitForBindings(2, "queues.testaddress", 5, 5, true);
+ waitForBindings(3, "queues.testaddress", 6, 6, true);
+ waitForBindings(4, "queues.testaddress", 7, 7, true);
+
+ waitForBindings(0, "queues.testaddress", 23, 23, false);
+ waitForBindings(1, "queues.testaddress", 23, 23, false);
+ waitForBindings(2, "queues.testaddress", 23, 23, false);
+ waitForBindings(3, "queues.testaddress", 22, 22, false);
+ waitForBindings(4, "queues.testaddress", 21, 21, false);
+
+ send(0, "queues.testaddress", 10, false, filter1);
+
+ verifyReceiveAll(10, 0, 1, 2, 4, 5, 6, 8, 9, 10, 12, 13, 14, 27);
+
+ verifyReceiveRoundRobinInSomeOrder(10, 15, 16, 17);
+
+ verifyReceiveRoundRobinInSomeOrder(10, 20, 21, 22);
+
+ verifyReceiveRoundRobinInSomeOrder(10, 23, 24, 25);
+
+ verifyNotReceive(0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16, 17, 18, 19, 20, 21, 22, 23, 24, 25, 26, 27);
+
+ send(0, "queues.testaddress", 10, false, filter2);
+
+ verifyReceiveAll(10, 0, 2, 3, 4, 6, 7, 8, 10, 11, 12, 13);
+
+ verifyReceiveRoundRobinInSomeOrder(10, 17, 18, 19);
+
+ verifyReceiveRoundRobinInSomeOrder(10, 21, 22);
+
+ verifyReceiveRoundRobinInSomeOrder(10, 23, 24, 25);
+
+ verifyReceiveRoundRobinInSomeOrder(10, 26, 27);
+
+ verifyNotReceive(0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16, 17, 18, 19, 20, 21, 22, 23, 24, 25, 26, 27);
+
+ send(0, "queues.testaddress", 10, false, null);
+
+ verifyReceiveAll(10, 0, 2, 4, 6, 8, 10, 12, 13, 17, 27);
+
+ verifyReceiveRoundRobinInSomeOrder(10, 21, 22);
+
+ verifyReceiveRoundRobinInSomeOrder(10, 23, 24, 25);
+
+ verifyNotReceive(0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16, 17, 18, 19, 20, 21, 22, 23, 24, 25, 26, 27);
+ }
+
+ public void testRouteWhenNoConsumersTrueLoadBalancedQueues() throws Exception
+ {
+ setupCluster(true);
+
+ startServers(0, 1, 2, 3, 4);
+
+ setupSessionFactory(0, isNetty());
+ setupSessionFactory(1, isNetty());
+ setupSessionFactory(2, isNetty());
+ setupSessionFactory(3, isNetty());
+ setupSessionFactory(4, isNetty());
+
+ createQueue(0, "queues.testaddress", "queue0", null, false);
createQueue(1, "queues.testaddress", "queue0", null, false);
createQueue(2, "queues.testaddress", "queue0", null, false);
createQueue(3, "queues.testaddress", "queue0", null, false);
createQueue(4, "queues.testaddress", "queue0", null, false);
+
+ waitForBindings(0, "queues.testaddress", 1, 0, true);
+ waitForBindings(1, "queues.testaddress", 1, 0, true);
+ waitForBindings(2, "queues.testaddress", 1, 0, true);
+ waitForBindings(3, "queues.testaddress", 1, 0, true);
+ waitForBindings(4, "queues.testaddress", 1, 0, true);
+
+ waitForBindings(0, "queues.testaddress", 4, 0, false);
+ waitForBindings(1, "queues.testaddress", 4, 0, false);
+ waitForBindings(2, "queues.testaddress", 4, 0, false);
+ waitForBindings(3, "queues.testaddress", 4, 0, false);
+ waitForBindings(4, "queues.testaddress", 4, 0, false);
+
+ send(0, "queues.testaddress", 10, false, null);
addConsumer(0, 0, "queue0", null);
addConsumer(1, 1, "queue0", null);
addConsumer(2, 2, "queue0", null);
addConsumer(3, 3, "queue0", null);
addConsumer(4, 4, "queue0", null);
-
+
+ waitForBindings(0, "queues.testaddress", 1, 1, true);
+ waitForBindings(1, "queues.testaddress", 1, 1, true);
+ waitForBindings(2, "queues.testaddress", 1, 1, true);
+ waitForBindings(3, "queues.testaddress", 1, 1, true);
+ waitForBindings(4, "queues.testaddress", 1, 1, true);
+
+ waitForBindings(0, "queues.testaddress", 4, 4, false);
+ waitForBindings(1, "queues.testaddress", 4, 4, false);
+ waitForBindings(2, "queues.testaddress", 4, 4, false);
+ waitForBindings(3, "queues.testaddress", 4, 4, false);
+ waitForBindings(4, "queues.testaddress", 4, 4, false);
+
+ verifyReceiveRoundRobinInSomeOrder(10, 0, 1, 2, 3, 4);
+
+ verifyNotReceive(0, 1, 2, 3, 4);
+ }
+
+ public void testRouteWhenNoConsumersFalseNoLocalConsumerLoadBalancedQueues() throws Exception
+ {
+ setupCluster(false);
+
+ startServers(0, 1, 2, 3, 4);
+
+ setupSessionFactory(0, isNetty());
+ setupSessionFactory(1, isNetty());
+ setupSessionFactory(2, isNetty());
+ setupSessionFactory(3, isNetty());
+ setupSessionFactory(4, isNetty());
+
+ createQueue(0, "queues.testaddress", "queue0", null, false);
+ createQueue(1, "queues.testaddress", "queue0", null, false);
+ createQueue(2, "queues.testaddress", "queue0", null, false);
+ createQueue(3, "queues.testaddress", "queue0", null, false);
+ createQueue(4, "queues.testaddress", "queue0", null, false);
+
+ waitForBindings(0, "queues.testaddress", 1, 0, true);
+ waitForBindings(1, "queues.testaddress", 1, 0, true);
+ waitForBindings(2, "queues.testaddress", 1, 0, true);
+ waitForBindings(3, "queues.testaddress", 1, 0, true);
+ waitForBindings(4, "queues.testaddress", 1, 0, true);
+
+ waitForBindings(0, "queues.testaddress", 4, 0, false);
+ waitForBindings(1, "queues.testaddress", 4, 0, false);
+ waitForBindings(2, "queues.testaddress", 4, 0, false);
+ waitForBindings(3, "queues.testaddress", 4, 0, false);
+ waitForBindings(4, "queues.testaddress", 4, 0, false);
+
+ send(0, "queues.testaddress", 10, false, null);
+ addConsumer(0, 0, "queue0", null);
+ addConsumer(1, 1, "queue0", null);
+ addConsumer(2, 2, "queue0", null);
+ addConsumer(3, 3, "queue0", null);
+ addConsumer(4, 4, "queue0", null);
+
waitForBindings(0, "queues.testaddress", 1, 1, true);
waitForBindings(1, "queues.testaddress", 1, 1, true);
waitForBindings(2, "queues.testaddress", 1, 1, true);
waitForBindings(3, "queues.testaddress", 1, 1, true);
waitForBindings(4, "queues.testaddress", 1, 1, true);
+
+ waitForBindings(0, "queues.testaddress", 4, 4, false);
+ waitForBindings(1, "queues.testaddress", 4, 4, false);
+ waitForBindings(2, "queues.testaddress", 4, 4, false);
+ waitForBindings(3, "queues.testaddress", 4, 4, false);
+ waitForBindings(4, "queues.testaddress", 4, 4, false);
+
+ //Should still be round robined since no local consumer
+ verifyReceiveRoundRobinInSomeOrder(10, 0, 1, 2, 3, 4);
+
+ verifyNotReceive(0, 1, 2, 3, 4);
+ }
+
+ public void testRouteWhenNoConsumersFalseLocalConsumerLoadBalancedQueues() throws Exception
+ {
+ setupCluster(false);
+
+ startServers(0, 1, 2, 3, 4);
+
+ setupSessionFactory(0, isNetty());
+ setupSessionFactory(1, isNetty());
+ setupSessionFactory(2, isNetty());
+ setupSessionFactory(3, isNetty());
+ setupSessionFactory(4, isNetty());
+
+ createQueue(0, "queues.testaddress", "queue0", null, false);
+ createQueue(1, "queues.testaddress", "queue0", null, false);
+ createQueue(2, "queues.testaddress", "queue0", null, false);
+ createQueue(3, "queues.testaddress", "queue0", null, false);
+ createQueue(4, "queues.testaddress", "queue0", null, false);
+
+ addConsumer(0, 0, "queue0", null);
+
+ waitForBindings(0, "queues.testaddress", 1, 1, true);
+ waitForBindings(1, "queues.testaddress", 1, 0, true);
+ waitForBindings(2, "queues.testaddress", 1, 0, true);
+ waitForBindings(3, "queues.testaddress", 1, 0, true);
+ waitForBindings(4, "queues.testaddress", 1, 0, true);
+
+ waitForBindings(0, "queues.testaddress", 4, 0, false);
+ waitForBindings(1, "queues.testaddress", 4, 1, false);
+ waitForBindings(2, "queues.testaddress", 4, 1, false);
+ waitForBindings(3, "queues.testaddress", 4, 1, false);
+ waitForBindings(4, "queues.testaddress", 4, 1, false);
+
+ send(0, "queues.testaddress", 10, false, null);
+
+ addConsumer(1, 1, "queue0", null);
+ addConsumer(2, 2, "queue0", null);
+ addConsumer(3, 3, "queue0", null);
+ addConsumer(4, 4, "queue0", null);
+
+ waitForBindings(0, "queues.testaddress", 1, 1, true);
+ waitForBindings(1, "queues.testaddress", 1, 1, true);
+ waitForBindings(2, "queues.testaddress", 1, 1, true);
+ waitForBindings(3, "queues.testaddress", 1, 1, true);
+ waitForBindings(4, "queues.testaddress", 1, 1, true);
+
waitForBindings(0, "queues.testaddress", 4, 4, false);
waitForBindings(1, "queues.testaddress", 4, 4, false);
waitForBindings(2, "queues.testaddress", 4, 4, false);
waitForBindings(3, "queues.testaddress", 4, 4, false);
waitForBindings(4, "queues.testaddress", 4, 4, false);
-
+
+ verifyReceiveAll(10, 0);
+
+ verifyNotReceive(0, 1, 2, 3, 4);
+ }
+
+ public void testRouteWhenNoConsumersFalseNonLoadBalancedQueues() throws Exception
+ {
+ setupCluster(false);
+
+ startServers(0, 1, 2, 3, 4);
+
+ setupSessionFactory(0, isNetty());
+ setupSessionFactory(1, isNetty());
+ setupSessionFactory(2, isNetty());
+ setupSessionFactory(3, isNetty());
+ setupSessionFactory(4, isNetty());
+
+ createQueue(0, "queues.testaddress", "queue0", null, false);
+ createQueue(1, "queues.testaddress", "queue1", null, false);
+ createQueue(2, "queues.testaddress", "queue2", null, false);
+ createQueue(3, "queues.testaddress", "queue3", null, false);
+ createQueue(4, "queues.testaddress", "queue4", null, false);
+
+ waitForBindings(0, "queues.testaddress", 1, 0, true);
+ waitForBindings(1, "queues.testaddress", 1, 0, true);
+ waitForBindings(2, "queues.testaddress", 1, 0, true);
+ waitForBindings(3, "queues.testaddress", 1, 0, true);
+ waitForBindings(4, "queues.testaddress", 1, 0, true);
+
+ waitForBindings(0, "queues.testaddress", 4, 0, false);
+ waitForBindings(1, "queues.testaddress", 4, 0, false);
+ waitForBindings(2, "queues.testaddress", 4, 0, false);
+ waitForBindings(3, "queues.testaddress", 4, 0, false);
+ waitForBindings(4, "queues.testaddress", 4, 0, false);
+
send(0, "queues.testaddress", 10, false, null);
- verifyReceiveRoundRobinInSomeOrder(10, 0, 1, 2, 3, 4);
+ addConsumer(0, 0, "queue0", null);
+ addConsumer(1, 1, "queue1", null);
+ addConsumer(2, 2, "queue2", null);
+ addConsumer(3, 3, "queue3", null);
+ addConsumer(4, 4, "queue4", null);
+
+ waitForBindings(0, "queues.testaddress", 1, 1, true);
+ waitForBindings(1, "queues.testaddress", 1, 1, true);
+ waitForBindings(2, "queues.testaddress", 1, 1, true);
+ waitForBindings(3, "queues.testaddress", 1, 1, true);
+ waitForBindings(4, "queues.testaddress", 1, 1, true);
+
+ waitForBindings(0, "queues.testaddress", 4, 4, false);
+ waitForBindings(1, "queues.testaddress", 4, 4, false);
+ waitForBindings(2, "queues.testaddress", 4, 4, false);
+ waitForBindings(3, "queues.testaddress", 4, 4, false);
+ waitForBindings(4, "queues.testaddress", 4, 4, false);
+
+ verifyReceiveAll(10, 0, 1, 2, 3, 4);
+
+ verifyNotReceive(0, 1, 2, 3, 4);
+ }
+
+ public void testRouteWhenNoConsumersTrueNonLoadBalancedQueues() throws Exception
+ {
+ setupCluster(true);
+
+ startServers(0, 1, 2, 3, 4);
+
+ setupSessionFactory(0, isNetty());
+ setupSessionFactory(1, isNetty());
+ setupSessionFactory(2, isNetty());
+ setupSessionFactory(3, isNetty());
+ setupSessionFactory(4, isNetty());
+
+ createQueue(0, "queues.testaddress", "queue0", null, false);
+ createQueue(1, "queues.testaddress", "queue1", null, false);
+ createQueue(2, "queues.testaddress", "queue2", null, false);
+ createQueue(3, "queues.testaddress", "queue3", null, false);
+ createQueue(4, "queues.testaddress", "queue4", null, false);
+
+ waitForBindings(0, "queues.testaddress", 1, 0, true);
+ waitForBindings(1, "queues.testaddress", 1, 0, true);
+ waitForBindings(2, "queues.testaddress", 1, 0, true);
+ waitForBindings(3, "queues.testaddress", 1, 0, true);
+ waitForBindings(4, "queues.testaddress", 1, 0, true);
+
+ waitForBindings(0, "queues.testaddress", 4, 0, false);
+ waitForBindings(1, "queues.testaddress", 4, 0, false);
+ waitForBindings(2, "queues.testaddress", 4, 0, false);
+ waitForBindings(3, "queues.testaddress", 4, 0, false);
+ waitForBindings(4, "queues.testaddress", 4, 0, false);
+
+ send(0, "queues.testaddress", 10, false, null);
- verifyNotReceive(0, 1, 2, 3, 4);
+ addConsumer(0, 0, "queue0", null);
+ addConsumer(1, 1, "queue1", null);
+ addConsumer(2, 2, "queue2", null);
+ addConsumer(3, 3, "queue3", null);
+ addConsumer(4, 4, "queue4", null);
+
+ waitForBindings(0, "queues.testaddress", 1, 1, true);
+ waitForBindings(1, "queues.testaddress", 1, 1, true);
+ waitForBindings(2, "queues.testaddress", 1, 1, true);
+ waitForBindings(3, "queues.testaddress", 1, 1, true);
+ waitForBindings(4, "queues.testaddress", 1, 1, true);
+
+ waitForBindings(0, "queues.testaddress", 4, 4, false);
+ waitForBindings(1, "queues.testaddress", 4, 4, false);
+ waitForBindings(2, "queues.testaddress", 4, 4, false);
+ waitForBindings(3, "queues.testaddress", 4, 4, false);
+ waitForBindings(4, "queues.testaddress", 4, 4, false);
+
+ verifyReceiveAll(10, 0, 1, 2, 3, 4);
+
+ verifyNotReceive(0, 1, 2, 3, 4);
}
+ public void testNoLocalQueueNonLoadBalancedQueues() throws Exception
+ {
+ setupCluster(true);
+
+ startServers(0, 1, 2, 3, 4);
+
+ setupSessionFactory(0, isNetty());
+ setupSessionFactory(1, isNetty());
+ setupSessionFactory(2, isNetty());
+ setupSessionFactory(3, isNetty());
+ setupSessionFactory(4, isNetty());
+
+ createQueue(1, "queues.testaddress", "queue1", null, false);
+ createQueue(2, "queues.testaddress", "queue2", null, false);
+ createQueue(3, "queues.testaddress", "queue3", null, false);
+ createQueue(4, "queues.testaddress", "queue4", null, false);
+
+ addConsumer(1, 1, "queue1", null);
+ addConsumer(2, 2, "queue2", null);
+ addConsumer(3, 3, "queue3", null);
+ addConsumer(4, 4, "queue4", null);
+
+ waitForBindings(1, "queues.testaddress", 1, 1, true);
+ waitForBindings(2, "queues.testaddress", 1, 1, true);
+ waitForBindings(3, "queues.testaddress", 1, 1, true);
+ waitForBindings(4, "queues.testaddress", 1, 1, true);
+
+ waitForBindings(1, "queues.testaddress", 3, 3, false);
+ waitForBindings(2, "queues.testaddress", 3, 3, false);
+ waitForBindings(3, "queues.testaddress", 3, 3, false);
+ waitForBindings(4, "queues.testaddress", 3, 3, false);
+
+ send(0, "queues.testaddress", 10, false, null);
+
+ verifyReceiveAll(10, 1, 2, 3, 4);
+
+ verifyNotReceive(1, 2, 3, 4);
+ }
+ public void testNoLocalQueueLoadBalancedQueues() throws Exception
+ {
+ setupCluster(true);
+
+ startServers(0, 1, 2, 3, 4);
+
+ setupSessionFactory(0, isNetty());
+ setupSessionFactory(1, isNetty());
+ setupSessionFactory(2, isNetty());
+ setupSessionFactory(3, isNetty());
+ setupSessionFactory(4, isNetty());
+
+ createQueue(1, "queues.testaddress", "queue1", null, false);
+ createQueue(2, "queues.testaddress", "queue1", null, false);
+ createQueue(3, "queues.testaddress", "queue1", null, false);
+ createQueue(4, "queues.testaddress", "queue1", null, false);
+
+ addConsumer(1, 1, "queue1", null);
+ addConsumer(2, 2, "queue1", null);
+ addConsumer(3, 3, "queue1", null);
+ addConsumer(4, 4, "queue1", null);
+
+ waitForBindings(1, "queues.testaddress", 1, 1, true);
+ waitForBindings(2, "queues.testaddress", 1, 1, true);
+ waitForBindings(3, "queues.testaddress", 1, 1, true);
+ waitForBindings(4, "queues.testaddress", 1, 1, true);
+
+ waitForBindings(1, "queues.testaddress", 3, 3, false);
+ waitForBindings(2, "queues.testaddress", 3, 3, false);
+ waitForBindings(3, "queues.testaddress", 3, 3, false);
+ waitForBindings(4, "queues.testaddress", 3, 3, false);
+
+ send(0, "queues.testaddress", 10, false, null);
+
+ verifyReceiveRoundRobinInSomeOrder(10, 1, 2, 3, 4);
+
+ verifyNotReceive(1, 2, 3, 4);
+ }
+
+ public void testStartStopServers() throws Exception
+ {
+ setupCluster();
+
+ startServers(0, 1, 2, 3, 4);
+
+ setupSessionFactory(0, isNetty());
+ setupSessionFactory(1, isNetty());
+ setupSessionFactory(2, isNetty());
+ setupSessionFactory(3, isNetty());
+ setupSessionFactory(4, isNetty());
+
+ createQueue(0, "queues.testaddress", "queue0", null, false);
+ createQueue(1, "queues.testaddress", "queue1", null, false);
+ createQueue(2, "queues.testaddress", "queue2", null, false);
+ createQueue(3, "queues.testaddress", "queue3", null, false);
+ createQueue(4, "queues.testaddress", "queue4", null, false);
+
+ createQueue(0, "queues.testaddress", "queue5", null, false);
+ createQueue(1, "queues.testaddress", "queue6", null, false);
+ createQueue(2, "queues.testaddress", "queue7", null, false);
+ createQueue(3, "queues.testaddress", "queue8", null, false);
+ createQueue(4, "queues.testaddress", "queue9", null, false);
+
+ createQueue(0, "queues.testaddress", "queue10", null, false);
+ createQueue(1, "queues.testaddress", "queue11", null, false);
+ createQueue(2, "queues.testaddress", "queue12", null, false);
+ createQueue(3, "queues.testaddress", "queue13", null, false);
+ createQueue(4, "queues.testaddress", "queue14", null, false);
+
+ createQueue(0, "queues.testaddress", "queue15", null, false);
+ createQueue(1, "queues.testaddress", "queue15", null, false);
+ createQueue(2, "queues.testaddress", "queue15", null, false);
+ createQueue(3, "queues.testaddress", "queue15", null, false);
+ createQueue(4, "queues.testaddress", "queue15", null, false);
+
+ createQueue(2, "queues.testaddress", "queue16", null, false);
+ createQueue(3, "queues.testaddress", "queue16", null, false);
+ createQueue(4, "queues.testaddress", "queue16", null, false);
+
+ createQueue(0, "queues.testaddress", "queue17", null, false);
+ createQueue(1, "queues.testaddress", "queue17", null, false);
+ createQueue(4, "queues.testaddress", "queue17", null, false);
+
+ createQueue(3, "queues.testaddress", "queue18", null, false);
+ createQueue(4, "queues.testaddress", "queue18", null, false);
+
+
+ addConsumer(0, 0, "queue0", null);
+ addConsumer(1, 1, "queue1", null);
+ addConsumer(2, 2, "queue2", null);
+ addConsumer(3, 3, "queue3", null);
+ addConsumer(4, 4, "queue4", null);
+
+ addConsumer(5, 0, "queue5", null);
+ addConsumer(6, 1, "queue6", null);
+ addConsumer(7, 2, "queue7", null);
+ addConsumer(8, 3, "queue8", null);
+ addConsumer(9, 4, "queue9", null);
+
+ addConsumer(10, 0, "queue10", null);
+ addConsumer(11, 1, "queue11", null);
+ addConsumer(12, 2, "queue12", null);
+ addConsumer(13, 3, "queue13", null);
+ addConsumer(14, 4, "queue14", null);
+
+ addConsumer(15, 0, "queue15", null);
+ addConsumer(16, 1, "queue15", null);
+ addConsumer(17, 2, "queue15", null);
+ addConsumer(18, 3, "queue15", null);
+ addConsumer(19, 4, "queue15", null);
+
+ addConsumer(20, 2, "queue16", null);
+ addConsumer(21, 3, "queue16", null);
+ addConsumer(22, 4, "queue16", null);
+
+ addConsumer(23, 0, "queue17", null);
+ addConsumer(24, 1, "queue17", null);
+ addConsumer(25, 4, "queue17", null);
+
+ addConsumer(26, 3, "queue18", null);
+ addConsumer(27, 4, "queue18", null);
+
+
+ waitForBindings(0, "queues.testaddress", 5, 5, true);
+ waitForBindings(1, "queues.testaddress", 5, 5, true);
+ waitForBindings(2, "queues.testaddress", 5, 5, true);
+ waitForBindings(3, "queues.testaddress", 6, 6, true);
+ waitForBindings(4, "queues.testaddress", 7, 7, true);
+
+ waitForBindings(0, "queues.testaddress", 23, 23, false);
+ waitForBindings(1, "queues.testaddress", 23, 23, false);
+ waitForBindings(2, "queues.testaddress", 23, 23, false);
+ waitForBindings(3, "queues.testaddress", 22, 22, false);
+ waitForBindings(4, "queues.testaddress", 21, 21, false);
+
+ send(0, "queues.testaddress", 10, false, null);
+
+ verifyReceiveAll(10, 0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14);
+
+ verifyReceiveRoundRobinInSomeOrder(10, 15, 16, 17, 18, 19);
+
+ verifyReceiveRoundRobinInSomeOrder(10, 20, 21, 22);
+
+ verifyReceiveRoundRobinInSomeOrder(10, 23, 24, 25);
+
+ verifyReceiveRoundRobinInSomeOrder(10, 26, 27);
+
+ verifyNotReceive(0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16, 17, 18, 19, 20, 21, 22, 23, 24, 25, 26, 27);
+
+ removeConsumer(0);
+ removeConsumer(5);
+ removeConsumer(10);
+ removeConsumer(15);
+ removeConsumer(23);
+ removeConsumer(3);
+ removeConsumer(8);
+ removeConsumer(13);
+ removeConsumer(18);
+ removeConsumer(21);
+ removeConsumer(26);
+
+ closeSessionFactory(0);
+ closeSessionFactory(3);
+
+ stopServers(0, 3);
+
+ startServers(3, 0);
+
+ setupSessionFactory(0, isNetty());
+ setupSessionFactory(3, isNetty());
+
+ createQueue(0, "queues.testaddress", "queue0", null, false);
+ createQueue(3, "queues.testaddress", "queue3", null, false);
+
+ createQueue(0, "queues.testaddress", "queue5", null, false);
+ createQueue(3, "queues.testaddress", "queue8", null, false);
+
+ createQueue(0, "queues.testaddress", "queue10", null, false);
+ createQueue(3, "queues.testaddress", "queue13", null, false);
+
+ createQueue(0, "queues.testaddress", "queue15", null, false);
+ createQueue(3, "queues.testaddress", "queue15", null, false);
+
+ createQueue(3, "queues.testaddress", "queue16", null, false);
+
+ createQueue(0, "queues.testaddress", "queue17", null, false);
+
+ createQueue(3, "queues.testaddress", "queue18", null, false);
+
+
+ addConsumer(0, 0, "queue0", null);
+ addConsumer(3, 3, "queue3", null);
+
+ addConsumer(5, 0, "queue5", null);
+ addConsumer(8, 3, "queue8", null);
+
+ addConsumer(10, 0, "queue10", null);
+ addConsumer(13, 3, "queue13", null);
+
+ addConsumer(15, 0, "queue15", null);
+ addConsumer(18, 3, "queue15", null);
+
+ addConsumer(21, 3, "queue16", null);
+
+ addConsumer(23, 0, "queue17", null);
+
+ addConsumer(26, 3, "queue18", null);
+
+ waitForBindings(0, "queues.testaddress", 5, 5, true);
+ waitForBindings(1, "queues.testaddress", 5, 5, true);
+ waitForBindings(2, "queues.testaddress", 5, 5, true);
+ waitForBindings(3, "queues.testaddress", 6, 6, true);
+ waitForBindings(4, "queues.testaddress", 7, 7, true);
+
+ waitForBindings(0, "queues.testaddress", 23, 23, false);
+ waitForBindings(1, "queues.testaddress", 23, 23, false);
+ waitForBindings(2, "queues.testaddress", 23, 23, false);
+ waitForBindings(3, "queues.testaddress", 22, 22, false);
+ waitForBindings(4, "queues.testaddress", 21, 21, false);
+
+ send(0, "queues.testaddress", 10, false, null);
+
+ verifyReceiveAll(10, 0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14);
+
+ verifyReceiveRoundRobinInSomeOrder(10, 15, 16, 17, 18, 19);
+
+ verifyReceiveRoundRobinInSomeOrder(10, 20, 21, 22);
+
+ verifyReceiveRoundRobinInSomeOrder(10, 23, 24, 25);
+
+ verifyReceiveRoundRobinInSomeOrder(10, 26, 27);
+
+ verifyNotReceive(0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16, 17, 18, 19, 20, 21, 22, 23, 24, 25, 26, 27);
+ }
+
+
+
+ private void setupCluster() throws Exception
+ {
+ setupCluster(false);
+ }
+
+ private void setupCluster(final boolean forwardWhenNoConsumers) throws Exception
+ {
+ setupClusterConnection("cluster0-1", 0, 1, "queues", forwardWhenNoConsumers, 2, isNetty());
+ setupClusterConnection("cluster0-2", 0, 2, "queues", forwardWhenNoConsumers, 2, isNetty());
+ setupClusterConnection("cluster0-3", 0, 3, "queues", forwardWhenNoConsumers, 2, isNetty());
+ setupClusterConnection("cluster0-4", 0, 4, "queues", forwardWhenNoConsumers, 2, isNetty());
+
+ setupClusterConnection("cluster1-0", 1, 0, "queues", forwardWhenNoConsumers, 2, isNetty());
+ setupClusterConnection("cluster1-2", 1, 2, "queues", forwardWhenNoConsumers, 2, isNetty());
+ setupClusterConnection("cluster1-3", 1, 3, "queues", forwardWhenNoConsumers, 2, isNetty());
+ setupClusterConnection("cluster1-4", 1, 4, "queues", forwardWhenNoConsumers, 2, isNetty());
+
+ setupClusterConnection("cluster2-0", 2, 0, "queues", forwardWhenNoConsumers, 2, isNetty());
+ setupClusterConnection("cluster2-1", 2, 1, "queues", forwardWhenNoConsumers, 2, isNetty());
+ setupClusterConnection("cluster2-3", 2, 3, "queues", forwardWhenNoConsumers, 2, isNetty());
+ setupClusterConnection("cluster2-4", 2, 4, "queues", forwardWhenNoConsumers, 2, isNetty());
+
+ setupClusterConnection("cluster3-0", 3, 0, "queues", forwardWhenNoConsumers, 2, isNetty());
+ setupClusterConnection("cluster3-1", 3, 1, "queues", forwardWhenNoConsumers, 2, isNetty());
+ setupClusterConnection("cluster3-2", 3, 2, "queues", forwardWhenNoConsumers, 2, isNetty());
+ setupClusterConnection("cluster3-4", 3, 4, "queues", forwardWhenNoConsumers, 2, isNetty());
+
+ setupClusterConnection("cluster4-0", 4, 0, "queues", forwardWhenNoConsumers, 2, isNetty());
+ setupClusterConnection("cluster4-1", 4, 1, "queues", forwardWhenNoConsumers, 2, isNetty());
+ setupClusterConnection("cluster4-2", 4, 2, "queues", forwardWhenNoConsumers, 2, isNetty());
+ setupClusterConnection("cluster4-3", 4, 3, "queues", forwardWhenNoConsumers, 2, isNetty());
+ }
+
}
More information about the jboss-cvs-commits
mailing list