Author: jmesnil
Date: 2009-11-27 05:28:39 -0500 (Fri, 27 Nov 2009)
New Revision: 8426
Added:
trunk/tests/src/org/hornetq/tests/integration/cluster/distribution/ClusterWithBackupTest.java
trunk/tests/src/org/hornetq/tests/integration/cluster/distribution/NettyFileStorageSymmetricClusterWithBackupTest.java
trunk/tests/src/org/hornetq/tests/integration/cluster/distribution/NettySymmetricClusterWithBackupTest.java
trunk/tests/src/org/hornetq/tests/integration/cluster/distribution/SymmetricClusterWithBackupTest.java
Modified:
trunk/build-hornetq.xml
Log:
re-added cluster-with-backup tests but excluded them from the integration-tests ant target
Modified: trunk/build-hornetq.xml
===================================================================
--- trunk/build-hornetq.xml 2009-11-27 10:15:37 UTC (rev 8425)
+++ trunk/build-hornetq.xml 2009-11-27 10:28:39 UTC (rev 8426)
@@ -1220,6 +1220,8 @@
<formatter type="plain"
usefile="${junit.formatter.usefile}"/>
<fileset dir="${test.classes.dir}">
<!-- <exclude name="**/integration/http/*" /> -->
+ <!-- Cluster tests with backup are brittle and are temporarily excluded.
+      The fileset scans compiled classes, so the pattern must name the .class files. -->
+ <exclude name="**/cluster/distribution/*WithBackupTest.class" />
<include name="${tests.param}"/>
</fileset>
</batchtest>
Added:
trunk/tests/src/org/hornetq/tests/integration/cluster/distribution/ClusterWithBackupTest.java
===================================================================
---
trunk/tests/src/org/hornetq/tests/integration/cluster/distribution/ClusterWithBackupTest.java
(rev 0)
+++
trunk/tests/src/org/hornetq/tests/integration/cluster/distribution/ClusterWithBackupTest.java 2009-11-27
10:28:39 UTC (rev 8426)
@@ -0,0 +1,144 @@
+/*
+ * JBoss, Home of Professional Open Source
+ * Copyright 2005-2009, Red Hat Middleware LLC, and individual contributors
+ * by the @authors tag. See the copyright.txt in the distribution for a
+ * full listing of individual contributors.
+ *
+ * This is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU Lesser General Public License as
+ * published by the Free Software Foundation; either version 2.1 of
+ * the License, or (at your option) any later version.
+ *
+ * This software is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this software; if not, write to the Free
+ * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
+ * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
+ */
+
+package org.hornetq.tests.integration.cluster.distribution;
+
+import org.hornetq.core.logging.Logger;
+
+/**
+ * Round-robin distribution test for a cluster of three live servers (3, 4 and 5)
+ * that are set up together with three backup servers (0, 1 and 2).
+ *
+ * @author <a href="mailto:tim.fox@jboss.com">Tim Fox</a>
+ *
+ * Created 9 Mar 2009 16:31:21
+ */
+public class ClusterWithBackupTest extends ClusterTestBase
+{
+   private static final Logger log = Logger.getLogger(ClusterWithBackupTest.class);
+
+   /**
+    * Runs the inherited set-up and then configures the six servers.
+    * The servers are only configured here; each test starts them itself.
+    */
+   @Override
+   protected void setUp() throws Exception
+   {
+      super.setUp();
+
+      setupServers();
+   }
+
+   /**
+    * Stops everything this test started and then runs the inherited tear-down.
+    * The inherited tear-down is executed even when stopping the servers fails,
+    * so a broken shutdown cannot skip the base-class cleanup.
+    */
+   @Override
+   protected void tearDown() throws Exception
+   {
+      try
+      {
+         stopServers();
+      }
+      finally
+      {
+         super.tearDown();
+      }
+   }
+
+   /**
+    * @return the netty flag handed to the server, session-factory and
+    *         cluster-connection set-up helpers; {@code false} in this class
+    */
+   protected boolean isNetty()
+   {
+      return false;
+   }
+
+   /**
+    * @return the file-storage flag handed to the server set-up helper;
+    *         {@code false} in this class
+    */
+   protected boolean isFileStorage()
+   {
+      return false;
+   }
+
+   /**
+    * Starts all six servers, creates "queue0" with one consumer on each live node
+    * (3, 4, 5), sends 100 messages through node 3 and verifies that consumers
+    * 0, 1 and 2 receive them round-robin and that nothing else is pending.
+    */
+   public void testBasicRoundRobin() throws Exception
+   {
+      setupCluster();
+
+      startServers(0, 1, 2, 3, 4, 5);
+
+      setupSessionFactory(3, isNetty());
+      setupSessionFactory(4, isNetty());
+      setupSessionFactory(5, isNetty());
+
+      createQueue(3, "queues.testaddress", "queue0", null, false);
+      createQueue(4, "queues.testaddress", "queue0", null, false);
+      createQueue(5, "queues.testaddress", "queue0", null, false);
+
+      addConsumer(0, 3, "queue0", null);
+      addConsumer(1, 4, "queue0", null);
+      addConsumer(2, 5, "queue0", null);
+
+      // Each live node: one local binding ...
+      waitForBindings(3, "queues.testaddress", 1, 1, true);
+      waitForBindings(4, "queues.testaddress", 1, 1, true);
+      waitForBindings(5, "queues.testaddress", 1, 1, true);
+
+      // ... and two remote bindings, one per other live node.
+      waitForBindings(3, "queues.testaddress", 2, 2, false);
+      waitForBindings(4, "queues.testaddress", 2, 2, false);
+      waitForBindings(5, "queues.testaddress", 2, 2, false);
+
+      send(3, "queues.testaddress", 100, false, null);
+
+      verifyReceiveRoundRobinInSomeOrder(100, 0, 1, 2);
+
+      // Every argument is a consumer id; consumer 0 used to be listed twice.
+      verifyNotReceive(0, 1, 2);
+   }
+
+   /**
+    * Sets up the cluster connections with forwardWhenNoConsumers switched off.
+    */
+   protected void setupCluster() throws Exception
+   {
+      setupCluster(false);
+   }
+
+   /**
+    * Declares the cluster connections "cluster0".."cluster2": first for the live
+    * nodes 3, 4 and 5, then the same names for nodes 0, 1 and 2.
+    * Each connection targets the two live nodes other than its own live node.
+    *
+    * @param forwardWhenNoConsumers value passed through to every cluster connection
+    */
+   protected void setupCluster(final boolean forwardWhenNoConsumers) throws Exception
+   {
+      setupClusterConnection("cluster0", "queues", forwardWhenNoConsumers, 1, isNetty(), 3, 4, 5);
+
+      setupClusterConnection("cluster1", "queues", forwardWhenNoConsumers, 1, isNetty(), 4, 3, 5);
+
+      setupClusterConnection("cluster2", "queues", forwardWhenNoConsumers, 1, isNetty(), 5, 3, 4);
+
+
+      setupClusterConnection("cluster0", "queues", forwardWhenNoConsumers, 1, isNetty(), 0, 4, 5);
+
+      setupClusterConnection("cluster1", "queues", forwardWhenNoConsumers, 1, isNetty(), 1, 3, 5);
+
+      setupClusterConnection("cluster2", "queues", forwardWhenNoConsumers, 1, isNetty(), 2, 3, 4);
+   }
+
+   /**
+    * Configures servers 0-2 as backups and servers 3-5 as lives.
+    * NOTE(review): the last int passed for each live (0, 1, 2) presumably names
+    * its backup node - confirm against ClusterTestBase.setupServer.
+    */
+   protected void setupServers() throws Exception
+   {
+      //The backups
+      setupServer(0, isFileStorage(), isNetty(), true);
+      setupServer(1, isFileStorage(), isNetty(), true);
+      setupServer(2, isFileStorage(), isNetty(), true);
+
+      //The lives
+      setupServer(3, isFileStorage(), isNetty(), 0);
+      setupServer(4, isFileStorage(), isNetty(), 1);
+      setupServer(5, isFileStorage(), isNetty(), 2);
+
+   }
+
+   /**
+    * Closes all consumers and session factories, then stops servers 0 to 5.
+    */
+   protected void stopServers() throws Exception
+   {
+      closeAllConsumers();
+
+      closeAllSessionFactories();
+
+      stopServers(0, 1, 2, 3, 4, 5);
+   }
+
+}
Added:
trunk/tests/src/org/hornetq/tests/integration/cluster/distribution/NettyFileStorageSymmetricClusterWithBackupTest.java
===================================================================
---
trunk/tests/src/org/hornetq/tests/integration/cluster/distribution/NettyFileStorageSymmetricClusterWithBackupTest.java
(rev 0)
+++
trunk/tests/src/org/hornetq/tests/integration/cluster/distribution/NettyFileStorageSymmetricClusterWithBackupTest.java 2009-11-27
10:28:39 UTC (rev 8426)
@@ -0,0 +1,45 @@
+/*
+ * JBoss, Home of Professional Open Source
+ * Copyright 2005-2009, Red Hat Middleware LLC, and individual contributors
+ * by the @authors tag. See the copyright.txt in the distribution for a
+ * full listing of individual contributors.
+ *
+ * This is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU Lesser General Public License as
+ * published by the Free Software Foundation; either version 2.1 of
+ * the License, or (at your option) any later version.
+ *
+ * This software is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this software; if not, write to the Free
+ * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
+ * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
+ */
+
+
+package org.hornetq.tests.integration.cluster.distribution;
+
+/**
+ * Runs the {@link SymmetricClusterWithBackupTest} scenarios with both the netty
+ * flag and the file-storage flag switched on.
+ *
+ * @author <a href="mailto:tim.fox@jboss.com">Tim Fox</a>
+ */
+public class NettyFileStorageSymmetricClusterWithBackupTest extends SymmetricClusterWithBackupTest
+{
+   /**
+    * @return {@code true}: the inherited scenarios are set up with the netty flag on
+    */
+   @Override
+   protected boolean isNetty()
+   {
+      return true;
+   }
+
+   /**
+    * @return {@code true}: the servers are set up with the file-storage flag on
+    */
+   @Override
+   protected boolean isFileStorage()
+   {
+      return true;
+   }
+
+}
Added:
trunk/tests/src/org/hornetq/tests/integration/cluster/distribution/NettySymmetricClusterWithBackupTest.java
===================================================================
---
trunk/tests/src/org/hornetq/tests/integration/cluster/distribution/NettySymmetricClusterWithBackupTest.java
(rev 0)
+++
trunk/tests/src/org/hornetq/tests/integration/cluster/distribution/NettySymmetricClusterWithBackupTest.java 2009-11-27
10:28:39 UTC (rev 8426)
@@ -0,0 +1,44 @@
+/*
+ * JBoss, Home of Professional Open Source
+ * Copyright 2005-2009, Red Hat Middleware LLC, and individual contributors
+ * by the @authors tag. See the copyright.txt in the distribution for a
+ * full listing of individual contributors.
+ *
+ * This is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU Lesser General Public License as
+ * published by the Free Software Foundation; either version 2.1 of
+ * the License, or (at your option) any later version.
+ *
+ * This software is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this software; if not, write to the Free
+ * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
+ * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
+ */
+
+
+package org.hornetq.tests.integration.cluster.distribution;
+
+/**
+ * Runs the {@link SymmetricClusterWithBackupTest} scenarios with the netty flag
+ * switched on and the file-storage flag switched off.
+ *
+ * @author <a href="mailto:tim.fox@jboss.com">Tim Fox</a>
+ */
+public class NettySymmetricClusterWithBackupTest extends SymmetricClusterWithBackupTest
+{
+   /**
+    * @return {@code true}: the inherited scenarios are set up with the netty flag on
+    */
+   @Override
+   protected boolean isNetty()
+   {
+      return true;
+   }
+
+   /**
+    * @return {@code false}: the servers are set up with the file-storage flag off
+    */
+   @Override
+   protected boolean isFileStorage()
+   {
+      return false;
+   }
+}
Added:
trunk/tests/src/org/hornetq/tests/integration/cluster/distribution/SymmetricClusterWithBackupTest.java
===================================================================
---
trunk/tests/src/org/hornetq/tests/integration/cluster/distribution/SymmetricClusterWithBackupTest.java
(rev 0)
+++
trunk/tests/src/org/hornetq/tests/integration/cluster/distribution/SymmetricClusterWithBackupTest.java 2009-11-27
10:28:39 UTC (rev 8426)
@@ -0,0 +1,603 @@
+/*
+ * JBoss, Home of Professional Open Source
+ * Copyright 2005-2009, Red Hat Middleware LLC, and individual contributors
+ * by the @authors tag. See the copyright.txt in the distribution for a
+ * full listing of individual contributors.
+ *
+ * This is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU Lesser General Public License as
+ * published by the Free Software Foundation; either version 2.1 of
+ * the License, or (at your option) any later version.
+ *
+ * This software is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this software; if not, write to the Free
+ * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
+ * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
+ */
+
+package org.hornetq.tests.integration.cluster.distribution;
+
+import org.hornetq.core.logging.Logger;
+
+/**
+ * Runs the symmetric-cluster scenarios with five live servers (0-4) that are set up together with five backup servers (5-9).
+ *
+ * @author <a href="mailto:tim.fox@jboss.com">Tim Fox</a>
+ *
+ * Created 13 Mar 2009 11:00:31
+ *
+ *
+ */
+public class SymmetricClusterWithBackupTest extends SymmetricClusterTest
+{
+ private static final Logger log =
Logger.getLogger(SymmetricClusterWithBackupTest.class);
+
+   /**
+    * Starts the cluster, puts "queue0" with one consumer on each of nodes 0-4,
+    * sends through node 0 and verifies round-robin receipt on consumers 0-4.
+    * Everything is then closed, all servers are stopped and started again, and
+    * the identical check is repeated. On any failure a thread dump is printed
+    * to standard out before the failure is rethrown.
+    */
+   public void testStopAllStartAll() throws Throwable
+   {
+      try
+      {
+         setupCluster();
+
+         // ---- first pass: freshly started cluster ----
+         startServers();
+
+         for (int node = 0; node <= 4; node++)
+         {
+            setupSessionFactory(node, isNetty());
+         }
+         for (int node = 0; node <= 4; node++)
+         {
+            createQueue(node, "queues.testaddress", "queue0", null, false);
+         }
+         for (int node = 0; node <= 4; node++)
+         {
+            addConsumer(node, node, "queue0", null);
+         }
+         for (int node = 0; node <= 4; node++)
+         {
+            waitForBindings(node, "queues.testaddress", 1, 1, true);
+         }
+         for (int node = 0; node <= 4; node++)
+         {
+            waitForBindings(node, "queues.testaddress", 4, 4, false);
+         }
+
+         System.out.println("waited for all bindings");
+
+         send(0, "queues.testaddress", 10, false, null);
+         verifyReceiveRoundRobinInSomeOrder(10, 0, 1, 2, 3, 4);
+         verifyNotReceive(0, 1, 2, 3, 4);
+
+         closeAllConsumers();
+         closeAllSessionFactories();
+
+         // ---- second pass: after a full stop and restart ----
+         stopServers();
+         startServers();
+
+         for (int node = 0; node <= 4; node++)
+         {
+            setupSessionFactory(node, isNetty());
+         }
+         for (int node = 0; node <= 4; node++)
+         {
+            createQueue(node, "queues.testaddress", "queue0", null, false);
+         }
+         for (int node = 0; node <= 4; node++)
+         {
+            addConsumer(node, node, "queue0", null);
+         }
+         for (int node = 0; node <= 4; node++)
+         {
+            waitForBindings(node, "queues.testaddress", 1, 1, true);
+         }
+         for (int node = 0; node <= 4; node++)
+         {
+            waitForBindings(node, "queues.testaddress", 4, 4, false);
+         }
+
+         send(0, "queues.testaddress", 10, false, null);
+         verifyReceiveRoundRobinInSomeOrder(10, 0, 1, 2, 3, 4);
+         verifyNotReceive(0, 1, 2, 3, 4);
+
+         closeAllConsumers();
+         closeAllSessionFactories();
+      }
+      catch (Throwable failure)
+      {
+         // Dump all threads first so a hang or timeout can be diagnosed from the test output.
+         System.out.println(threadDump("SymmetricClusterWithBackupTest::testStopAllStartAll"));
+         throw failure;
+      }
+   }
+
+   /**
+    * Starts the servers one backup/live pair at a time (5+0, 6+1, 7+2, 8+3, 9+4)
+    * and creates queues and consumers on each live node as soon as it is up, i.e.
+    * before the rest of the cluster has been started. Once all pairs are running,
+    * the remaining consumers are added, the expected binding counts are awaited and
+    * delivery is verified: consumers 0-14 each receive everything, while the
+    * consumers sharing queue15, queue16, queue17 and queue18 receive round-robin.
+    */
+   @Override
+   public void testMixtureLoadBalancedAndNonLoadBalancedQueuesAddQueuesAndConsumersBeforeAllServersAreStarted() throws Exception
+   {
+      final String address = "queues.testaddress";
+
+      // Expected binding counts per live node 0..4 once the whole cluster is up.
+      final int[] localBindings = { 5, 5, 5, 6, 7 };
+      final int[] remoteBindings = { 23, 23, 23, 22, 21 };
+
+      setupCluster();
+
+      // --- pair 5 + 0 ---
+      startServers(5, 0);
+      setupSessionFactory(0, isNetty());
+      for (String queue : new String[] { "queue0", "queue5", "queue10", "queue15", "queue17" })
+      {
+         createQueue(0, address, queue, null, false);
+      }
+      addConsumer(0, 0, "queue0", null);
+      addConsumer(5, 0, "queue5", null);
+
+      // --- pair 6 + 1 ---
+      startServers(6, 1);
+      setupSessionFactory(1, isNetty());
+      for (String queue : new String[] { "queue1", "queue6", "queue11", "queue15", "queue17" })
+      {
+         createQueue(1, address, queue, null, false);
+      }
+      addConsumer(1, 1, "queue1", null);
+      addConsumer(6, 1, "queue6", null);
+      addConsumer(11, 1, "queue11", null);
+      addConsumer(16, 1, "queue15", null);
+
+      // --- pair 7 + 2 ---
+      startServers(7, 2);
+      setupSessionFactory(2, isNetty());
+      for (String queue : new String[] { "queue2", "queue7", "queue12", "queue15", "queue16" })
+      {
+         createQueue(2, address, queue, null, false);
+      }
+      addConsumer(2, 2, "queue2", null);
+      addConsumer(7, 2, "queue7", null);
+      addConsumer(12, 2, "queue12", null);
+      addConsumer(17, 2, "queue15", null);
+
+      // --- pair 8 + 3 ---
+      startServers(8, 3);
+      setupSessionFactory(3, isNetty());
+      for (String queue : new String[] { "queue3", "queue8", "queue13", "queue15", "queue16", "queue18" })
+      {
+         createQueue(3, address, queue, null, false);
+      }
+      addConsumer(3, 3, "queue3", null);
+      addConsumer(8, 3, "queue8", null);
+      addConsumer(13, 3, "queue13", null);
+      addConsumer(18, 3, "queue15", null);
+
+      // --- pair 9 + 4 ---
+      startServers(9, 4);
+      setupSessionFactory(4, isNetty());
+      for (String queue : new String[] { "queue4", "queue9", "queue14", "queue15", "queue16", "queue17", "queue18" })
+      {
+         createQueue(4, address, queue, null, false);
+      }
+      addConsumer(4, 4, "queue4", null);
+      addConsumer(9, 4, "queue9", null);
+      addConsumer(10, 0, "queue10", null);
+      addConsumer(14, 4, "queue14", null);
+
+      // Remaining consumers on the queues that exist on several nodes.
+      addConsumer(15, 0, "queue15", null);
+      addConsumer(19, 4, "queue15", null);
+
+      addConsumer(20, 2, "queue16", null);
+      addConsumer(21, 3, "queue16", null);
+      addConsumer(22, 4, "queue16", null);
+
+      addConsumer(23, 0, "queue17", null);
+      addConsumer(24, 1, "queue17", null);
+      addConsumer(25, 4, "queue17", null);
+
+      addConsumer(26, 3, "queue18", null);
+      addConsumer(27, 4, "queue18", null);
+
+      for (int node = 0; node < localBindings.length; node++)
+      {
+         waitForBindings(node, address, localBindings[node], localBindings[node], true);
+      }
+      for (int node = 0; node < remoteBindings.length; node++)
+      {
+         waitForBindings(node, address, remoteBindings[node], remoteBindings[node], false);
+      }
+
+      send(0, address, 10, false, null);
+
+      verifyReceiveAll(10, 0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14);
+
+      verifyReceiveRoundRobinInSomeOrder(10, 15, 16, 17, 18, 19);
+      verifyReceiveRoundRobinInSomeOrder(10, 20, 21, 22);
+      verifyReceiveRoundRobinInSomeOrder(10, 23, 24, 25);
+      verifyReceiveRoundRobinInSomeOrder(10, 26, 27);
+
+      closeAllConsumers();
+
+      closeAllSessionFactories();
+   }
+
+   /**
+    * Builds the full queue/consumer layout on a running cluster and verifies
+    * delivery, then removes the consumers and session factories of nodes 0 and 3,
+    * stops servers 0, 3, 5 and 8, starts them again (5 and 8 first), recreates the
+    * queues and consumers on nodes 0 and 3 and verifies delivery a second time.
+    */
+   @Override
+   public void testStartStopServers() throws Exception
+   {
+      final String address = "queues.testaddress";
+
+      // Expected binding counts per live node 0..4.
+      final int[] localBindings = { 5, 5, 5, 6, 7 };
+      final int[] remoteBindings = { 23, 23, 23, 22, 21 };
+
+      // The two live nodes that are stopped and restarted.
+      final int[] restartedNodes = { 0, 3 };
+
+      setupCluster();
+
+      startServers();
+
+      log.info("setup session factories: ");
+
+      for (int node = 0; node < 5; node++)
+      {
+         setupSessionFactory(node, isNetty());
+      }
+
+      // queue0..queue14: queue N lives on node N % 5.
+      for (int index = 0; index < 15; index++)
+      {
+         createQueue(index % 5, address, "queue" + index, null, false);
+      }
+
+      // Queues that exist on several nodes.
+      for (int node = 0; node < 5; node++)
+      {
+         createQueue(node, address, "queue15", null, false);
+      }
+      for (int node = 2; node <= 4; node++)
+      {
+         createQueue(node, address, "queue16", null, false);
+      }
+      createQueue(0, address, "queue17", null, false);
+      createQueue(1, address, "queue17", null, false);
+      createQueue(4, address, "queue17", null, false);
+
+      createQueue(3, address, "queue18", null, false);
+      createQueue(4, address, "queue18", null, false);
+
+      // Consumers 0..14 mirror queue0..queue14.
+      for (int index = 0; index < 15; index++)
+      {
+         addConsumer(index, index % 5, "queue" + index, null);
+      }
+
+      // Consumers 15..19 on queue15, 20..22 on queue16.
+      for (int node = 0; node < 5; node++)
+      {
+         addConsumer(15 + node, node, "queue15", null);
+      }
+      for (int node = 2; node <= 4; node++)
+      {
+         addConsumer(18 + node, node, "queue16", null);
+      }
+
+      addConsumer(23, 0, "queue17", null);
+      addConsumer(24, 1, "queue17", null);
+      addConsumer(25, 4, "queue17", null);
+
+      addConsumer(26, 3, "queue18", null);
+      addConsumer(27, 4, "queue18", null);
+
+      log.info("wait for bindings...");
+
+      for (int node = 0; node < 5; node++)
+      {
+         waitForBindings(node, address, localBindings[node], localBindings[node], true);
+      }
+      for (int node = 0; node < 5; node++)
+      {
+         waitForBindings(node, address, remoteBindings[node], remoteBindings[node], false);
+      }
+
+      log.info("send and receive messages");
+
+      send(0, address, 10, false, null);
+
+      verifyReceiveAll(10, 0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14);
+
+      verifyReceiveRoundRobinInSomeOrder(10, 15, 16, 17, 18, 19);
+      verifyReceiveRoundRobinInSomeOrder(10, 20, 21, 22);
+      verifyReceiveRoundRobinInSomeOrder(10, 23, 24, 25);
+      verifyReceiveRoundRobinInSomeOrder(10, 26, 27);
+
+      // Drop every consumer attached to node 0, then every consumer attached to node 3.
+      for (int consumer : new int[] { 0, 5, 10, 15, 23, 3, 8, 13, 18, 21, 26 })
+      {
+         removeConsumer(consumer);
+      }
+
+      closeSessionFactory(0);
+      closeSessionFactory(3);
+
+      stopServers(0, 3, 5, 8);
+
+      startServers(5, 8, 0, 3);
+
+      Thread.sleep(2000);
+
+      for (int node : restartedNodes)
+      {
+         setupSessionFactory(node, isNetty());
+      }
+
+      // Recreate queue0/3, queue5/8 and queue10/13 on the restarted nodes.
+      for (int offset = 0; offset <= 10; offset += 5)
+      {
+         for (int node : restartedNodes)
+         {
+            createQueue(node, address, "queue" + (offset + node), null, false);
+         }
+      }
+
+      createQueue(0, address, "queue15", null, false);
+      createQueue(3, address, "queue15", null, false);
+
+      createQueue(3, address, "queue16", null, false);
+
+      createQueue(0, address, "queue17", null, false);
+
+      createQueue(3, address, "queue18", null, false);
+
+      // Re-attach the consumers that were removed above.
+      for (int offset = 0; offset <= 10; offset += 5)
+      {
+         for (int node : restartedNodes)
+         {
+            addConsumer(offset + node, node, "queue" + (offset + node), null);
+         }
+      }
+
+      addConsumer(15, 0, "queue15", null);
+      addConsumer(18, 3, "queue15", null);
+
+      addConsumer(21, 3, "queue16", null);
+
+      addConsumer(23, 0, "queue17", null);
+
+      addConsumer(26, 3, "queue18", null);
+
+      for (int node = 0; node < 5; node++)
+      {
+         waitForBindings(node, address, localBindings[node], localBindings[node], true);
+      }
+      for (int node = 0; node < 5; node++)
+      {
+         waitForBindings(node, address, remoteBindings[node], remoteBindings[node], false);
+      }
+
+      send(0, address, 10, false, null);
+
+      verifyReceiveAll(10, 0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14);
+
+      verifyReceiveRoundRobinInSomeOrder(10, 15, 16, 17, 18, 19);
+      verifyReceiveRoundRobinInSomeOrder(10, 20, 21, 22);
+      verifyReceiveRoundRobinInSomeOrder(10, 23, 24, 25);
+      verifyReceiveRoundRobinInSomeOrder(10, 26, 27);
+
+      closeAllConsumers();
+
+      closeAllSessionFactories();
+   }
+
+   /**
+    * Declares one cluster connection per server: first for the live nodes 0-4,
+    * then for the backup nodes 5-9. Node N and node N + 5 share the connection
+    * name "clusterN". Each connection is given the four other live nodes together
+    * with the backup ids of those four nodes (live id + 5).
+    *
+    * @param forwardWhenNoConsumers value passed through to every cluster connection
+    */
+   @Override
+   protected void setupCluster(final boolean forwardWhenNoConsumers) throws Exception
+   {
+      final int pairCount = 5;
+
+      // Nodes 0..4 are the lives, 5..9 the backups; iterating 0..9 keeps that order.
+      for (int node = 0; node < 2 * pairCount; node++)
+      {
+         final int live = node % pairCount;
+
+         final int[] otherLives = new int[pairCount - 1];
+         final int[] otherBackups = new int[pairCount - 1];
+         int slot = 0;
+         for (int peer = 0; peer < pairCount; peer++)
+         {
+            if (peer == live)
+            {
+               continue;
+            }
+            otherLives[slot] = peer;
+            otherBackups[slot] = peer + pairCount;
+            slot++;
+         }
+
+         setupClusterConnectionWithBackups("cluster" + live,
+                                           "queues",
+                                           forwardWhenNoConsumers,
+                                           1,
+                                           isNetty(),
+                                           node,
+                                           otherLives,
+                                           otherBackups);
+      }
+   }
+
+   /**
+    * Configures the ten servers: 5-9 as backups first, then 0-4 as lives.
+    * Each live N is set up with the int N + 5 as its last argument.
+    */
+   @Override
+   protected void setupServers() throws Exception
+   {
+      // The backups
+      for (int backup = 5; backup <= 9; backup++)
+      {
+         setupServer(backup, isFileStorage(), isNetty(), true);
+      }
+
+      // The lives
+      for (int live = 0; live <= 4; live++)
+      {
+         setupServer(live, isFileStorage(), isNetty(), live + 5);
+      }
+   }
+
+   /**
+    * Marks servers 5-9 as backups again and then starts all ten servers,
+    * 5-9 before 0-4.
+    */
+   @Override
+   protected void startServers() throws Exception
+   {
+      // The backup flag has to be set again here: when a backup is restarted after
+      // it has failed over, its backup flag will have been set to false.
+      for (int backup = 5; backup <= 9; backup++)
+      {
+         getServer(backup).getConfiguration().setBackup(true);
+      }
+
+      startServers(5, 6, 7, 8, 9, 0, 1, 2, 3, 4);
+   }
+
+ /** Closes all consumers and session factories, then stops servers 0 to 9. */
+ @Override
+ protected void stopServers() throws Exception
+ {
+ // Close the consumers and session factories before the servers are stopped.
+ closeAllConsumers();
+
+ closeAllSessionFactories();
+
+ // Servers 0-4 are listed ahead of servers 5-9.
+ stopServers(0, 1, 2, 3, 4, 5, 6, 7, 8, 9);
+ }
+
+}