[hornetq-commits] JBoss hornetq SVN: r8426 - in trunk: tests/src/org/hornetq/tests/integration/cluster/distribution and 1 other directory.

do-not-reply at jboss.org do-not-reply at jboss.org
Fri Nov 27 05:28:40 EST 2009


Author: jmesnil
Date: 2009-11-27 05:28:39 -0500 (Fri, 27 Nov 2009)
New Revision: 8426

Added:
   trunk/tests/src/org/hornetq/tests/integration/cluster/distribution/ClusterWithBackupTest.java
   trunk/tests/src/org/hornetq/tests/integration/cluster/distribution/NettyFileStorageSymmetricClusterWithBackupTest.java
   trunk/tests/src/org/hornetq/tests/integration/cluster/distribution/NettySymmetricClusterWithBackupTest.java
   trunk/tests/src/org/hornetq/tests/integration/cluster/distribution/SymmetricClusterWithBackupTest.java
Modified:
   trunk/build-hornetq.xml
Log:
readded cluster with backup tests but excluded them from integratio-tests ant target

Modified: trunk/build-hornetq.xml
===================================================================
--- trunk/build-hornetq.xml	2009-11-27 10:15:37 UTC (rev 8425)
+++ trunk/build-hornetq.xml	2009-11-27 10:28:39 UTC (rev 8426)
@@ -1220,6 +1220,8 @@
             <formatter type="plain" usefile="${junit.formatter.usefile}"/>
             <fileset dir="${test.classes.dir}">
                <!-- <exclude name="**/integration/http/*" /> -->
+            	<!-- cluster tests with backup are brittled and are temporarily excluded -->
+               <exclude name="**/cluster/distribution/*WithBackupTest" />
                <include name="${tests.param}"/>
             </fileset>
          </batchtest>

Added: trunk/tests/src/org/hornetq/tests/integration/cluster/distribution/ClusterWithBackupTest.java
===================================================================
--- trunk/tests/src/org/hornetq/tests/integration/cluster/distribution/ClusterWithBackupTest.java	                        (rev 0)
+++ trunk/tests/src/org/hornetq/tests/integration/cluster/distribution/ClusterWithBackupTest.java	2009-11-27 10:28:39 UTC (rev 8426)
@@ -0,0 +1,144 @@
+/*
+ * JBoss, Home of Professional Open Source
+ * Copyright 2005-2009, Red Hat Middleware LLC, and individual contributors
+ * by the @authors tag. See the copyright.txt in the distribution for a
+ * full listing of individual contributors.
+ *
+ * This is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU Lesser General Public License as
+ * published by the Free Software Foundation; either version 2.1 of
+ * the License, or (at your option) any later version.
+ *
+ * This software is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this software; if not, write to the Free
+ * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
+ * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
+ */
+
+package org.hornetq.tests.integration.cluster.distribution;
+
+import org.hornetq.core.logging.Logger;
+
+/**
+ * 
+ * A ClusterWithBackupTest
+ *
+ * @author <a href="mailto:tim.fox at jboss.com">Tim Fox</a>
+ * 
+ * Created 9 Mar 2009 16:31:21
+ *
+ *
+ */
+public class ClusterWithBackupTest extends ClusterTestBase
+{
+   private static final Logger log = Logger.getLogger(ClusterWithBackupTest.class);
+
+   @Override
+   protected void setUp() throws Exception
+   {
+      super.setUp();
+
+      setupServers();
+   }
+
+   @Override
+   protected void tearDown() throws Exception
+   {
+      stopServers();
+
+      super.tearDown();
+   }
+
+   protected boolean isNetty()
+   {
+      return false;
+   }
+
+   protected boolean isFileStorage()
+   {
+      return false;
+   }
+   
+   public void testBasicRoundRobin() throws Exception
+   {
+      setupCluster();
+
+      startServers(0, 1, 2, 3, 4, 5);
+      
+      setupSessionFactory(3, isNetty());
+      setupSessionFactory(4, isNetty());
+      setupSessionFactory(5, isNetty());
+      
+      createQueue(3, "queues.testaddress", "queue0", null, false);
+      createQueue(4, "queues.testaddress", "queue0", null, false);
+      createQueue(5, "queues.testaddress", "queue0", null, false);
+      
+      addConsumer(0, 3, "queue0", null);
+      addConsumer(1, 4, "queue0", null);
+      addConsumer(2, 5, "queue0", null);
+      
+      waitForBindings(3, "queues.testaddress", 1, 1, true);
+      waitForBindings(4, "queues.testaddress", 1, 1, true);
+      waitForBindings(5, "queues.testaddress", 1, 1, true);
+
+      waitForBindings(3, "queues.testaddress", 2, 2, false);
+      waitForBindings(4, "queues.testaddress", 2, 2, false);
+      waitForBindings(5, "queues.testaddress", 2, 2, false);
+
+      send(3, "queues.testaddress", 100, false, null);
+
+      verifyReceiveRoundRobinInSomeOrder(100, 0, 1, 2);
+
+      verifyNotReceive(0, 0, 1, 2);
+   }
+   
+   protected void setupCluster() throws Exception
+   {
+      setupCluster(false);
+   }
+
+   protected void setupCluster(final boolean forwardWhenNoConsumers) throws Exception
+   {
+      setupClusterConnection("cluster0", "queues", forwardWhenNoConsumers, 1, isNetty(), 3, 4, 5);
+
+      setupClusterConnection("cluster1", "queues", forwardWhenNoConsumers, 1, isNetty(), 4, 3, 5);
+
+      setupClusterConnection("cluster2", "queues", forwardWhenNoConsumers, 1, isNetty(), 5, 3, 4);
+      
+      
+      setupClusterConnection("cluster0", "queues", forwardWhenNoConsumers, 1, isNetty(), 0, 4, 5);
+
+      setupClusterConnection("cluster1", "queues", forwardWhenNoConsumers, 1, isNetty(), 1, 3, 5);
+
+      setupClusterConnection("cluster2", "queues", forwardWhenNoConsumers, 1, isNetty(), 2, 3, 4);
+   }
+
+   protected void setupServers() throws Exception
+   {
+      //The backups
+      setupServer(0, isFileStorage(), isNetty(), true);
+      setupServer(1, isFileStorage(), isNetty(), true);
+      setupServer(2, isFileStorage(), isNetty(), true);
+      
+      //The lives
+      setupServer(3, isFileStorage(), isNetty(), 0);
+      setupServer(4, isFileStorage(), isNetty(), 1);
+      setupServer(5, isFileStorage(), isNetty(), 2);      
+      
+   }
+
+   protected void stopServers() throws Exception
+   {
+      closeAllConsumers();
+
+      closeAllSessionFactories();
+
+      stopServers(0, 1, 2, 3, 4, 5);
+   }
+
+}

Added: trunk/tests/src/org/hornetq/tests/integration/cluster/distribution/NettyFileStorageSymmetricClusterWithBackupTest.java
===================================================================
--- trunk/tests/src/org/hornetq/tests/integration/cluster/distribution/NettyFileStorageSymmetricClusterWithBackupTest.java	                        (rev 0)
+++ trunk/tests/src/org/hornetq/tests/integration/cluster/distribution/NettyFileStorageSymmetricClusterWithBackupTest.java	2009-11-27 10:28:39 UTC (rev 8426)
@@ -0,0 +1,45 @@
+/*
+ * JBoss, Home of Professional Open Source
+ * Copyright 2005-2009, Red Hat Middleware LLC, and individual contributors
+ * by the @authors tag. See the copyright.txt in the distribution for a
+ * full listing of individual contributors.
+ *
+ * This is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU Lesser General Public License as
+ * published by the Free Software Foundation; either version 2.1 of
+ * the License, or (at your option) any later version.
+ *
+ * This software is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this software; if not, write to the Free
+ * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
+ * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
+ */
+
+
+package org.hornetq.tests.integration.cluster.distribution;
+
+/**
+ * A NettyFileStorageSymmetricClusterWithBackupTest
+ *
+ * @author <a href="mailto:tim.fox at jboss.com">Tim Fox</a>
+ *
+ *
+ */
+public class NettyFileStorageSymmetricClusterWithBackupTest extends SymmetricClusterWithBackupTest
+{
+   protected boolean isNetty()
+   {
+      return true;
+   }
+   
+   protected boolean isFileStorage()
+   {
+      return true;
+   }
+  
+}

Added: trunk/tests/src/org/hornetq/tests/integration/cluster/distribution/NettySymmetricClusterWithBackupTest.java
===================================================================
--- trunk/tests/src/org/hornetq/tests/integration/cluster/distribution/NettySymmetricClusterWithBackupTest.java	                        (rev 0)
+++ trunk/tests/src/org/hornetq/tests/integration/cluster/distribution/NettySymmetricClusterWithBackupTest.java	2009-11-27 10:28:39 UTC (rev 8426)
@@ -0,0 +1,44 @@
+/*
+ * JBoss, Home of Professional Open Source
+ * Copyright 2005-2009, Red Hat Middleware LLC, and individual contributors
+ * by the @authors tag. See the copyright.txt in the distribution for a
+ * full listing of individual contributors.
+ *
+ * This is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU Lesser General Public License as
+ * published by the Free Software Foundation; either version 2.1 of
+ * the License, or (at your option) any later version.
+ *
+ * This software is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this software; if not, write to the Free
+ * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
+ * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
+ */
+
+
+package org.hornetq.tests.integration.cluster.distribution;
+
+/**
+ * A NettySymmetricClusterWithBackupTest
+ *
+ * @author <a href="mailto:tim.fox at jboss.com">Tim Fox</a>
+ *
+ *
+ */
+public class NettySymmetricClusterWithBackupTest extends SymmetricClusterWithBackupTest
+{
+   protected boolean isNetty()
+   {
+      return true;
+   }
+   
+   protected boolean isFileStorage()
+   {
+      return false;
+   }
+}

Added: trunk/tests/src/org/hornetq/tests/integration/cluster/distribution/SymmetricClusterWithBackupTest.java
===================================================================
--- trunk/tests/src/org/hornetq/tests/integration/cluster/distribution/SymmetricClusterWithBackupTest.java	                        (rev 0)
+++ trunk/tests/src/org/hornetq/tests/integration/cluster/distribution/SymmetricClusterWithBackupTest.java	2009-11-27 10:28:39 UTC (rev 8426)
@@ -0,0 +1,603 @@
+/*
+ * JBoss, Home of Professional Open Source
+ * Copyright 2005-2009, Red Hat Middleware LLC, and individual contributors
+ * by the @authors tag. See the copyright.txt in the distribution for a
+ * full listing of individual contributors.
+ *
+ * This is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU Lesser General Public License as
+ * published by the Free Software Foundation; either version 2.1 of
+ * the License, or (at your option) any later version.
+ *
+ * This software is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this software; if not, write to the Free
+ * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
+ * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
+ */
+
+package org.hornetq.tests.integration.cluster.distribution;
+
+import org.hornetq.core.logging.Logger;
+
+/**
+ * A SymmetricClusterWithBackupTest
+ *
+ * @author <a href="mailto:tim.fox at jboss.com">Tim Fox</a>
+ * 
+ * Created 13 Mar 2009 11:00:31
+ *
+ *
+ */
+public class SymmetricClusterWithBackupTest extends SymmetricClusterTest
+{
+   private static final Logger log = Logger.getLogger(SymmetricClusterWithBackupTest.class);
+
+   public void testStopAllStartAll() throws Throwable
+   {
+      try
+      {
+         setupCluster();
+   
+         startServers();
+   
+         setupSessionFactory(0, isNetty());
+         setupSessionFactory(1, isNetty());
+         setupSessionFactory(2, isNetty());
+         setupSessionFactory(3, isNetty());
+         setupSessionFactory(4, isNetty());
+   
+         createQueue(0, "queues.testaddress", "queue0", null, false);
+         createQueue(1, "queues.testaddress", "queue0", null, false);
+         createQueue(2, "queues.testaddress", "queue0", null, false);
+         createQueue(3, "queues.testaddress", "queue0", null, false);
+         createQueue(4, "queues.testaddress", "queue0", null, false);
+   
+         addConsumer(0, 0, "queue0", null);
+         addConsumer(1, 1, "queue0", null);
+         addConsumer(2, 2, "queue0", null);
+         addConsumer(3, 3, "queue0", null);
+         addConsumer(4, 4, "queue0", null);
+   
+         waitForBindings(0, "queues.testaddress", 1, 1, true);
+         waitForBindings(1, "queues.testaddress", 1, 1, true);
+         waitForBindings(2, "queues.testaddress", 1, 1, true);
+         waitForBindings(3, "queues.testaddress", 1, 1, true);
+         waitForBindings(4, "queues.testaddress", 1, 1, true);
+   
+         waitForBindings(0, "queues.testaddress", 4, 4, false);
+         waitForBindings(1, "queues.testaddress", 4, 4, false);
+         waitForBindings(2, "queues.testaddress", 4, 4, false);
+         waitForBindings(3, "queues.testaddress", 4, 4, false);
+         waitForBindings(4, "queues.testaddress", 4, 4, false);
+   
+         System.out.println("waited for all bindings");
+
+         send(0, "queues.testaddress", 10, false, null);
+   
+         verifyReceiveRoundRobinInSomeOrder(10, 0, 1, 2, 3, 4);
+   
+         verifyNotReceive(0, 1, 2, 3, 4);
+   
+         closeAllConsumers();
+         
+         closeAllSessionFactories();
+
+         stopServers();
+   
+         startServers();
+   
+         setupSessionFactory(0, isNetty());
+         setupSessionFactory(1, isNetty());
+         setupSessionFactory(2, isNetty());
+         setupSessionFactory(3, isNetty());
+         setupSessionFactory(4, isNetty());
+   
+         createQueue(0, "queues.testaddress", "queue0", null, false);
+         createQueue(1, "queues.testaddress", "queue0", null, false);
+         createQueue(2, "queues.testaddress", "queue0", null, false);
+         createQueue(3, "queues.testaddress", "queue0", null, false);
+         createQueue(4, "queues.testaddress", "queue0", null, false);
+   
+         addConsumer(0, 0, "queue0", null);
+         addConsumer(1, 1, "queue0", null);
+         addConsumer(2, 2, "queue0", null);
+         addConsumer(3, 3, "queue0", null);
+         addConsumer(4, 4, "queue0", null);
+   
+         waitForBindings(0, "queues.testaddress", 1, 1, true);
+         waitForBindings(1, "queues.testaddress", 1, 1, true);
+         waitForBindings(2, "queues.testaddress", 1, 1, true);
+         waitForBindings(3, "queues.testaddress", 1, 1, true);
+         waitForBindings(4, "queues.testaddress", 1, 1, true);
+   
+         waitForBindings(0, "queues.testaddress", 4, 4, false);
+         waitForBindings(1, "queues.testaddress", 4, 4, false);
+         waitForBindings(2, "queues.testaddress", 4, 4, false);
+         waitForBindings(3, "queues.testaddress", 4, 4, false);
+         waitForBindings(4, "queues.testaddress", 4, 4, false);
+   
+         send(0, "queues.testaddress", 10, false, null);
+   
+         verifyReceiveRoundRobinInSomeOrder(10, 0, 1, 2, 3, 4);
+   
+         this.verifyNotReceive(0, 1, 2, 3, 4);
+         
+         
+         closeAllConsumers();
+
+         closeAllSessionFactories();
+      }
+      catch (Throwable e)
+      {
+         System.out.println(threadDump("SymmetricClusterWithBackupTest::testStopAllStartAll"));
+         throw e;
+      }
+   }
+
+   @Override
+   public void testMixtureLoadBalancedAndNonLoadBalancedQueuesAddQueuesAndConsumersBeforeAllServersAreStarted() throws Exception
+   {
+      setupCluster();
+
+      startServers(5, 0);
+
+      setupSessionFactory(0, isNetty());
+
+      createQueue(0, "queues.testaddress", "queue0", null, false);
+      createQueue(0, "queues.testaddress", "queue5", null, false);
+      createQueue(0, "queues.testaddress", "queue10", null, false);
+      createQueue(0, "queues.testaddress", "queue15", null, false);
+      createQueue(0, "queues.testaddress", "queue17", null, false);
+
+      addConsumer(0, 0, "queue0", null);
+      addConsumer(5, 0, "queue5", null);
+
+      startServers(6, 1);
+      setupSessionFactory(1, isNetty());
+
+      createQueue(1, "queues.testaddress", "queue1", null, false);
+      createQueue(1, "queues.testaddress", "queue6", null, false);
+      createQueue(1, "queues.testaddress", "queue11", null, false);
+      createQueue(1, "queues.testaddress", "queue15", null, false);
+      createQueue(1, "queues.testaddress", "queue17", null, false);
+
+      addConsumer(1, 1, "queue1", null);
+      addConsumer(6, 1, "queue6", null);
+      addConsumer(11, 1, "queue11", null);
+      addConsumer(16, 1, "queue15", null);
+
+      startServers(7, 2);
+      setupSessionFactory(2, isNetty());
+
+      createQueue(2, "queues.testaddress", "queue2", null, false);
+      createQueue(2, "queues.testaddress", "queue7", null, false);
+      createQueue(2, "queues.testaddress", "queue12", null, false);
+      createQueue(2, "queues.testaddress", "queue15", null, false);
+      createQueue(2, "queues.testaddress", "queue16", null, false);
+
+      addConsumer(2, 2, "queue2", null);
+      addConsumer(7, 2, "queue7", null);
+      addConsumer(12, 2, "queue12", null);
+      addConsumer(17, 2, "queue15", null);
+
+      startServers(8, 3);
+      setupSessionFactory(3, isNetty());
+
+      createQueue(3, "queues.testaddress", "queue3", null, false);
+      createQueue(3, "queues.testaddress", "queue8", null, false);
+      createQueue(3, "queues.testaddress", "queue13", null, false);
+      createQueue(3, "queues.testaddress", "queue15", null, false);
+      createQueue(3, "queues.testaddress", "queue16", null, false);
+      createQueue(3, "queues.testaddress", "queue18", null, false);
+
+      addConsumer(3, 3, "queue3", null);
+      addConsumer(8, 3, "queue8", null);
+      addConsumer(13, 3, "queue13", null);
+      addConsumer(18, 3, "queue15", null);
+
+      startServers(9, 4);
+      setupSessionFactory(4, isNetty());
+
+      createQueue(4, "queues.testaddress", "queue4", null, false);
+      createQueue(4, "queues.testaddress", "queue9", null, false);
+      createQueue(4, "queues.testaddress", "queue14", null, false);
+      createQueue(4, "queues.testaddress", "queue15", null, false);
+      createQueue(4, "queues.testaddress", "queue16", null, false);
+      createQueue(4, "queues.testaddress", "queue17", null, false);
+      createQueue(4, "queues.testaddress", "queue18", null, false);
+
+      addConsumer(4, 4, "queue4", null);
+      addConsumer(9, 4, "queue9", null);
+      addConsumer(10, 0, "queue10", null);
+      addConsumer(14, 4, "queue14", null);
+
+      addConsumer(15, 0, "queue15", null);
+      addConsumer(19, 4, "queue15", null);
+
+      addConsumer(20, 2, "queue16", null);
+      addConsumer(21, 3, "queue16", null);
+      addConsumer(22, 4, "queue16", null);
+
+      addConsumer(23, 0, "queue17", null);
+      addConsumer(24, 1, "queue17", null);
+      addConsumer(25, 4, "queue17", null);
+
+      addConsumer(26, 3, "queue18", null);
+      addConsumer(27, 4, "queue18", null);
+
+      waitForBindings(0, "queues.testaddress", 5, 5, true);
+      waitForBindings(1, "queues.testaddress", 5, 5, true);
+      waitForBindings(2, "queues.testaddress", 5, 5, true);
+      waitForBindings(3, "queues.testaddress", 6, 6, true);
+      waitForBindings(4, "queues.testaddress", 7, 7, true);
+
+      waitForBindings(0, "queues.testaddress", 23, 23, false);
+      waitForBindings(1, "queues.testaddress", 23, 23, false);
+      waitForBindings(2, "queues.testaddress", 23, 23, false);
+      waitForBindings(3, "queues.testaddress", 22, 22, false);
+      waitForBindings(4, "queues.testaddress", 21, 21, false);
+
+      send(0, "queues.testaddress", 10, false, null);
+
+      verifyReceiveAll(10, 0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14);
+
+      verifyReceiveRoundRobinInSomeOrder(10, 15, 16, 17, 18, 19);
+
+      verifyReceiveRoundRobinInSomeOrder(10, 20, 21, 22);
+
+      verifyReceiveRoundRobinInSomeOrder(10, 23, 24, 25);
+
+      verifyReceiveRoundRobinInSomeOrder(10, 26, 27);
+      
+      closeAllConsumers();
+
+      closeAllSessionFactories();
+   }
+   
+   @Override
+   public void testStartStopServers() throws Exception
+   {
+      setupCluster();
+
+      startServers();
+
+      log.info("setup session factories: ");
+
+      setupSessionFactory(0, isNetty());
+      setupSessionFactory(1, isNetty());
+      setupSessionFactory(2, isNetty());
+      setupSessionFactory(3, isNetty());
+      setupSessionFactory(4, isNetty());
+
+      createQueue(0, "queues.testaddress", "queue0", null, false);
+      createQueue(1, "queues.testaddress", "queue1", null, false);
+      createQueue(2, "queues.testaddress", "queue2", null, false);
+      createQueue(3, "queues.testaddress", "queue3", null, false);
+      createQueue(4, "queues.testaddress", "queue4", null, false);
+
+      createQueue(0, "queues.testaddress", "queue5", null, false);
+      createQueue(1, "queues.testaddress", "queue6", null, false);
+      createQueue(2, "queues.testaddress", "queue7", null, false);
+      createQueue(3, "queues.testaddress", "queue8", null, false);
+      createQueue(4, "queues.testaddress", "queue9", null, false);
+
+      createQueue(0, "queues.testaddress", "queue10", null, false);
+      createQueue(1, "queues.testaddress", "queue11", null, false);
+      createQueue(2, "queues.testaddress", "queue12", null, false);
+      createQueue(3, "queues.testaddress", "queue13", null, false);
+      createQueue(4, "queues.testaddress", "queue14", null, false);
+
+      createQueue(0, "queues.testaddress", "queue15", null, false);
+      createQueue(1, "queues.testaddress", "queue15", null, false);
+      createQueue(2, "queues.testaddress", "queue15", null, false);
+      createQueue(3, "queues.testaddress", "queue15", null, false);
+      createQueue(4, "queues.testaddress", "queue15", null, false);
+
+      createQueue(2, "queues.testaddress", "queue16", null, false);
+      createQueue(3, "queues.testaddress", "queue16", null, false);
+      createQueue(4, "queues.testaddress", "queue16", null, false);
+
+      createQueue(0, "queues.testaddress", "queue17", null, false);
+      createQueue(1, "queues.testaddress", "queue17", null, false);
+      createQueue(4, "queues.testaddress", "queue17", null, false);
+
+      createQueue(3, "queues.testaddress", "queue18", null, false);
+      createQueue(4, "queues.testaddress", "queue18", null, false);
+
+      addConsumer(0, 0, "queue0", null);
+      addConsumer(1, 1, "queue1", null);
+      addConsumer(2, 2, "queue2", null);
+      addConsumer(3, 3, "queue3", null);
+      addConsumer(4, 4, "queue4", null);
+
+      addConsumer(5, 0, "queue5", null);
+      addConsumer(6, 1, "queue6", null);
+      addConsumer(7, 2, "queue7", null);
+      addConsumer(8, 3, "queue8", null);
+      addConsumer(9, 4, "queue9", null);
+
+      addConsumer(10, 0, "queue10", null);
+      addConsumer(11, 1, "queue11", null);
+      addConsumer(12, 2, "queue12", null);
+      addConsumer(13, 3, "queue13", null);
+      addConsumer(14, 4, "queue14", null);
+
+      addConsumer(15, 0, "queue15", null);
+      addConsumer(16, 1, "queue15", null);
+      addConsumer(17, 2, "queue15", null);
+      addConsumer(18, 3, "queue15", null);
+      addConsumer(19, 4, "queue15", null);
+
+      addConsumer(20, 2, "queue16", null);
+      addConsumer(21, 3, "queue16", null);
+      addConsumer(22, 4, "queue16", null);
+
+      addConsumer(23, 0, "queue17", null);
+      addConsumer(24, 1, "queue17", null);
+      addConsumer(25, 4, "queue17", null);
+
+      addConsumer(26, 3, "queue18", null);
+      addConsumer(27, 4, "queue18", null);
+
+      log.info("wait for bindings...");
+      
+      waitForBindings(0, "queues.testaddress", 5, 5, true);
+      waitForBindings(1, "queues.testaddress", 5, 5, true);
+      waitForBindings(2, "queues.testaddress", 5, 5, true);
+      waitForBindings(3, "queues.testaddress", 6, 6, true);
+      waitForBindings(4, "queues.testaddress", 7, 7, true);
+
+      waitForBindings(0, "queues.testaddress", 23, 23, false);
+      waitForBindings(1, "queues.testaddress", 23, 23, false);
+      waitForBindings(2, "queues.testaddress", 23, 23, false);
+      waitForBindings(3, "queues.testaddress", 22, 22, false);
+      waitForBindings(4, "queues.testaddress", 21, 21, false);
+
+      log.info("send and receive messages");
+      
+      send(0, "queues.testaddress", 10, false, null);
+
+      verifyReceiveAll(10, 0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14);
+
+      verifyReceiveRoundRobinInSomeOrder(10, 15, 16, 17, 18, 19);
+
+      verifyReceiveRoundRobinInSomeOrder(10, 20, 21, 22);
+
+      verifyReceiveRoundRobinInSomeOrder(10, 23, 24, 25);
+
+      verifyReceiveRoundRobinInSomeOrder(10, 26, 27);
+
+      removeConsumer(0);
+      removeConsumer(5);
+      removeConsumer(10);
+      removeConsumer(15);
+      removeConsumer(23);
+      removeConsumer(3);
+      removeConsumer(8);
+      removeConsumer(13);
+      removeConsumer(18);
+      removeConsumer(21);
+      removeConsumer(26);
+
+      closeSessionFactory(0);
+      closeSessionFactory(3);
+
+      stopServers(0, 3, 5, 8);
+
+      startServers(5, 8, 0, 3);
+
+      Thread.sleep(2000);
+
+      setupSessionFactory(0, isNetty());
+      setupSessionFactory(3, isNetty());
+
+      createQueue(0, "queues.testaddress", "queue0", null, false);
+      createQueue(3, "queues.testaddress", "queue3", null, false);
+
+      createQueue(0, "queues.testaddress", "queue5", null, false);
+      createQueue(3, "queues.testaddress", "queue8", null, false);
+
+      createQueue(0, "queues.testaddress", "queue10", null, false);
+      createQueue(3, "queues.testaddress", "queue13", null, false);
+
+      createQueue(0, "queues.testaddress", "queue15", null, false);
+      createQueue(3, "queues.testaddress", "queue15", null, false);
+
+      createQueue(3, "queues.testaddress", "queue16", null, false);
+
+      createQueue(0, "queues.testaddress", "queue17", null, false);
+
+      createQueue(3, "queues.testaddress", "queue18", null, false);
+
+      addConsumer(0, 0, "queue0", null);
+      addConsumer(3, 3, "queue3", null);
+
+      addConsumer(5, 0, "queue5", null);
+      addConsumer(8, 3, "queue8", null);
+
+      addConsumer(10, 0, "queue10", null);
+      addConsumer(13, 3, "queue13", null);
+
+      addConsumer(15, 0, "queue15", null);
+      addConsumer(18, 3, "queue15", null);
+
+      addConsumer(21, 3, "queue16", null);
+
+      addConsumer(23, 0, "queue17", null);
+
+      addConsumer(26, 3, "queue18", null);
+
+      waitForBindings(0, "queues.testaddress", 5, 5, true);
+      waitForBindings(1, "queues.testaddress", 5, 5, true);
+      waitForBindings(2, "queues.testaddress", 5, 5, true);
+      waitForBindings(3, "queues.testaddress", 6, 6, true);
+      waitForBindings(4, "queues.testaddress", 7, 7, true);
+
+      waitForBindings(0, "queues.testaddress", 23, 23, false);
+      waitForBindings(1, "queues.testaddress", 23, 23, false);
+      waitForBindings(2, "queues.testaddress", 23, 23, false);
+      waitForBindings(3, "queues.testaddress", 22, 22, false);
+      waitForBindings(4, "queues.testaddress", 21, 21, false);
+
+      send(0, "queues.testaddress", 10, false, null);
+
+      verifyReceiveAll(10, 0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14);
+
+      verifyReceiveRoundRobinInSomeOrder(10, 15, 16, 17, 18, 19);
+
+      verifyReceiveRoundRobinInSomeOrder(10, 20, 21, 22);
+
+      verifyReceiveRoundRobinInSomeOrder(10, 23, 24, 25);
+
+      verifyReceiveRoundRobinInSomeOrder(10, 26, 27);      
+
+      closeAllConsumers();
+
+      closeAllSessionFactories();
+   }
+
+   @Override
+   protected void setupCluster(final boolean forwardWhenNoConsumers) throws Exception
+   {
+      // The lives
+      setupClusterConnectionWithBackups("cluster0",
+                                        "queues",
+                                        forwardWhenNoConsumers,
+                                        1,
+                                        isNetty(),
+                                        0,
+                                        new int[] { 1, 2, 3, 4 },
+                                        new int[] { 6, 7, 8, 9 });
+
+      setupClusterConnectionWithBackups("cluster1",
+                                        "queues",
+                                        forwardWhenNoConsumers,
+                                        1,
+                                        isNetty(),
+                                        1,
+                                        new int[] { 0, 2, 3, 4 },
+                                        new int[] { 5, 7, 8, 9 });
+
+      setupClusterConnectionWithBackups("cluster2",
+                                        "queues",
+                                        forwardWhenNoConsumers,
+                                        1,
+                                        isNetty(),
+                                        2,
+                                        new int[] { 0, 1, 3, 4 },
+                                        new int[] { 5, 6, 8, 9 });
+
+      setupClusterConnectionWithBackups("cluster3",
+                                        "queues",
+                                        forwardWhenNoConsumers,
+                                        1,
+                                        isNetty(),
+                                        3,
+                                        new int[] { 0, 1, 2, 4 },
+                                        new int[] { 5, 6, 7, 9 });
+
+      setupClusterConnectionWithBackups("cluster4",
+                                        "queues",
+                                        forwardWhenNoConsumers,
+                                        1,
+                                        isNetty(),
+                                        4,
+                                        new int[] { 0, 1, 2, 3 },
+                                        new int[] { 5, 6, 7, 8 });
+
+      // The backups
+
+      setupClusterConnectionWithBackups("cluster0",
+                                        "queues",
+                                        forwardWhenNoConsumers,
+                                        1,
+                                        isNetty(),
+                                        5,
+                                        new int[] { 1, 2, 3, 4 },
+                                        new int[] { 6, 7, 8, 9 });
+
+      setupClusterConnectionWithBackups("cluster1",
+                                        "queues",
+                                        forwardWhenNoConsumers,
+                                        1,
+                                        isNetty(),
+                                        6,
+                                      new int[] { 0, 2, 3, 4 },
+                                      new int[] { 5, 7, 8, 9 });
+      
+      setupClusterConnectionWithBackups("cluster2",
+                                        "queues",
+                                        forwardWhenNoConsumers,
+                                        1,
+                                        isNetty(),
+                                        7,
+                                        new int[] { 0, 1, 3, 4 },
+                                        new int[] { 5, 6, 8, 9 });
+
+      setupClusterConnectionWithBackups("cluster3",
+                                        "queues",
+                                        forwardWhenNoConsumers,
+                                        1,
+                                        isNetty(),
+                                        8,
+                                        new int[] { 0, 1, 2, 4 },
+                                        new int[] { 5, 6, 7, 9 });
+
+      setupClusterConnectionWithBackups("cluster4",
+                                        "queues",
+                                        forwardWhenNoConsumers,
+                                        1,
+                                        isNetty(),
+                                        9,
+                                        new int[] { 0, 1, 2, 3 },
+                                        new int[] { 5, 6, 7, 8 });
+   }
+
+   @Override
+   protected void setupServers() throws Exception
+   {
+      // The backups
+      setupServer(5, isFileStorage(), isNetty(), true);
+      setupServer(6, isFileStorage(), isNetty(), true);
+      setupServer(7, isFileStorage(), isNetty(), true);
+      setupServer(8, isFileStorage(), isNetty(), true);
+      setupServer(9, isFileStorage(), isNetty(), true);
+
+      // The lives
+      setupServer(0, isFileStorage(), isNetty(), 5);
+      setupServer(1, isFileStorage(), isNetty(), 6);
+      setupServer(2, isFileStorage(), isNetty(), 7);
+      setupServer(3, isFileStorage(), isNetty(), 8);
+      setupServer(4, isFileStorage(), isNetty(), 9);
+   }
+
+   @Override
+   protected void startServers() throws Exception
+   {
+      // Need to set backup, since when restarting backup after it has failed over, backup will have been set to false
+
+      getServer(5).getConfiguration().setBackup(true);
+      getServer(6).getConfiguration().setBackup(true);
+      getServer(7).getConfiguration().setBackup(true);
+      getServer(8).getConfiguration().setBackup(true);
+      getServer(9).getConfiguration().setBackup(true);
+
+      startServers(5, 6, 7, 8, 9, 0, 1, 2, 3, 4);
+   }
+
+   @Override
+   protected void stopServers() throws Exception
+   {
+      closeAllConsumers();
+
+      closeAllSessionFactories();
+
+      stopServers(0, 1, 2, 3, 4, 5, 6, 7, 8, 9);
+   }
+
+}



More information about the hornetq-commits mailing list