[hornetq-commits] JBoss hornetq SVN: r8412 - in trunk: src/main/org/hornetq/core/management/impl and 7 other directories.

do-not-reply at jboss.org do-not-reply at jboss.org
Thu Nov 26 08:19:27 EST 2009


Author: jmesnil
Date: 2009-11-26 08:19:27 -0500 (Thu, 26 Nov 2009)
New Revision: 8412

Added:
   trunk/tests/src/org/hornetq/tests/integration/cluster/distribution/ClusterWithBackupTest.java
   trunk/tests/src/org/hornetq/tests/integration/cluster/distribution/NettyFileStorageSymmetricClusterWithBackupTest.java
   trunk/tests/src/org/hornetq/tests/integration/cluster/distribution/NettySymmetricClusterWithBackupTest.java
   trunk/tests/src/org/hornetq/tests/integration/cluster/distribution/SymmetricClusterWithBackupTest.java
Modified:
   trunk/src/main/org/hornetq/core/management/AcceptorControl.java
   trunk/src/main/org/hornetq/core/management/impl/AcceptorControlImpl.java
   trunk/src/main/org/hornetq/core/remoting/impl/invm/InVMAcceptor.java
   trunk/src/main/org/hornetq/core/remoting/server/impl/RemotingServiceImpl.java
   trunk/src/main/org/hornetq/core/remoting/spi/Acceptor.java
   trunk/src/main/org/hornetq/core/server/cluster/impl/ClusterConnectionImpl.java
   trunk/src/main/org/hornetq/core/server/impl/HornetQServerImpl.java
   trunk/src/main/org/hornetq/integration/transports/netty/NettyAcceptor.java
   trunk/tests/src/org/hornetq/tests/integration/cluster/distribution/ClusterTestBase.java
Log:
re-added cluster with backup tests

* modified the RemotingService.stop() sequence to ensure that no more connections are created after the acceptors are paused
* in ClusterConnectionImpl.createNewRecord(), do not fail over the record's bridge when the live server is shut down
* removed the Acceptor.resume() method (the pause() method is only used as the first of the two steps that stop the remoting service)

Modified: trunk/src/main/org/hornetq/core/management/AcceptorControl.java
===================================================================
--- trunk/src/main/org/hornetq/core/management/AcceptorControl.java	2009-11-26 12:46:00 UTC (rev 8411)
+++ trunk/src/main/org/hornetq/core/management/AcceptorControl.java	2009-11-26 13:19:27 UTC (rev 8412)
@@ -28,8 +28,4 @@
    String getFactoryClassName();
 
    Map<String, Object> getParameters();
-   
-   void pause() throws Exception;
-   
-   void resume() throws Exception;
 }

Modified: trunk/src/main/org/hornetq/core/management/impl/AcceptorControlImpl.java
===================================================================
--- trunk/src/main/org/hornetq/core/management/impl/AcceptorControlImpl.java	2009-11-26 12:46:00 UTC (rev 8411)
+++ trunk/src/main/org/hornetq/core/management/impl/AcceptorControlImpl.java	2009-11-26 13:19:27 UTC (rev 8412)
@@ -78,20 +78,10 @@
       acceptor.start();
    }
    
-   public void pause()
-   {
-      acceptor.pause();
-   }
-   
    public void stop() throws Exception
    {
       acceptor.stop();
    }
-   
-   public void resume() throws Exception
-   {
-      acceptor.resume();
-   }
 
    // Public --------------------------------------------------------
 

Modified: trunk/src/main/org/hornetq/core/remoting/impl/invm/InVMAcceptor.java
===================================================================
--- trunk/src/main/org/hornetq/core/remoting/impl/invm/InVMAcceptor.java	2009-11-26 12:46:00 UTC (rev 8411)
+++ trunk/src/main/org/hornetq/core/remoting/impl/invm/InVMAcceptor.java	2009-11-26 13:19:27 UTC (rev 8412)
@@ -156,18 +156,6 @@
       paused = true;
    }
    
-   public synchronized void resume()
-   {
-      if (!paused || !started)
-      {
-         return;
-      }
-      
-      InVMRegistry.instance.registerAcceptor(id, this);
-      
-      paused = false;
-   }
-   
    public void setNotificationService(NotificationService notificationService)
    {
       this.notificationService = notificationService;

Modified: trunk/src/main/org/hornetq/core/remoting/server/impl/RemotingServiceImpl.java
===================================================================
--- trunk/src/main/org/hornetq/core/remoting/server/impl/RemotingServiceImpl.java	2009-11-26 12:46:00 UTC (rev 8411)
+++ trunk/src/main/org/hornetq/core/remoting/server/impl/RemotingServiceImpl.java	2009-11-26 13:19:27 UTC (rev 8412)
@@ -15,7 +15,6 @@
 
 import static org.hornetq.core.remoting.impl.wireformat.PacketImpl.DISCONNECT;
 
-import java.util.HashMap;
 import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
@@ -38,8 +37,6 @@
 import org.hornetq.core.remoting.RemotingConnection;
 import org.hornetq.core.remoting.impl.AbstractBufferHandler;
 import org.hornetq.core.remoting.impl.RemotingConnectionImpl;
-import org.hornetq.core.remoting.impl.invm.InVMAcceptorFactory;
-import org.hornetq.core.remoting.impl.invm.TransportConstants;
 import org.hornetq.core.remoting.impl.wireformat.PacketImpl;
 import org.hornetq.core.remoting.impl.wireformat.Ping;
 import org.hornetq.core.remoting.server.RemotingService;
@@ -212,8 +209,8 @@
          }
       }
    }
-
-   public synchronized void stop() throws Exception
+   
+   public void stop() throws Exception
    {
       if (!started)
       {
@@ -228,26 +225,31 @@
          acceptor.pause();
       }
 
-      for (ConnectionEntry entry : connections.values())
+      synchronized (server)
       {
-         entry.connection.getChannel(0, -1).sendAndFlush(new PacketImpl(DISCONNECT));
-      }
+         for (ConnectionEntry entry : connections.values())
+         {
+            entry.connection.getChannel(0, -1).sendAndFlush(new PacketImpl(DISCONNECT));
+         }
 
-      for (Acceptor acceptor : acceptors)
-      {
-         acceptor.stop();
-      }
+         for (Acceptor acceptor : acceptors)
+         {
+            acceptor.stop();
+         }
 
-      acceptors.clear();
+         acceptors.clear();
 
-      connections.clear();
+         connections.clear();
 
-      if (managementService != null)
-      {
-         managementService.unregisterAcceptors();
+         if (managementService != null)
+         {
+            managementService.unregisterAcceptors();
+         }
+
+         started = false;
+
       }
 
-      started = false;
    }
 
    public boolean isStarted()
@@ -297,9 +299,7 @@
 
       RemotingConnection rc = new RemotingConnectionImpl(connection,
                                                          interceptors,
-                                                         server.getConfiguration().isAsyncConnectionExecutionEnabled() ? server.getExecutorFactory()
-                                                                                                                               .getExecutor()
-                                                                                                                      : null);
+                                                         config.isAsyncConnectionExecutionEnabled() ? server.getExecutorFactory().getExecutor() : null);
 
       Channel channel1 = rc.getChannel(1, -1);
       

Modified: trunk/src/main/org/hornetq/core/remoting/spi/Acceptor.java
===================================================================
--- trunk/src/main/org/hornetq/core/remoting/spi/Acceptor.java	2009-11-26 12:46:00 UTC (rev 8411)
+++ trunk/src/main/org/hornetq/core/remoting/spi/Acceptor.java	2009-11-26 13:19:27 UTC (rev 8412)
@@ -27,7 +27,5 @@
 {
    void pause();
    
-   void resume();
-   
    void setNotificationService(NotificationService notificationService);
 }

Modified: trunk/src/main/org/hornetq/core/server/cluster/impl/ClusterConnectionImpl.java
===================================================================
--- trunk/src/main/org/hornetq/core/server/cluster/impl/ClusterConnectionImpl.java	2009-11-26 12:46:00 UTC (rev 8411)
+++ trunk/src/main/org/hornetq/core/server/cluster/impl/ClusterConnectionImpl.java	2009-11-26 13:19:27 UTC (rev 8412)
@@ -438,7 +438,7 @@
                                      retryInterval,
                                      1d,
                                      -1,
-                                     true,
+                                     false,
                                      useDuplicateDetection,
                                      confirmationWindowSize,
                                      managementService.getManagementAddress(),

Modified: trunk/src/main/org/hornetq/core/server/impl/HornetQServerImpl.java
===================================================================
--- trunk/src/main/org/hornetq/core/server/impl/HornetQServerImpl.java	2009-11-26 12:46:00 UTC (rev 8411)
+++ trunk/src/main/org/hornetq/core/server/impl/HornetQServerImpl.java	2009-11-26 13:19:27 UTC (rev 8412)
@@ -349,9 +349,13 @@
          {
             session.getChannel().flushConfirmations();
          }
+      }
 
-         remotingService.stop();
-
+      // we stop the remoting service outside a lock
+      remotingService.stop();
+      
+      synchronized (this)
+      {
          // Stop the deployers
          if (configuration.isFileDeploymentEnabled())
          {

Modified: trunk/src/main/org/hornetq/integration/transports/netty/NettyAcceptor.java
===================================================================
--- trunk/src/main/org/hornetq/integration/transports/netty/NettyAcceptor.java	2009-11-26 12:46:00 UTC (rev 8411)
+++ trunk/src/main/org/hornetq/integration/transports/netty/NettyAcceptor.java	2009-11-26 13:19:27 UTC (rev 8412)
@@ -358,6 +358,8 @@
          httpKeepAliveRunnable.close();
       }
 
+      // serverChannelGroup has been unbound in pause()
+      serverChannelGroup.close().awaitUninterruptibly();
       ChannelGroupFuture future = channelGroup.close().awaitUninterruptibly();
 
       if (!future.isCompleteSuccess())
@@ -412,7 +414,7 @@
 
    private boolean paused;
 
-   public synchronized void pause()
+   public void pause()
    {
       if (paused)
       {
@@ -425,32 +427,25 @@
       }
 
       // We *pause* the acceptor so no new connections are made
-
-      serverChannelGroup.close().awaitUninterruptibly();
-
-      try
+      ChannelGroupFuture future = serverChannelGroup.unbind().awaitUninterruptibly();
+      if (!future.isCompleteSuccess())
       {
-         Thread.sleep(500);
+         log.warn("server channel group did not completely unbind");
+         Iterator<Channel> iterator = future.getGroup().iterator();
+         while (iterator.hasNext())
+         {
+            Channel channel = (Channel)iterator.next();
+            if (channel.isBound())
+            {
+               log.warn(channel + " is still bound to " + channel.getRemoteAddress());
+            }
+         }
       }
-      catch (Exception e)
-      {
-      }
 
       paused = true;
    }
 
-   public synchronized void resume()
-   {
-      if (!paused)
-      {
-         return;
-      }
-
-      startServerChannels();
-
-      paused = false;
-   }
-
+   
    public void setNotificationService(final NotificationService notificationService)
    {
       this.notificationService = notificationService;

Modified: trunk/tests/src/org/hornetq/tests/integration/cluster/distribution/ClusterTestBase.java
===================================================================
--- trunk/tests/src/org/hornetq/tests/integration/cluster/distribution/ClusterTestBase.java	2009-11-26 12:46:00 UTC (rev 8411)
+++ trunk/tests/src/org/hornetq/tests/integration/cluster/distribution/ClusterTestBase.java	2009-11-26 13:19:27 UTC (rev 8412)
@@ -63,7 +63,7 @@
  *
  *
  */
-public class ClusterTestBase extends ServiceTestBase
+public abstract class ClusterTestBase extends ServiceTestBase
 {
    private static final Logger log = Logger.getLogger(ClusterTestBase.class);
 
@@ -799,7 +799,7 @@
          ClientMessage message;
          do
          {
-            message = holder.consumer.receive(200);
+            message = holder.consumer.receive(1000);
 
             if (message != null)
             {
@@ -809,7 +809,7 @@
 
                if (prevCount != null)
                {
-                  assertTrue(count == prevCount + consumerIDs.length);
+                  assertEquals(prevCount + consumerIDs.length, count);
                }
 
                assertFalse(counts.contains(count));
@@ -835,7 +835,7 @@
 
       for (int i = 0; i < numMessages; i++)
       {
-         assertTrue(counts.contains(i));
+         assertTrue("did not receive message " + i, counts.contains(i));
       }
    }
 
@@ -1607,5 +1607,4 @@
          }
       }
    }
-
 }

Added: trunk/tests/src/org/hornetq/tests/integration/cluster/distribution/ClusterWithBackupTest.java
===================================================================
--- trunk/tests/src/org/hornetq/tests/integration/cluster/distribution/ClusterWithBackupTest.java	                        (rev 0)
+++ trunk/tests/src/org/hornetq/tests/integration/cluster/distribution/ClusterWithBackupTest.java	2009-11-26 13:19:27 UTC (rev 8412)
@@ -0,0 +1,144 @@
+/*
+ * JBoss, Home of Professional Open Source
+ * Copyright 2005-2009, Red Hat Middleware LLC, and individual contributors
+ * by the @authors tag. See the copyright.txt in the distribution for a
+ * full listing of individual contributors.
+ *
+ * This is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU Lesser General Public License as
+ * published by the Free Software Foundation; either version 2.1 of
+ * the License, or (at your option) any later version.
+ *
+ * This software is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this software; if not, write to the Free
+ * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
+ * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
+ */
+
+package org.hornetq.tests.integration.cluster.distribution;
+
+import org.hornetq.core.logging.Logger;
+
+/**
+ * 
+ * A ClusterWithBackupTest
+ *
+ * @author <a href="mailto:tim.fox at jboss.com">Tim Fox</a>
+ * 
+ * Created 9 Mar 2009 16:31:21
+ *
+ *
+ */
+public class ClusterWithBackupTest extends ClusterTestBase
+{
+   private static final Logger log = Logger.getLogger(ClusterWithBackupTest.class);
+
+   @Override
+   protected void setUp() throws Exception
+   {
+      super.setUp();
+
+      setupServers();
+   }
+
+   @Override
+   protected void tearDown() throws Exception
+   {
+      stopServers();
+
+      super.tearDown();
+   }
+
+   protected boolean isNetty()
+   {
+      return false;
+   }
+
+   protected boolean isFileStorage()
+   {
+      return false;
+   }
+   
+   public void testBasicRoundRobin() throws Exception
+   {
+      setupCluster();
+
+      startServers(0, 1, 2, 3, 4, 5);
+      
+      setupSessionFactory(3, isNetty());
+      setupSessionFactory(4, isNetty());
+      setupSessionFactory(5, isNetty());
+      
+      createQueue(3, "queues.testaddress", "queue0", null, false);
+      createQueue(4, "queues.testaddress", "queue0", null, false);
+      createQueue(5, "queues.testaddress", "queue0", null, false);
+      
+      addConsumer(0, 3, "queue0", null);
+      addConsumer(1, 4, "queue0", null);
+      addConsumer(2, 5, "queue0", null);
+      
+      waitForBindings(3, "queues.testaddress", 1, 1, true);
+      waitForBindings(4, "queues.testaddress", 1, 1, true);
+      waitForBindings(5, "queues.testaddress", 1, 1, true);
+
+      waitForBindings(3, "queues.testaddress", 2, 2, false);
+      waitForBindings(4, "queues.testaddress", 2, 2, false);
+      waitForBindings(5, "queues.testaddress", 2, 2, false);
+
+      send(3, "queues.testaddress", 100, false, null);
+
+      verifyReceiveRoundRobinInSomeOrder(100, 0, 1, 2);
+
+      verifyNotReceive(0, 0, 1, 2);
+   }
+   
+   protected void setupCluster() throws Exception
+   {
+      setupCluster(false);
+   }
+
+   protected void setupCluster(final boolean forwardWhenNoConsumers) throws Exception
+   {
+      setupClusterConnection("cluster0", "queues", forwardWhenNoConsumers, 1, isNetty(), 3, 4, 5);
+
+      setupClusterConnection("cluster1", "queues", forwardWhenNoConsumers, 1, isNetty(), 4, 3, 5);
+
+      setupClusterConnection("cluster2", "queues", forwardWhenNoConsumers, 1, isNetty(), 5, 3, 4);
+      
+      
+      setupClusterConnection("cluster0", "queues", forwardWhenNoConsumers, 1, isNetty(), 0, 4, 5);
+
+      setupClusterConnection("cluster1", "queues", forwardWhenNoConsumers, 1, isNetty(), 1, 3, 5);
+
+      setupClusterConnection("cluster2", "queues", forwardWhenNoConsumers, 1, isNetty(), 2, 3, 4);
+   }
+
+   protected void setupServers() throws Exception
+   {
+      //The backups
+      setupServer(0, isFileStorage(), isNetty(), true);
+      setupServer(1, isFileStorage(), isNetty(), true);
+      setupServer(2, isFileStorage(), isNetty(), true);
+      
+      //The lives
+      setupServer(3, isFileStorage(), isNetty(), 0);
+      setupServer(4, isFileStorage(), isNetty(), 1);
+      setupServer(5, isFileStorage(), isNetty(), 2);      
+      
+   }
+
+   protected void stopServers() throws Exception
+   {
+      closeAllConsumers();
+
+      closeAllSessionFactories();
+
+      stopServers(3, 4, 5, 0, 1, 2);
+   }
+
+}

Added: trunk/tests/src/org/hornetq/tests/integration/cluster/distribution/NettyFileStorageSymmetricClusterWithBackupTest.java
===================================================================
--- trunk/tests/src/org/hornetq/tests/integration/cluster/distribution/NettyFileStorageSymmetricClusterWithBackupTest.java	                        (rev 0)
+++ trunk/tests/src/org/hornetq/tests/integration/cluster/distribution/NettyFileStorageSymmetricClusterWithBackupTest.java	2009-11-26 13:19:27 UTC (rev 8412)
@@ -0,0 +1,45 @@
+/*
+ * JBoss, Home of Professional Open Source
+ * Copyright 2005-2009, Red Hat Middleware LLC, and individual contributors
+ * by the @authors tag. See the copyright.txt in the distribution for a
+ * full listing of individual contributors.
+ *
+ * This is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU Lesser General Public License as
+ * published by the Free Software Foundation; either version 2.1 of
+ * the License, or (at your option) any later version.
+ *
+ * This software is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this software; if not, write to the Free
+ * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
+ * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
+ */
+
+
+package org.hornetq.tests.integration.cluster.distribution;
+
+/**
+ * A NettyFileStorageSymmetricClusterWithBackupTest
+ *
+ * @author <a href="mailto:tim.fox at jboss.com">Tim Fox</a>
+ *
+ *
+ */
+public class NettyFileStorageSymmetricClusterWithBackupTest extends SymmetricClusterWithBackupTest
+{
+   protected boolean isNetty()
+   {
+      return true;
+   }
+   
+   protected boolean isFileStorage()
+   {
+      return true;
+   }
+  
+}

Added: trunk/tests/src/org/hornetq/tests/integration/cluster/distribution/NettySymmetricClusterWithBackupTest.java
===================================================================
--- trunk/tests/src/org/hornetq/tests/integration/cluster/distribution/NettySymmetricClusterWithBackupTest.java	                        (rev 0)
+++ trunk/tests/src/org/hornetq/tests/integration/cluster/distribution/NettySymmetricClusterWithBackupTest.java	2009-11-26 13:19:27 UTC (rev 8412)
@@ -0,0 +1,44 @@
+/*
+ * JBoss, Home of Professional Open Source
+ * Copyright 2005-2009, Red Hat Middleware LLC, and individual contributors
+ * by the @authors tag. See the copyright.txt in the distribution for a
+ * full listing of individual contributors.
+ *
+ * This is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU Lesser General Public License as
+ * published by the Free Software Foundation; either version 2.1 of
+ * the License, or (at your option) any later version.
+ *
+ * This software is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this software; if not, write to the Free
+ * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
+ * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
+ */
+
+
+package org.hornetq.tests.integration.cluster.distribution;
+
+/**
+ * A NettySymmetricClusterWithBackupTest
+ *
+ * @author <a href="mailto:tim.fox at jboss.com">Tim Fox</a>
+ *
+ *
+ */
+public class NettySymmetricClusterWithBackupTest extends SymmetricClusterWithBackupTest
+{
+   protected boolean isNetty()
+   {
+      return true;
+   }
+   
+   protected boolean isFileStorage()
+   {
+      return false;
+   }
+}

Added: trunk/tests/src/org/hornetq/tests/integration/cluster/distribution/SymmetricClusterWithBackupTest.java
===================================================================
--- trunk/tests/src/org/hornetq/tests/integration/cluster/distribution/SymmetricClusterWithBackupTest.java	                        (rev 0)
+++ trunk/tests/src/org/hornetq/tests/integration/cluster/distribution/SymmetricClusterWithBackupTest.java	2009-11-26 13:19:27 UTC (rev 8412)
@@ -0,0 +1,828 @@
+/*
+ * JBoss, Home of Professional Open Source
+ * Copyright 2005-2009, Red Hat Middleware LLC, and individual contributors
+ * by the @authors tag. See the copyright.txt in the distribution for a
+ * full listing of individual contributors.
+ *
+ * This is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU Lesser General Public License as
+ * published by the Free Software Foundation; either version 2.1 of
+ * the License, or (at your option) any later version.
+ *
+ * This software is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this software; if not, write to the Free
+ * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
+ * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
+ */
+
+package org.hornetq.tests.integration.cluster.distribution;
+
+import org.hornetq.core.logging.Logger;
+
+/**
+ * A SymmetricClusterWithBackupTest
+ *
+ * @author <a href="mailto:tim.fox at jboss.com">Tim Fox</a>
+ * 
+ * Created 13 Mar 2009 11:00:31
+ *
+ *
+ */
+public class SymmetricClusterWithBackupTest extends SymmetricClusterTest
+{
+   private static final Logger log = Logger.getLogger(SymmetricClusterWithBackupTest.class);
+
+   public void testStopAllStartAll() throws Throwable
+   {
+      try
+      {
+         setupCluster();
+   
+         startServers();
+   
+         setupSessionFactory(0, isNetty());
+         setupSessionFactory(1, isNetty());
+         setupSessionFactory(2, isNetty());
+         setupSessionFactory(3, isNetty());
+         setupSessionFactory(4, isNetty());
+   
+         createQueue(0, "queues.testaddress", "queue0", null, false);
+         createQueue(1, "queues.testaddress", "queue0", null, false);
+         createQueue(2, "queues.testaddress", "queue0", null, false);
+         createQueue(3, "queues.testaddress", "queue0", null, false);
+         createQueue(4, "queues.testaddress", "queue0", null, false);
+   
+         addConsumer(0, 0, "queue0", null);
+         addConsumer(1, 1, "queue0", null);
+         addConsumer(2, 2, "queue0", null);
+         addConsumer(3, 3, "queue0", null);
+         addConsumer(4, 4, "queue0", null);
+   
+         waitForBindings(0, "queues.testaddress", 1, 1, true);
+         waitForBindings(1, "queues.testaddress", 1, 1, true);
+         waitForBindings(2, "queues.testaddress", 1, 1, true);
+         waitForBindings(3, "queues.testaddress", 1, 1, true);
+         waitForBindings(4, "queues.testaddress", 1, 1, true);
+   
+         waitForBindings(0, "queues.testaddress", 4, 4, false);
+         waitForBindings(1, "queues.testaddress", 4, 4, false);
+         waitForBindings(2, "queues.testaddress", 4, 4, false);
+         waitForBindings(3, "queues.testaddress", 4, 4, false);
+         waitForBindings(4, "queues.testaddress", 4, 4, false);
+   
+         System.out.println("waited for all bindings");
+
+         send(0, "queues.testaddress", 10, false, null);
+   
+         verifyReceiveRoundRobinInSomeOrder(10, 0, 1, 2, 3, 4);
+   
+         verifyNotReceive(0, 1, 2, 3, 4);
+   
+         closeAllConsumers();
+         
+         closeAllSessionFactories();
+
+         stopServers();
+   
+         startServers();
+   
+         setupSessionFactory(0, isNetty());
+         setupSessionFactory(1, isNetty());
+         setupSessionFactory(2, isNetty());
+         setupSessionFactory(3, isNetty());
+         setupSessionFactory(4, isNetty());
+   
+         createQueue(0, "queues.testaddress", "queue0", null, false);
+         createQueue(1, "queues.testaddress", "queue0", null, false);
+         createQueue(2, "queues.testaddress", "queue0", null, false);
+         createQueue(3, "queues.testaddress", "queue0", null, false);
+         createQueue(4, "queues.testaddress", "queue0", null, false);
+   
+         addConsumer(0, 0, "queue0", null);
+         addConsumer(1, 1, "queue0", null);
+         addConsumer(2, 2, "queue0", null);
+         addConsumer(3, 3, "queue0", null);
+         addConsumer(4, 4, "queue0", null);
+   
+         waitForBindings(0, "queues.testaddress", 1, 1, true);
+         waitForBindings(1, "queues.testaddress", 1, 1, true);
+         waitForBindings(2, "queues.testaddress", 1, 1, true);
+         waitForBindings(3, "queues.testaddress", 1, 1, true);
+         waitForBindings(4, "queues.testaddress", 1, 1, true);
+   
+         waitForBindings(0, "queues.testaddress", 4, 4, false);
+         waitForBindings(1, "queues.testaddress", 4, 4, false);
+         waitForBindings(2, "queues.testaddress", 4, 4, false);
+         waitForBindings(3, "queues.testaddress", 4, 4, false);
+         waitForBindings(4, "queues.testaddress", 4, 4, false);
+   
+         send(0, "queues.testaddress", 10, false, null);
+   
+         verifyReceiveRoundRobinInSomeOrder(10, 0, 1, 2, 3, 4);
+   
+         this.verifyNotReceive(0, 1, 2, 3, 4);
+         
+         
+         closeAllConsumers();
+
+         closeAllSessionFactories();
+      }
+      catch (Throwable e)
+      {
+         System.out.println(threadDump("SymmetricClusterWithBackupTest::testStopAllStartAll"));
+         throw e;
+      }
+   }
+
+   @Override
+   public void testMixtureLoadBalancedAndNonLoadBalancedQueuesAddQueuesAndConsumersBeforeAllServersAreStarted() throws Exception
+   {
+      setupCluster();
+
+      startServers(5, 0);
+
+      setupSessionFactory(0, isNetty());
+
+      createQueue(0, "queues.testaddress", "queue0", null, false);
+      createQueue(0, "queues.testaddress", "queue5", null, false);
+      createQueue(0, "queues.testaddress", "queue10", null, false);
+      createQueue(0, "queues.testaddress", "queue15", null, false);
+      createQueue(0, "queues.testaddress", "queue17", null, false);
+
+      addConsumer(0, 0, "queue0", null);
+      addConsumer(5, 0, "queue5", null);
+
+      startServers(6, 1);
+      setupSessionFactory(1, isNetty());
+
+      createQueue(1, "queues.testaddress", "queue1", null, false);
+      createQueue(1, "queues.testaddress", "queue6", null, false);
+      createQueue(1, "queues.testaddress", "queue11", null, false);
+      createQueue(1, "queues.testaddress", "queue15", null, false);
+      createQueue(1, "queues.testaddress", "queue17", null, false);
+
+      addConsumer(1, 1, "queue1", null);
+      addConsumer(6, 1, "queue6", null);
+      addConsumer(11, 1, "queue11", null);
+      addConsumer(16, 1, "queue15", null);
+
+      startServers(7, 2);
+      setupSessionFactory(2, isNetty());
+
+      createQueue(2, "queues.testaddress", "queue2", null, false);
+      createQueue(2, "queues.testaddress", "queue7", null, false);
+      createQueue(2, "queues.testaddress", "queue12", null, false);
+      createQueue(2, "queues.testaddress", "queue15", null, false);
+      createQueue(2, "queues.testaddress", "queue16", null, false);
+
+      addConsumer(2, 2, "queue2", null);
+      addConsumer(7, 2, "queue7", null);
+      addConsumer(12, 2, "queue12", null);
+      addConsumer(17, 2, "queue15", null);
+
+      startServers(8, 3);
+      setupSessionFactory(3, isNetty());
+
+      createQueue(3, "queues.testaddress", "queue3", null, false);
+      createQueue(3, "queues.testaddress", "queue8", null, false);
+      createQueue(3, "queues.testaddress", "queue13", null, false);
+      createQueue(3, "queues.testaddress", "queue15", null, false);
+      createQueue(3, "queues.testaddress", "queue16", null, false);
+      createQueue(3, "queues.testaddress", "queue18", null, false);
+
+      addConsumer(3, 3, "queue3", null);
+      addConsumer(8, 3, "queue8", null);
+      addConsumer(13, 3, "queue13", null);
+      addConsumer(18, 3, "queue15", null);
+
+      startServers(9, 4);
+      setupSessionFactory(4, isNetty());
+
+      createQueue(4, "queues.testaddress", "queue4", null, false);
+      createQueue(4, "queues.testaddress", "queue9", null, false);
+      createQueue(4, "queues.testaddress", "queue14", null, false);
+      createQueue(4, "queues.testaddress", "queue15", null, false);
+      createQueue(4, "queues.testaddress", "queue16", null, false);
+      createQueue(4, "queues.testaddress", "queue17", null, false);
+      createQueue(4, "queues.testaddress", "queue18", null, false);
+
+      addConsumer(4, 4, "queue4", null);
+      addConsumer(9, 4, "queue9", null);
+      addConsumer(10, 0, "queue10", null);
+      addConsumer(14, 4, "queue14", null);
+
+      addConsumer(15, 0, "queue15", null);
+      addConsumer(19, 4, "queue15", null);
+
+      addConsumer(20, 2, "queue16", null);
+      addConsumer(21, 3, "queue16", null);
+      addConsumer(22, 4, "queue16", null);
+
+      addConsumer(23, 0, "queue17", null);
+      addConsumer(24, 1, "queue17", null);
+      addConsumer(25, 4, "queue17", null);
+
+      addConsumer(26, 3, "queue18", null);
+      addConsumer(27, 4, "queue18", null);
+
+      waitForBindings(0, "queues.testaddress", 5, 5, true);
+      waitForBindings(1, "queues.testaddress", 5, 5, true);
+      waitForBindings(2, "queues.testaddress", 5, 5, true);
+      waitForBindings(3, "queues.testaddress", 6, 6, true);
+      waitForBindings(4, "queues.testaddress", 7, 7, true);
+
+      waitForBindings(0, "queues.testaddress", 23, 23, false);
+      waitForBindings(1, "queues.testaddress", 23, 23, false);
+      waitForBindings(2, "queues.testaddress", 23, 23, false);
+      waitForBindings(3, "queues.testaddress", 22, 22, false);
+      waitForBindings(4, "queues.testaddress", 21, 21, false);
+
+      send(0, "queues.testaddress", 10, false, null);
+
+      verifyReceiveAll(10, 0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14);
+
+      verifyReceiveRoundRobinInSomeOrder(10, 15, 16, 17, 18, 19);
+
+      verifyReceiveRoundRobinInSomeOrder(10, 20, 21, 22);
+
+      verifyReceiveRoundRobinInSomeOrder(10, 23, 24, 25);
+
+      verifyReceiveRoundRobinInSomeOrder(10, 26, 27);
+      
+      closeAllConsumers();
+
+      closeAllSessionFactories();
+   }
+
+   public void _test() throws Exception
+   {
+      for (int i = 0; i < 50; i++)
+      {
+         System.out.println("\n\n" + i + "\n\n");
+         testStartStopWithTwoServers();
+         tearDown();
+         setUp();
+      }  
+   }
+
+   /**
+    * Reduced version of the start/stop scenario that only uses live nodes 0 and 1 together
+    * with their backups 5 and 6.
+    * <p>
+    * The test creates queues and consumers on both live nodes, checks message distribution,
+    * then stops node 0 and its backup, starts them again, re-creates node 0's queues and
+    * consumers and checks that distribution is the same as before.
+    * <p>
+    * The commented-out statements are the counterparts for nodes 2, 3 and 4 of the five-node
+    * scenario; they are kept for reference only.
+    */
+   public void testStartStopWithTwoServers() throws Exception
+   {
+      setupCluster();
+
+      // Backups (5, 6) are started before their live nodes (0, 1)
+      startServers(5, 6, 0, 1);
+
+      setupSessionFactory(0, isNetty());
+      setupSessionFactory(1, isNetty());
+//      setupSessionFactory(2, isNetty());
+//      setupSessionFactory(3, isNetty());
+//      setupSessionFactory(4, isNetty());
+
+      // Queues with a name unique to one node (queue0/1, queue5/6, queue10/11)
+      createQueue(0, "queues.testaddress", "queue0", null, false);
+      createQueue(1, "queues.testaddress", "queue1", null, false);
+//      createQueue(2, "queues.testaddress", "queue2", null, false);
+//      createQueue(3, "queues.testaddress", "queue3", null, false);
+//      createQueue(4, "queues.testaddress", "queue4", null, false);
+
+      createQueue(0, "queues.testaddress", "queue5", null, false);
+      createQueue(1, "queues.testaddress", "queue6", null, false);
+//      createQueue(2, "queues.testaddress", "queue7", null, false);
+//      createQueue(3, "queues.testaddress", "queue8", null, false);
+//      createQueue(4, "queues.testaddress", "queue9", null, false);
+
+      createQueue(0, "queues.testaddress", "queue10", null, false);
+      createQueue(1, "queues.testaddress", "queue11", null, false);
+//      createQueue(2, "queues.testaddress", "queue12", null, false);
+//      createQueue(3, "queues.testaddress", "queue13", null, false);
+//      createQueue(4, "queues.testaddress", "queue14", null, false);
+
+      // Queues whose name exists on both nodes (queue15, queue17)
+      createQueue(0, "queues.testaddress", "queue15", null, false);
+      createQueue(1, "queues.testaddress", "queue15", null, false);
+//      createQueue(2, "queues.testaddress", "queue15", null, false);
+//      createQueue(3, "queues.testaddress", "queue15", null, false);
+//      createQueue(4, "queues.testaddress", "queue15", null, false);
+
+//      createQueue(2, "queues.testaddress", "queue16", null, false);
+//      createQueue(3, "queues.testaddress", "queue16", null, false);
+//      createQueue(4, "queues.testaddress", "queue16", null, false);
+
+      createQueue(0, "queues.testaddress", "queue17", null, false);
+      createQueue(1, "queues.testaddress", "queue17", null, false);
+//      createQueue(4, "queues.testaddress", "queue17", null, false);
+
+//      createQueue(3, "queues.testaddress", "queue18", null, false);
+//      createQueue(4, "queues.testaddress", "queue18", null, false);
+
+      // One consumer per queue; arguments are (consumer id, node, queue name, filter)
+      addConsumer(0, 0, "queue0", null);
+      addConsumer(1, 1, "queue1", null);
+//      addConsumer(2, 2, "queue2", null);
+//      addConsumer(3, 3, "queue3", null);
+//      addConsumer(4, 4, "queue4", null);
+
+      addConsumer(5, 0, "queue5", null);
+      addConsumer(6, 1, "queue6", null);
+//      addConsumer(7, 2, "queue7", null);
+//      addConsumer(8, 3, "queue8", null);
+//      addConsumer(9, 4, "queue9", null);
+
+      addConsumer(10, 0, "queue10", null);
+      addConsumer(11, 1, "queue11", null);
+//      addConsumer(12, 2, "queue12", null);
+//      addConsumer(13, 3, "queue13", null);
+//      addConsumer(14, 4, "queue14", null);
+
+      addConsumer(15, 0, "queue15", null);
+      addConsumer(16, 1, "queue15", null);
+//      addConsumer(17, 2, "queue15", null);
+//      addConsumer(18, 3, "queue15", null);
+//      addConsumer(19, 4, "queue15", null);
+
+//      addConsumer(20, 2, "queue16", null);
+//      addConsumer(21, 3, "queue16", null);
+//      addConsumer(22, 4, "queue16", null);
+
+      addConsumer(23, 0, "queue17", null);
+      addConsumer(24, 1, "queue17", null);
+//      addConsumer(25, 4, "queue17", null);
+
+//      addConsumer(26, 3, "queue18", null);
+//      addConsumer(27, 4, "queue18", null);
+
+      // Each node hosts 5 queues with one consumer each, hence 5 bindings / 5 consumers locally
+      waitForBindings(0, "queues.testaddress", 5, 5, true);
+      waitForBindings(1, "queues.testaddress", 5, 5, true);
+//      waitForBindings(2, "queues.testaddress", 5, 5, true);
+//      waitForBindings(3, "queues.testaddress", 6, 6, true);
+//      waitForBindings(4, "queues.testaddress", 7, 7, true);
+
+      // ... and each node sees the other node's 5 queues as non-local bindings
+     waitForBindings(0, "queues.testaddress", 5 , 5, false);
+//      waitForBindings(0, "queues.testaddress", 23, 23, false);
+     waitForBindings(1, "queues.testaddress", 5, 5, false);
+//      waitForBindings(1, "queues.testaddress", 23, 23, false);
+//      waitForBindings(2, "queues.testaddress", 23, 23, false);
+//      waitForBindings(3, "queues.testaddress", 22, 22, false);
+//      waitForBindings(4, "queues.testaddress", 21, 21, false);
+
+      send(0, "queues.testaddress", 10, false, null);
+
+      // Consumers on uniquely named queues are checked with verifyReceiveAll
+//      verifyReceiveAll(10, 0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14);
+    verifyReceiveAll(10, 0, 1, 5, 6, 10, 11);
+
+      // Consumers on queues that share a name across nodes are checked for round-robin delivery
+      verifyReceiveRoundRobinInSomeOrder(10, 15, 16);
+//      verifyReceiveRoundRobinInSomeOrder(10, 15, 16, 17, 18, 19);
+
+//      verifyReceiveRoundRobinInSomeOrder(10, 20, 21, 22);
+
+//      verifyReceiveRoundRobinInSomeOrder(10, 23, 24, 25);
+      verifyReceiveRoundRobinInSomeOrder(10, 23, 24);
+
+//      verifyReceiveRoundRobinInSomeOrder(10, 26, 27);
+
+      // Detach every consumer attached to node 0 before stopping it
+      removeConsumer(0);
+      removeConsumer(5);
+      removeConsumer(10);
+      removeConsumer(15);
+      removeConsumer(23);
+//      removeConsumer(3);
+//      removeConsumer(8);
+//      removeConsumer(13);
+//      removeConsumer(18);
+//      removeConsumer(21);
+//      removeConsumer(26);
+
+      closeSessionFactory(0);
+//      closeSessionFactory(3);
+
+      // Stop live node 0 and its backup 5 ...
+      stopServers(0, 5);
+//      stopServers(0, 3, 5, 8);
+
+      // ... then bring them back, backup first
+      startServers(5, 0);
+//      startServers(5, 8, 0, 3);
+
+      // NOTE(review): presumably gives the restarted nodes time to rejoin the cluster -- confirm
+      Thread.sleep(2000);
+
+      setupSessionFactory(0, isNetty());
+//      setupSessionFactory(3, isNetty());
+
+      // Re-create node 0's queues and consumers with the same names and ids as before the restart
+      createQueue(0, "queues.testaddress", "queue0", null, false);
+//      createQueue(3, "queues.testaddress", "queue3", null, false);
+
+      createQueue(0, "queues.testaddress", "queue5", null, false);
+  //    createQueue(3, "queues.testaddress", "queue8", null, false);
+
+      createQueue(0, "queues.testaddress", "queue10", null, false);
+//      createQueue(3, "queues.testaddress", "queue13", null, false);
+
+      createQueue(0, "queues.testaddress", "queue15", null, false);
+//      createQueue(3, "queues.testaddress", "queue15", null, false);
+
+//      createQueue(3, "queues.testaddress", "queue16", null, false);
+
+      createQueue(0, "queues.testaddress", "queue17", null, false);
+
+//      createQueue(3, "queues.testaddress", "queue18", null, false);
+
+      addConsumer(0, 0, "queue0", null);
+//      addConsumer(3, 3, "queue3", null);
+
+      addConsumer(5, 0, "queue5", null);
+//      addConsumer(8, 3, "queue8", null);
+
+      addConsumer(10, 0, "queue10", null);
+//      addConsumer(13, 3, "queue13", null);
+
+      addConsumer(15, 0, "queue15", null);
+//      addConsumer(18, 3, "queue15", null);
+
+//      addConsumer(21, 3, "queue16", null);
+
+      addConsumer(23, 0, "queue17", null);
+
+//      addConsumer(26, 3, "queue18", null);
+
+      // Binding counts must be back to what they were before the restart
+      waitForBindings(0, "queues.testaddress", 5, 5, true);
+      waitForBindings(1, "queues.testaddress", 5, 5, true);
+//      waitForBindings(2, "queues.testaddress", 5, 5, true);
+//      waitForBindings(3, "queues.testaddress", 6, 6, true);
+//      waitForBindings(4, "queues.testaddress", 7, 7, true);
+
+      waitForBindings(0, "queues.testaddress", 5, 5, false);
+      waitForBindings(1, "queues.testaddress", 5, 5, false);
+//      waitForBindings(0, "queues.testaddress", 23, 23, false);
+//      waitForBindings(1, "queues.testaddress", 23, 23, false);
+//      waitForBindings(2, "queues.testaddress", 23, 23, false);
+//      waitForBindings(3, "queues.testaddress", 22, 22, false);
+///      waitForBindings(4, "queues.testaddress", 21, 21, false);
+
+      // Same send/verify round as before the restart
+      send(0, "queues.testaddress", 10, false, null);
+
+      verifyReceiveAll(10, 0, 1, 5, 6, 10, 11);
+
+      verifyReceiveRoundRobinInSomeOrder(10, 15, 16);
+      verifyReceiveRoundRobinInSomeOrder(10, 23, 24);
+
+      /*
+      verifyReceiveAll(10, 0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14);
+
+      verifyReceiveRoundRobinInSomeOrder(10, 15, 16, 17, 18, 19);
+
+      verifyReceiveRoundRobinInSomeOrder(10, 20, 21, 22);
+
+      verifyReceiveRoundRobinInSomeOrder(10, 23, 24, 25);
+
+      verifyReceiveRoundRobinInSomeOrder(10, 26, 27);      
+      */
+      closeAllConsumers();
+
+      closeAllSessionFactories();
+   }
+   
+   /**
+    * Start/stop scenario on the full cluster of five live nodes (0-4) with their backups (5-9).
+    * <p>
+    * The test creates 28 queues spread over the five live nodes, attaches one consumer to each,
+    * and checks message distribution. It then stops live nodes 0 and 3 together with their
+    * backups 5 and 8, starts them again, re-creates the queues and consumers of nodes 0 and 3
+    * and checks that bindings and distribution are the same as before the restart.
+    */
+   @Override
+   public void testStartStopServers() throws Exception
+   {
+      setupCluster();
+
+      startServers();
+
+      log.info("setup session factories: ");
+
+      setupSessionFactory(0, isNetty());
+      setupSessionFactory(1, isNetty());
+      setupSessionFactory(2, isNetty());
+      setupSessionFactory(3, isNetty());
+      setupSessionFactory(4, isNetty());
+
+      // Queues with a name unique to a single node (queue0 .. queue14)
+      createQueue(0, "queues.testaddress", "queue0", null, false);
+      createQueue(1, "queues.testaddress", "queue1", null, false);
+      createQueue(2, "queues.testaddress", "queue2", null, false);
+      createQueue(3, "queues.testaddress", "queue3", null, false);
+      createQueue(4, "queues.testaddress", "queue4", null, false);
+
+      createQueue(0, "queues.testaddress", "queue5", null, false);
+      createQueue(1, "queues.testaddress", "queue6", null, false);
+      createQueue(2, "queues.testaddress", "queue7", null, false);
+      createQueue(3, "queues.testaddress", "queue8", null, false);
+      createQueue(4, "queues.testaddress", "queue9", null, false);
+
+      createQueue(0, "queues.testaddress", "queue10", null, false);
+      createQueue(1, "queues.testaddress", "queue11", null, false);
+      createQueue(2, "queues.testaddress", "queue12", null, false);
+      createQueue(3, "queues.testaddress", "queue13", null, false);
+      createQueue(4, "queues.testaddress", "queue14", null, false);
+
+      // Queues whose name is shared by several nodes: queue15 on all five nodes,
+      // queue16 on nodes 2-4, queue17 on nodes 0, 1 and 4, queue18 on nodes 3 and 4
+      createQueue(0, "queues.testaddress", "queue15", null, false);
+      createQueue(1, "queues.testaddress", "queue15", null, false);
+      createQueue(2, "queues.testaddress", "queue15", null, false);
+      createQueue(3, "queues.testaddress", "queue15", null, false);
+      createQueue(4, "queues.testaddress", "queue15", null, false);
+
+      createQueue(2, "queues.testaddress", "queue16", null, false);
+      createQueue(3, "queues.testaddress", "queue16", null, false);
+      createQueue(4, "queues.testaddress", "queue16", null, false);
+
+      createQueue(0, "queues.testaddress", "queue17", null, false);
+      createQueue(1, "queues.testaddress", "queue17", null, false);
+      createQueue(4, "queues.testaddress", "queue17", null, false);
+
+      createQueue(3, "queues.testaddress", "queue18", null, false);
+      createQueue(4, "queues.testaddress", "queue18", null, false);
+
+      // One consumer per queue; arguments are (consumer id, node, queue name, filter)
+      addConsumer(0, 0, "queue0", null);
+      addConsumer(1, 1, "queue1", null);
+      addConsumer(2, 2, "queue2", null);
+      addConsumer(3, 3, "queue3", null);
+      addConsumer(4, 4, "queue4", null);
+
+      addConsumer(5, 0, "queue5", null);
+      addConsumer(6, 1, "queue6", null);
+      addConsumer(7, 2, "queue7", null);
+      addConsumer(8, 3, "queue8", null);
+      addConsumer(9, 4, "queue9", null);
+
+      addConsumer(10, 0, "queue10", null);
+      addConsumer(11, 1, "queue11", null);
+      addConsumer(12, 2, "queue12", null);
+      addConsumer(13, 3, "queue13", null);
+      addConsumer(14, 4, "queue14", null);
+
+      addConsumer(15, 0, "queue15", null);
+      addConsumer(16, 1, "queue15", null);
+      addConsumer(17, 2, "queue15", null);
+      addConsumer(18, 3, "queue15", null);
+      addConsumer(19, 4, "queue15", null);
+
+      addConsumer(20, 2, "queue16", null);
+      addConsumer(21, 3, "queue16", null);
+      addConsumer(22, 4, "queue16", null);
+
+      addConsumer(23, 0, "queue17", null);
+      addConsumer(24, 1, "queue17", null);
+      addConsumer(25, 4, "queue17", null);
+
+      addConsumer(26, 3, "queue18", null);
+      addConsumer(27, 4, "queue18", null);
+
+      log.info("wait for bindings...");
+      
+      // Local bindings per node: nodes 0-2 host 5 queues each, node 3 hosts 6, node 4 hosts 7
+      waitForBindings(0, "queues.testaddress", 5, 5, true);
+      waitForBindings(1, "queues.testaddress", 5, 5, true);
+      waitForBindings(2, "queues.testaddress", 5, 5, true);
+      waitForBindings(3, "queues.testaddress", 6, 6, true);
+      waitForBindings(4, "queues.testaddress", 7, 7, true);
+
+      // Non-local bindings per node: the 28 queues in total minus the node's own
+      waitForBindings(0, "queues.testaddress", 23, 23, false);
+      waitForBindings(1, "queues.testaddress", 23, 23, false);
+      waitForBindings(2, "queues.testaddress", 23, 23, false);
+      waitForBindings(3, "queues.testaddress", 22, 22, false);
+      waitForBindings(4, "queues.testaddress", 21, 21, false);
+
+      log.info("send and receive messages");
+      
+      send(0, "queues.testaddress", 10, false, null);
+
+      // Consumers on uniquely named queues are checked with verifyReceiveAll
+      verifyReceiveAll(10, 0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14);
+
+      // Consumers on queues that share a name across nodes are checked for round-robin delivery
+      verifyReceiveRoundRobinInSomeOrder(10, 15, 16, 17, 18, 19);
+
+      verifyReceiveRoundRobinInSomeOrder(10, 20, 21, 22);
+
+      verifyReceiveRoundRobinInSomeOrder(10, 23, 24, 25);
+
+      verifyReceiveRoundRobinInSomeOrder(10, 26, 27);
+
+      // Detach every consumer attached to node 0 (ids 0, 5, 10, 15, 23) ...
+      removeConsumer(0);
+      removeConsumer(5);
+      removeConsumer(10);
+      removeConsumer(15);
+      removeConsumer(23);
+      // ... and to node 3 (ids 3, 8, 13, 18, 21, 26) before those nodes are stopped
+      removeConsumer(3);
+      removeConsumer(8);
+      removeConsumer(13);
+      removeConsumer(18);
+      removeConsumer(21);
+      removeConsumer(26);
+
+      closeSessionFactory(0);
+      closeSessionFactory(3);
+
+      log.info("stop servers");
+
+      // Live nodes 0 and 3 first, then their backups 5 and 8
+      stopServers(0, 3, 5, 8);
+
+      log.info("restart servers");
+
+      // Backups are started before their live nodes
+      startServers(5, 8, 0, 3);
+
+      // NOTE(review): presumably gives the restarted nodes time to rejoin the cluster -- confirm
+      Thread.sleep(2000);
+
+      setupSessionFactory(0, isNetty());
+      setupSessionFactory(3, isNetty());
+
+      // Re-create the queues and consumers of nodes 0 and 3 with the same names and ids as before
+      createQueue(0, "queues.testaddress", "queue0", null, false);
+      createQueue(3, "queues.testaddress", "queue3", null, false);
+
+      createQueue(0, "queues.testaddress", "queue5", null, false);
+      createQueue(3, "queues.testaddress", "queue8", null, false);
+
+      createQueue(0, "queues.testaddress", "queue10", null, false);
+      createQueue(3, "queues.testaddress", "queue13", null, false);
+
+      createQueue(0, "queues.testaddress", "queue15", null, false);
+      createQueue(3, "queues.testaddress", "queue15", null, false);
+
+      createQueue(3, "queues.testaddress", "queue16", null, false);
+
+      createQueue(0, "queues.testaddress", "queue17", null, false);
+
+      createQueue(3, "queues.testaddress", "queue18", null, false);
+
+      addConsumer(0, 0, "queue0", null);
+      addConsumer(3, 3, "queue3", null);
+
+      addConsumer(5, 0, "queue5", null);
+      addConsumer(8, 3, "queue8", null);
+
+      addConsumer(10, 0, "queue10", null);
+      addConsumer(13, 3, "queue13", null);
+
+      addConsumer(15, 0, "queue15", null);
+      addConsumer(18, 3, "queue15", null);
+
+      addConsumer(21, 3, "queue16", null);
+
+      addConsumer(23, 0, "queue17", null);
+
+      addConsumer(26, 3, "queue18", null);
+
+      // Binding counts must be back to what they were before the restart
+      waitForBindings(0, "queues.testaddress", 5, 5, true);
+      waitForBindings(1, "queues.testaddress", 5, 5, true);
+      waitForBindings(2, "queues.testaddress", 5, 5, true);
+      waitForBindings(3, "queues.testaddress", 6, 6, true);
+      waitForBindings(4, "queues.testaddress", 7, 7, true);
+
+      waitForBindings(0, "queues.testaddress", 23, 23, false);
+      waitForBindings(1, "queues.testaddress", 23, 23, false);
+      waitForBindings(2, "queues.testaddress", 23, 23, false);
+      waitForBindings(3, "queues.testaddress", 22, 22, false);
+      waitForBindings(4, "queues.testaddress", 21, 21, false);
+
+      // Same send/verify round as before the restart
+      send(0, "queues.testaddress", 10, false, null);
+
+      verifyReceiveAll(10, 0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14);
+
+      verifyReceiveRoundRobinInSomeOrder(10, 15, 16, 17, 18, 19);
+
+      verifyReceiveRoundRobinInSomeOrder(10, 20, 21, 22);
+
+      verifyReceiveRoundRobinInSomeOrder(10, 23, 24, 25);
+
+      verifyReceiveRoundRobinInSomeOrder(10, 26, 27);      
+
+      closeAllConsumers();
+
+      closeAllSessionFactories();
+   }
+
+   @Override
+   protected void setupCluster(final boolean forwardWhenNoConsumers) throws Exception
+   {
+      // The lives
+      setupClusterConnectionWithBackups("cluster0",
+                                        "queues",
+                                        forwardWhenNoConsumers,
+                                        1,
+                                        isNetty(),
+                                        0,
+                                        new int[] { 1, 2, 3, 4 },
+                                        new int[] { 6, 7, 8, 9 });
+
+      setupClusterConnectionWithBackups("cluster1",
+                                        "queues",
+                                        forwardWhenNoConsumers,
+                                        1,
+                                        isNetty(),
+                                        1,
+                                        new int[] { 0, 2, 3, 4 },
+                                        new int[] { 5, 7, 8, 9 });
+
+      setupClusterConnectionWithBackups("cluster2",
+                                        "queues",
+                                        forwardWhenNoConsumers,
+                                        1,
+                                        isNetty(),
+                                        2,
+                                        new int[] { 0, 1, 3, 4 },
+                                        new int[] { 5, 6, 8, 9 });
+
+      setupClusterConnectionWithBackups("cluster3",
+                                        "queues",
+                                        forwardWhenNoConsumers,
+                                        1,
+                                        isNetty(),
+                                        3,
+                                        new int[] { 0, 1, 2, 4 },
+                                        new int[] { 5, 6, 7, 9 });
+
+      setupClusterConnectionWithBackups("cluster4",
+                                        "queues",
+                                        forwardWhenNoConsumers,
+                                        1,
+                                        isNetty(),
+                                        4,
+                                        new int[] { 0, 1, 2, 3 },
+                                        new int[] { 5, 6, 7, 8 });
+
+      // The backups
+
+      setupClusterConnectionWithBackups("cluster0",
+                                        "queues",
+                                        forwardWhenNoConsumers,
+                                        1,
+                                        isNetty(),
+                                        5,
+                                        new int[] { 1, 2, 3, 4 },
+                                        new int[] { 6, 7, 8, 9 });
+
+      setupClusterConnectionWithBackups("cluster1",
+                                        "queues",
+                                        forwardWhenNoConsumers,
+                                        1,
+                                        isNetty(),
+                                        6,
+                                      new int[] { 0, 2, 3, 4 },
+                                      new int[] { 5, 7, 8, 9 });
+      
+      setupClusterConnectionWithBackups("cluster2",
+                                        "queues",
+                                        forwardWhenNoConsumers,
+                                        1,
+                                        isNetty(),
+                                        7,
+                                        new int[] { 0, 1, 3, 4 },
+                                        new int[] { 5, 6, 8, 9 });
+
+      setupClusterConnectionWithBackups("cluster3",
+                                        "queues",
+                                        forwardWhenNoConsumers,
+                                        1,
+                                        isNetty(),
+                                        8,
+                                        new int[] { 0, 1, 2, 4 },
+                                        new int[] { 5, 6, 7, 9 });
+
+      setupClusterConnectionWithBackups("cluster4",
+                                        "queues",
+                                        forwardWhenNoConsumers,
+                                        1,
+                                        isNetty(),
+                                        9,
+                                        new int[] { 0, 1, 2, 3 },
+                                        new int[] { 5, 6, 7, 8 });
+   }
+
+   @Override
+   protected void setupServers() throws Exception
+   {
+      // The backups
+      setupServer(5, isFileStorage(), isNetty(), true);
+      setupServer(6, isFileStorage(), isNetty(), true);
+      setupServer(7, isFileStorage(), isNetty(), true);
+      setupServer(8, isFileStorage(), isNetty(), true);
+      setupServer(9, isFileStorage(), isNetty(), true);
+
+      // The lives
+      setupServer(0, isFileStorage(), isNetty(), 5);
+      setupServer(1, isFileStorage(), isNetty(), 6);
+      setupServer(2, isFileStorage(), isNetty(), 7);
+      setupServer(3, isFileStorage(), isNetty(), 8);
+      setupServer(4, isFileStorage(), isNetty(), 9);
+   }
+
+   /**
+    * Starts all ten servers, backups (5-9) before lives (0-4).
+    */
+   @Override
+   protected void startServers() throws Exception
+   {
+      // Need to set backup, since when restarting backup after it has failed over, backup will have been set to false
+      for (int backupNode = 5; backupNode <= 9; backupNode++)
+      {
+         getServer(backupNode).getConfiguration().setBackup(true);
+      }
+
+      startServers(5, 6, 7, 8, 9, 0, 1, 2, 3, 4);
+   }
+
+   @Override
+   protected void stopServers() throws Exception
+   {
+      closeAllConsumers();
+
+      closeAllSessionFactories();
+
+      stopServers(0, 1, 2, 3, 4, 5, 6, 7, 8, 9);
+   }
+
+}



More information about the hornetq-commits mailing list