Author: jmesnil
Date: 2009-11-26 08:19:27 -0500 (Thu, 26 Nov 2009)
New Revision: 8412
Added:
trunk/tests/src/org/hornetq/tests/integration/cluster/distribution/ClusterWithBackupTest.java
trunk/tests/src/org/hornetq/tests/integration/cluster/distribution/NettyFileStorageSymmetricClusterWithBackupTest.java
trunk/tests/src/org/hornetq/tests/integration/cluster/distribution/NettySymmetricClusterWithBackupTest.java
trunk/tests/src/org/hornetq/tests/integration/cluster/distribution/SymmetricClusterWithBackupTest.java
Modified:
trunk/src/main/org/hornetq/core/management/AcceptorControl.java
trunk/src/main/org/hornetq/core/management/impl/AcceptorControlImpl.java
trunk/src/main/org/hornetq/core/remoting/impl/invm/InVMAcceptor.java
trunk/src/main/org/hornetq/core/remoting/server/impl/RemotingServiceImpl.java
trunk/src/main/org/hornetq/core/remoting/spi/Acceptor.java
trunk/src/main/org/hornetq/core/server/cluster/impl/ClusterConnectionImpl.java
trunk/src/main/org/hornetq/core/server/impl/HornetQServerImpl.java
trunk/src/main/org/hornetq/integration/transports/netty/NettyAcceptor.java
trunk/tests/src/org/hornetq/tests/integration/cluster/distribution/ClusterTestBase.java
Log:
re-added cluster with backup tests
* modified RemotingService.stop() sequence to ensure there are no more connections created
after the acceptors are paused
* in ClusterConnectionImpl.createNewRecord(), do not failover the record's bridge when
the live server is shut down
* removed Acceptor.resume() method (pause() method is only used as step 1 in the 2 steps
to stop the remoting service)
Modified: trunk/src/main/org/hornetq/core/management/AcceptorControl.java
===================================================================
--- trunk/src/main/org/hornetq/core/management/AcceptorControl.java 2009-11-26 12:46:00
UTC (rev 8411)
+++ trunk/src/main/org/hornetq/core/management/AcceptorControl.java 2009-11-26 13:19:27
UTC (rev 8412)
@@ -28,8 +28,4 @@
String getFactoryClassName();
Map<String, Object> getParameters();
-
- void pause() throws Exception;
-
- void resume() throws Exception;
}
Modified: trunk/src/main/org/hornetq/core/management/impl/AcceptorControlImpl.java
===================================================================
--- trunk/src/main/org/hornetq/core/management/impl/AcceptorControlImpl.java 2009-11-26
12:46:00 UTC (rev 8411)
+++ trunk/src/main/org/hornetq/core/management/impl/AcceptorControlImpl.java 2009-11-26
13:19:27 UTC (rev 8412)
@@ -78,20 +78,10 @@
acceptor.start();
}
- public void pause()
- {
- acceptor.pause();
- }
-
public void stop() throws Exception
{
acceptor.stop();
}
-
- public void resume() throws Exception
- {
- acceptor.resume();
- }
// Public --------------------------------------------------------
Modified: trunk/src/main/org/hornetq/core/remoting/impl/invm/InVMAcceptor.java
===================================================================
--- trunk/src/main/org/hornetq/core/remoting/impl/invm/InVMAcceptor.java 2009-11-26
12:46:00 UTC (rev 8411)
+++ trunk/src/main/org/hornetq/core/remoting/impl/invm/InVMAcceptor.java 2009-11-26
13:19:27 UTC (rev 8412)
@@ -156,18 +156,6 @@
paused = true;
}
- public synchronized void resume()
- {
- if (!paused || !started)
- {
- return;
- }
-
- InVMRegistry.instance.registerAcceptor(id, this);
-
- paused = false;
- }
-
public void setNotificationService(NotificationService notificationService)
{
this.notificationService = notificationService;
Modified: trunk/src/main/org/hornetq/core/remoting/server/impl/RemotingServiceImpl.java
===================================================================
---
trunk/src/main/org/hornetq/core/remoting/server/impl/RemotingServiceImpl.java 2009-11-26
12:46:00 UTC (rev 8411)
+++
trunk/src/main/org/hornetq/core/remoting/server/impl/RemotingServiceImpl.java 2009-11-26
13:19:27 UTC (rev 8412)
@@ -15,7 +15,6 @@
import static org.hornetq.core.remoting.impl.wireformat.PacketImpl.DISCONNECT;
-import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
@@ -38,8 +37,6 @@
import org.hornetq.core.remoting.RemotingConnection;
import org.hornetq.core.remoting.impl.AbstractBufferHandler;
import org.hornetq.core.remoting.impl.RemotingConnectionImpl;
-import org.hornetq.core.remoting.impl.invm.InVMAcceptorFactory;
-import org.hornetq.core.remoting.impl.invm.TransportConstants;
import org.hornetq.core.remoting.impl.wireformat.PacketImpl;
import org.hornetq.core.remoting.impl.wireformat.Ping;
import org.hornetq.core.remoting.server.RemotingService;
@@ -212,8 +209,8 @@
}
}
}
-
- public synchronized void stop() throws Exception
+
+ public void stop() throws Exception
{
if (!started)
{
@@ -228,26 +225,31 @@
acceptor.pause();
}
- for (ConnectionEntry entry : connections.values())
+ synchronized (server)
{
- entry.connection.getChannel(0, -1).sendAndFlush(new PacketImpl(DISCONNECT));
- }
+ for (ConnectionEntry entry : connections.values())
+ {
+ entry.connection.getChannel(0, -1).sendAndFlush(new PacketImpl(DISCONNECT));
+ }
- for (Acceptor acceptor : acceptors)
- {
- acceptor.stop();
- }
+ for (Acceptor acceptor : acceptors)
+ {
+ acceptor.stop();
+ }
- acceptors.clear();
+ acceptors.clear();
- connections.clear();
+ connections.clear();
- if (managementService != null)
- {
- managementService.unregisterAcceptors();
+ if (managementService != null)
+ {
+ managementService.unregisterAcceptors();
+ }
+
+ started = false;
+
}
- started = false;
}
public boolean isStarted()
@@ -297,9 +299,7 @@
RemotingConnection rc = new RemotingConnectionImpl(connection,
interceptors,
-
server.getConfiguration().isAsyncConnectionExecutionEnabled() ?
server.getExecutorFactory()
-
.getExecutor()
-
: null);
+
config.isAsyncConnectionExecutionEnabled() ? server.getExecutorFactory().getExecutor() :
null);
Channel channel1 = rc.getChannel(1, -1);
Modified: trunk/src/main/org/hornetq/core/remoting/spi/Acceptor.java
===================================================================
--- trunk/src/main/org/hornetq/core/remoting/spi/Acceptor.java 2009-11-26 12:46:00 UTC
(rev 8411)
+++ trunk/src/main/org/hornetq/core/remoting/spi/Acceptor.java 2009-11-26 13:19:27 UTC
(rev 8412)
@@ -27,7 +27,5 @@
{
void pause();
- void resume();
-
void setNotificationService(NotificationService notificationService);
}
Modified: trunk/src/main/org/hornetq/core/server/cluster/impl/ClusterConnectionImpl.java
===================================================================
---
trunk/src/main/org/hornetq/core/server/cluster/impl/ClusterConnectionImpl.java 2009-11-26
12:46:00 UTC (rev 8411)
+++
trunk/src/main/org/hornetq/core/server/cluster/impl/ClusterConnectionImpl.java 2009-11-26
13:19:27 UTC (rev 8412)
@@ -438,7 +438,7 @@
retryInterval,
1d,
-1,
- true,
+ false,
useDuplicateDetection,
confirmationWindowSize,
managementService.getManagementAddress(),
Modified: trunk/src/main/org/hornetq/core/server/impl/HornetQServerImpl.java
===================================================================
--- trunk/src/main/org/hornetq/core/server/impl/HornetQServerImpl.java 2009-11-26 12:46:00
UTC (rev 8411)
+++ trunk/src/main/org/hornetq/core/server/impl/HornetQServerImpl.java 2009-11-26 13:19:27
UTC (rev 8412)
@@ -349,9 +349,13 @@
{
session.getChannel().flushConfirmations();
}
+ }
- remotingService.stop();
-
+ // we stop the remoting service outside a lock
+ remotingService.stop();
+
+ synchronized (this)
+ {
// Stop the deployers
if (configuration.isFileDeploymentEnabled())
{
Modified: trunk/src/main/org/hornetq/integration/transports/netty/NettyAcceptor.java
===================================================================
--- trunk/src/main/org/hornetq/integration/transports/netty/NettyAcceptor.java 2009-11-26
12:46:00 UTC (rev 8411)
+++ trunk/src/main/org/hornetq/integration/transports/netty/NettyAcceptor.java 2009-11-26
13:19:27 UTC (rev 8412)
@@ -358,6 +358,8 @@
httpKeepAliveRunnable.close();
}
+ // serverChannelGroup has been unbound in pause()
+ serverChannelGroup.close().awaitUninterruptibly();
ChannelGroupFuture future = channelGroup.close().awaitUninterruptibly();
if (!future.isCompleteSuccess())
@@ -412,7 +414,7 @@
private boolean paused;
- public synchronized void pause()
+ public void pause()
{
if (paused)
{
@@ -425,32 +427,25 @@
}
// We *pause* the acceptor so no new connections are made
-
- serverChannelGroup.close().awaitUninterruptibly();
-
- try
+ ChannelGroupFuture future = serverChannelGroup.unbind().awaitUninterruptibly();
+ if (!future.isCompleteSuccess())
{
- Thread.sleep(500);
+ log.warn("server channel group did not completely unbind");
+ Iterator<Channel> iterator = future.getGroup().iterator();
+ while (iterator.hasNext())
+ {
+ Channel channel = (Channel)iterator.next();
+ if (channel.isBound())
+ {
+ log.warn(channel + " is still bound to " +
channel.getRemoteAddress());
+ }
+ }
}
- catch (Exception e)
- {
- }
paused = true;
}
- public synchronized void resume()
- {
- if (!paused)
- {
- return;
- }
-
- startServerChannels();
-
- paused = false;
- }
-
+
public void setNotificationService(final NotificationService notificationService)
{
this.notificationService = notificationService;
Modified:
trunk/tests/src/org/hornetq/tests/integration/cluster/distribution/ClusterTestBase.java
===================================================================
---
trunk/tests/src/org/hornetq/tests/integration/cluster/distribution/ClusterTestBase.java 2009-11-26
12:46:00 UTC (rev 8411)
+++
trunk/tests/src/org/hornetq/tests/integration/cluster/distribution/ClusterTestBase.java 2009-11-26
13:19:27 UTC (rev 8412)
@@ -63,7 +63,7 @@
*
*
*/
-public class ClusterTestBase extends ServiceTestBase
+public abstract class ClusterTestBase extends ServiceTestBase
{
private static final Logger log = Logger.getLogger(ClusterTestBase.class);
@@ -799,7 +799,7 @@
ClientMessage message;
do
{
- message = holder.consumer.receive(200);
+ message = holder.consumer.receive(1000);
if (message != null)
{
@@ -809,7 +809,7 @@
if (prevCount != null)
{
- assertTrue(count == prevCount + consumerIDs.length);
+ assertEquals(prevCount + consumerIDs.length, count);
}
assertFalse(counts.contains(count));
@@ -835,7 +835,7 @@
for (int i = 0; i < numMessages; i++)
{
- assertTrue(counts.contains(i));
+ assertTrue("did not receive message " + i, counts.contains(i));
}
}
@@ -1607,5 +1607,4 @@
}
}
}
-
}
Added:
trunk/tests/src/org/hornetq/tests/integration/cluster/distribution/ClusterWithBackupTest.java
===================================================================
---
trunk/tests/src/org/hornetq/tests/integration/cluster/distribution/ClusterWithBackupTest.java
(rev 0)
+++
trunk/tests/src/org/hornetq/tests/integration/cluster/distribution/ClusterWithBackupTest.java 2009-11-26
13:19:27 UTC (rev 8412)
@@ -0,0 +1,144 @@
+/*
+ * JBoss, Home of Professional Open Source
+ * Copyright 2005-2009, Red Hat Middleware LLC, and individual contributors
+ * by the @authors tag. See the copyright.txt in the distribution for a
+ * full listing of individual contributors.
+ *
+ * This is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU Lesser General Public License as
+ * published by the Free Software Foundation; either version 2.1 of
+ * the License, or (at your option) any later version.
+ *
+ * This software is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this software; if not, write to the Free
+ * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
+ * 02110-1301 USA, or see the FSF site:
http://www.fsf.org.
+ */
+
+package org.hornetq.tests.integration.cluster.distribution;
+
+import org.hornetq.core.logging.Logger;
+
+/**
+ *
+ * A ClusterWithBackupTest
+ *
+ * @author <a href="mailto:tim.fox@jboss.com">Tim Fox</a>
+ *
+ * Created 9 Mar 2009 16:31:21
+ *
+ *
+ */
+public class ClusterWithBackupTest extends ClusterTestBase
+{
+ private static final Logger log = Logger.getLogger(ClusterWithBackupTest.class);
+
+ @Override
+ protected void setUp() throws Exception
+ {
+ super.setUp();
+
+ setupServers();
+ }
+
+ @Override
+ protected void tearDown() throws Exception
+ {
+ stopServers();
+
+ super.tearDown();
+ }
+
+ protected boolean isNetty()
+ {
+ return false;
+ }
+
+ protected boolean isFileStorage()
+ {
+ return false;
+ }
+
+ public void testBasicRoundRobin() throws Exception
+ {
+ setupCluster();
+
+ startServers(0, 1, 2, 3, 4, 5);
+
+ setupSessionFactory(3, isNetty());
+ setupSessionFactory(4, isNetty());
+ setupSessionFactory(5, isNetty());
+
+ createQueue(3, "queues.testaddress", "queue0", null, false);
+ createQueue(4, "queues.testaddress", "queue0", null, false);
+ createQueue(5, "queues.testaddress", "queue0", null, false);
+
+ addConsumer(0, 3, "queue0", null);
+ addConsumer(1, 4, "queue0", null);
+ addConsumer(2, 5, "queue0", null);
+
+ waitForBindings(3, "queues.testaddress", 1, 1, true);
+ waitForBindings(4, "queues.testaddress", 1, 1, true);
+ waitForBindings(5, "queues.testaddress", 1, 1, true);
+
+ waitForBindings(3, "queues.testaddress", 2, 2, false);
+ waitForBindings(4, "queues.testaddress", 2, 2, false);
+ waitForBindings(5, "queues.testaddress", 2, 2, false);
+
+ send(3, "queues.testaddress", 100, false, null);
+
+ verifyReceiveRoundRobinInSomeOrder(100, 0, 1, 2);
+
+ verifyNotReceive(0, 0, 1, 2);
+ }
+
+ protected void setupCluster() throws Exception
+ {
+ setupCluster(false);
+ }
+
+ protected void setupCluster(final boolean forwardWhenNoConsumers) throws Exception
+ {
+ setupClusterConnection("cluster0", "queues",
forwardWhenNoConsumers, 1, isNetty(), 3, 4, 5);
+
+ setupClusterConnection("cluster1", "queues",
forwardWhenNoConsumers, 1, isNetty(), 4, 3, 5);
+
+ setupClusterConnection("cluster2", "queues",
forwardWhenNoConsumers, 1, isNetty(), 5, 3, 4);
+
+
+ setupClusterConnection("cluster0", "queues",
forwardWhenNoConsumers, 1, isNetty(), 0, 4, 5);
+
+ setupClusterConnection("cluster1", "queues",
forwardWhenNoConsumers, 1, isNetty(), 1, 3, 5);
+
+ setupClusterConnection("cluster2", "queues",
forwardWhenNoConsumers, 1, isNetty(), 2, 3, 4);
+ }
+
+ protected void setupServers() throws Exception
+ {
+ //The backups
+ setupServer(0, isFileStorage(), isNetty(), true);
+ setupServer(1, isFileStorage(), isNetty(), true);
+ setupServer(2, isFileStorage(), isNetty(), true);
+
+ //The lives
+ setupServer(3, isFileStorage(), isNetty(), 0);
+ setupServer(4, isFileStorage(), isNetty(), 1);
+ setupServer(5, isFileStorage(), isNetty(), 2);
+
+ }
+
+ protected void stopServers() throws Exception
+ {
+ closeAllConsumers();
+
+ closeAllSessionFactories();
+
+ stopServers(3, 4, 5, 0, 1, 2);
+ }
+
+}
Added:
trunk/tests/src/org/hornetq/tests/integration/cluster/distribution/NettyFileStorageSymmetricClusterWithBackupTest.java
===================================================================
---
trunk/tests/src/org/hornetq/tests/integration/cluster/distribution/NettyFileStorageSymmetricClusterWithBackupTest.java
(rev 0)
+++
trunk/tests/src/org/hornetq/tests/integration/cluster/distribution/NettyFileStorageSymmetricClusterWithBackupTest.java 2009-11-26
13:19:27 UTC (rev 8412)
@@ -0,0 +1,45 @@
+/*
+ * JBoss, Home of Professional Open Source
+ * Copyright 2005-2009, Red Hat Middleware LLC, and individual contributors
+ * by the @authors tag. See the copyright.txt in the distribution for a
+ * full listing of individual contributors.
+ *
+ * This is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU Lesser General Public License as
+ * published by the Free Software Foundation; either version 2.1 of
+ * the License, or (at your option) any later version.
+ *
+ * This software is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this software; if not, write to the Free
+ * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
+ * 02110-1301 USA, or see the FSF site:
http://www.fsf.org.
+ */
+
+
+package org.hornetq.tests.integration.cluster.distribution;
+
+/**
+ * A NettyFileStorageSymmetricClusterWithBackupTest
+ *
+ * @author <a href="mailto:tim.fox@jboss.com">Tim Fox</a>
+ *
+ *
+ */
+public class NettyFileStorageSymmetricClusterWithBackupTest extends
SymmetricClusterWithBackupTest
+{
+ protected boolean isNetty()
+ {
+ return true;
+ }
+
+ protected boolean isFileStorage()
+ {
+ return true;
+ }
+
+}
Added:
trunk/tests/src/org/hornetq/tests/integration/cluster/distribution/NettySymmetricClusterWithBackupTest.java
===================================================================
---
trunk/tests/src/org/hornetq/tests/integration/cluster/distribution/NettySymmetricClusterWithBackupTest.java
(rev 0)
+++
trunk/tests/src/org/hornetq/tests/integration/cluster/distribution/NettySymmetricClusterWithBackupTest.java 2009-11-26
13:19:27 UTC (rev 8412)
@@ -0,0 +1,44 @@
+/*
+ * JBoss, Home of Professional Open Source
+ * Copyright 2005-2009, Red Hat Middleware LLC, and individual contributors
+ * by the @authors tag. See the copyright.txt in the distribution for a
+ * full listing of individual contributors.
+ *
+ * This is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU Lesser General Public License as
+ * published by the Free Software Foundation; either version 2.1 of
+ * the License, or (at your option) any later version.
+ *
+ * This software is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this software; if not, write to the Free
+ * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
+ * 02110-1301 USA, or see the FSF site:
http://www.fsf.org.
+ */
+
+
+package org.hornetq.tests.integration.cluster.distribution;
+
+/**
+ * A NettySymmetricClusterWithBackupTest
+ *
+ * @author <a href="mailto:tim.fox@jboss.com">Tim Fox</a>
+ *
+ *
+ */
+public class NettySymmetricClusterWithBackupTest extends SymmetricClusterWithBackupTest
+{
+ protected boolean isNetty()
+ {
+ return true;
+ }
+
+ protected boolean isFileStorage()
+ {
+ return false;
+ }
+}
Added:
trunk/tests/src/org/hornetq/tests/integration/cluster/distribution/SymmetricClusterWithBackupTest.java
===================================================================
---
trunk/tests/src/org/hornetq/tests/integration/cluster/distribution/SymmetricClusterWithBackupTest.java
(rev 0)
+++
trunk/tests/src/org/hornetq/tests/integration/cluster/distribution/SymmetricClusterWithBackupTest.java 2009-11-26
13:19:27 UTC (rev 8412)
@@ -0,0 +1,828 @@
+/*
+ * JBoss, Home of Professional Open Source
+ * Copyright 2005-2009, Red Hat Middleware LLC, and individual contributors
+ * by the @authors tag. See the copyright.txt in the distribution for a
+ * full listing of individual contributors.
+ *
+ * This is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU Lesser General Public License as
+ * published by the Free Software Foundation; either version 2.1 of
+ * the License, or (at your option) any later version.
+ *
+ * This software is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this software; if not, write to the Free
+ * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
+ * 02110-1301 USA, or see the FSF site:
http://www.fsf.org.
+ */
+
+package org.hornetq.tests.integration.cluster.distribution;
+
+import org.hornetq.core.logging.Logger;
+
+/**
+ * A SymmetricClusterWithBackupTest
+ *
+ * @author <a href="mailto:tim.fox@jboss.com">Tim Fox</a>
+ *
+ * Created 13 Mar 2009 11:00:31
+ *
+ *
+ */
+public class SymmetricClusterWithBackupTest extends SymmetricClusterTest
+{
+ private static final Logger log =
Logger.getLogger(SymmetricClusterWithBackupTest.class);
+
+ public void testStopAllStartAll() throws Throwable
+ {
+ try
+ {
+ setupCluster();
+
+ startServers();
+
+ setupSessionFactory(0, isNetty());
+ setupSessionFactory(1, isNetty());
+ setupSessionFactory(2, isNetty());
+ setupSessionFactory(3, isNetty());
+ setupSessionFactory(4, isNetty());
+
+ createQueue(0, "queues.testaddress", "queue0", null,
false);
+ createQueue(1, "queues.testaddress", "queue0", null,
false);
+ createQueue(2, "queues.testaddress", "queue0", null,
false);
+ createQueue(3, "queues.testaddress", "queue0", null,
false);
+ createQueue(4, "queues.testaddress", "queue0", null,
false);
+
+ addConsumer(0, 0, "queue0", null);
+ addConsumer(1, 1, "queue0", null);
+ addConsumer(2, 2, "queue0", null);
+ addConsumer(3, 3, "queue0", null);
+ addConsumer(4, 4, "queue0", null);
+
+ waitForBindings(0, "queues.testaddress", 1, 1, true);
+ waitForBindings(1, "queues.testaddress", 1, 1, true);
+ waitForBindings(2, "queues.testaddress", 1, 1, true);
+ waitForBindings(3, "queues.testaddress", 1, 1, true);
+ waitForBindings(4, "queues.testaddress", 1, 1, true);
+
+ waitForBindings(0, "queues.testaddress", 4, 4, false);
+ waitForBindings(1, "queues.testaddress", 4, 4, false);
+ waitForBindings(2, "queues.testaddress", 4, 4, false);
+ waitForBindings(3, "queues.testaddress", 4, 4, false);
+ waitForBindings(4, "queues.testaddress", 4, 4, false);
+
+ System.out.println("waited for all bindings");
+
+ send(0, "queues.testaddress", 10, false, null);
+
+ verifyReceiveRoundRobinInSomeOrder(10, 0, 1, 2, 3, 4);
+
+ verifyNotReceive(0, 1, 2, 3, 4);
+
+ closeAllConsumers();
+
+ closeAllSessionFactories();
+
+ stopServers();
+
+ startServers();
+
+ setupSessionFactory(0, isNetty());
+ setupSessionFactory(1, isNetty());
+ setupSessionFactory(2, isNetty());
+ setupSessionFactory(3, isNetty());
+ setupSessionFactory(4, isNetty());
+
+ createQueue(0, "queues.testaddress", "queue0", null,
false);
+ createQueue(1, "queues.testaddress", "queue0", null,
false);
+ createQueue(2, "queues.testaddress", "queue0", null,
false);
+ createQueue(3, "queues.testaddress", "queue0", null,
false);
+ createQueue(4, "queues.testaddress", "queue0", null,
false);
+
+ addConsumer(0, 0, "queue0", null);
+ addConsumer(1, 1, "queue0", null);
+ addConsumer(2, 2, "queue0", null);
+ addConsumer(3, 3, "queue0", null);
+ addConsumer(4, 4, "queue0", null);
+
+ waitForBindings(0, "queues.testaddress", 1, 1, true);
+ waitForBindings(1, "queues.testaddress", 1, 1, true);
+ waitForBindings(2, "queues.testaddress", 1, 1, true);
+ waitForBindings(3, "queues.testaddress", 1, 1, true);
+ waitForBindings(4, "queues.testaddress", 1, 1, true);
+
+ waitForBindings(0, "queues.testaddress", 4, 4, false);
+ waitForBindings(1, "queues.testaddress", 4, 4, false);
+ waitForBindings(2, "queues.testaddress", 4, 4, false);
+ waitForBindings(3, "queues.testaddress", 4, 4, false);
+ waitForBindings(4, "queues.testaddress", 4, 4, false);
+
+ send(0, "queues.testaddress", 10, false, null);
+
+ verifyReceiveRoundRobinInSomeOrder(10, 0, 1, 2, 3, 4);
+
+ this.verifyNotReceive(0, 1, 2, 3, 4);
+
+
+ closeAllConsumers();
+
+ closeAllSessionFactories();
+ }
+ catch (Throwable e)
+ {
+
System.out.println(threadDump("SymmetricClusterWithBackupTest::testStopAllStartAll"));
+ throw e;
+ }
+ }
+
+ @Override
+ public void
testMixtureLoadBalancedAndNonLoadBalancedQueuesAddQueuesAndConsumersBeforeAllServersAreStarted()
throws Exception
+ {
+ setupCluster();
+
+ startServers(5, 0);
+
+ setupSessionFactory(0, isNetty());
+
+ createQueue(0, "queues.testaddress", "queue0", null, false);
+ createQueue(0, "queues.testaddress", "queue5", null, false);
+ createQueue(0, "queues.testaddress", "queue10", null, false);
+ createQueue(0, "queues.testaddress", "queue15", null, false);
+ createQueue(0, "queues.testaddress", "queue17", null, false);
+
+ addConsumer(0, 0, "queue0", null);
+ addConsumer(5, 0, "queue5", null);
+
+ startServers(6, 1);
+ setupSessionFactory(1, isNetty());
+
+ createQueue(1, "queues.testaddress", "queue1", null, false);
+ createQueue(1, "queues.testaddress", "queue6", null, false);
+ createQueue(1, "queues.testaddress", "queue11", null, false);
+ createQueue(1, "queues.testaddress", "queue15", null, false);
+ createQueue(1, "queues.testaddress", "queue17", null, false);
+
+ addConsumer(1, 1, "queue1", null);
+ addConsumer(6, 1, "queue6", null);
+ addConsumer(11, 1, "queue11", null);
+ addConsumer(16, 1, "queue15", null);
+
+ startServers(7, 2);
+ setupSessionFactory(2, isNetty());
+
+ createQueue(2, "queues.testaddress", "queue2", null, false);
+ createQueue(2, "queues.testaddress", "queue7", null, false);
+ createQueue(2, "queues.testaddress", "queue12", null, false);
+ createQueue(2, "queues.testaddress", "queue15", null, false);
+ createQueue(2, "queues.testaddress", "queue16", null, false);
+
+ addConsumer(2, 2, "queue2", null);
+ addConsumer(7, 2, "queue7", null);
+ addConsumer(12, 2, "queue12", null);
+ addConsumer(17, 2, "queue15", null);
+
+ startServers(8, 3);
+ setupSessionFactory(3, isNetty());
+
+ createQueue(3, "queues.testaddress", "queue3", null, false);
+ createQueue(3, "queues.testaddress", "queue8", null, false);
+ createQueue(3, "queues.testaddress", "queue13", null, false);
+ createQueue(3, "queues.testaddress", "queue15", null, false);
+ createQueue(3, "queues.testaddress", "queue16", null, false);
+ createQueue(3, "queues.testaddress", "queue18", null, false);
+
+ addConsumer(3, 3, "queue3", null);
+ addConsumer(8, 3, "queue8", null);
+ addConsumer(13, 3, "queue13", null);
+ addConsumer(18, 3, "queue15", null);
+
+ startServers(9, 4);
+ setupSessionFactory(4, isNetty());
+
+ createQueue(4, "queues.testaddress", "queue4", null, false);
+ createQueue(4, "queues.testaddress", "queue9", null, false);
+ createQueue(4, "queues.testaddress", "queue14", null, false);
+ createQueue(4, "queues.testaddress", "queue15", null, false);
+ createQueue(4, "queues.testaddress", "queue16", null, false);
+ createQueue(4, "queues.testaddress", "queue17", null, false);
+ createQueue(4, "queues.testaddress", "queue18", null, false);
+
+ addConsumer(4, 4, "queue4", null);
+ addConsumer(9, 4, "queue9", null);
+ addConsumer(10, 0, "queue10", null);
+ addConsumer(14, 4, "queue14", null);
+
+ addConsumer(15, 0, "queue15", null);
+ addConsumer(19, 4, "queue15", null);
+
+ addConsumer(20, 2, "queue16", null);
+ addConsumer(21, 3, "queue16", null);
+ addConsumer(22, 4, "queue16", null);
+
+ addConsumer(23, 0, "queue17", null);
+ addConsumer(24, 1, "queue17", null);
+ addConsumer(25, 4, "queue17", null);
+
+ addConsumer(26, 3, "queue18", null);
+ addConsumer(27, 4, "queue18", null);
+
+ waitForBindings(0, "queues.testaddress", 5, 5, true);
+ waitForBindings(1, "queues.testaddress", 5, 5, true);
+ waitForBindings(2, "queues.testaddress", 5, 5, true);
+ waitForBindings(3, "queues.testaddress", 6, 6, true);
+ waitForBindings(4, "queues.testaddress", 7, 7, true);
+
+ waitForBindings(0, "queues.testaddress", 23, 23, false);
+ waitForBindings(1, "queues.testaddress", 23, 23, false);
+ waitForBindings(2, "queues.testaddress", 23, 23, false);
+ waitForBindings(3, "queues.testaddress", 22, 22, false);
+ waitForBindings(4, "queues.testaddress", 21, 21, false);
+
+ send(0, "queues.testaddress", 10, false, null);
+
+ verifyReceiveAll(10, 0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14);
+
+ verifyReceiveRoundRobinInSomeOrder(10, 15, 16, 17, 18, 19);
+
+ verifyReceiveRoundRobinInSomeOrder(10, 20, 21, 22);
+
+ verifyReceiveRoundRobinInSomeOrder(10, 23, 24, 25);
+
+ verifyReceiveRoundRobinInSomeOrder(10, 26, 27);
+
+ closeAllConsumers();
+
+ closeAllSessionFactories();
+ }
+
+ public void _test() throws Exception
+ {
+ for (int i = 0; i < 50; i++)
+ {
+ System.out.println("\n\n" + i + "\n\n");
+ testStartStopWithTwoServers();
+ tearDown();
+ setUp();
+ }
+ }
+
+ public void testStartStopWithTwoServers() throws Exception
+ {
+ setupCluster();
+
+ startServers(5, 6, 0, 1);
+
+ setupSessionFactory(0, isNetty());
+ setupSessionFactory(1, isNetty());
+// setupSessionFactory(2, isNetty());
+// setupSessionFactory(3, isNetty());
+// setupSessionFactory(4, isNetty());
+
+ createQueue(0, "queues.testaddress", "queue0", null, false);
+ createQueue(1, "queues.testaddress", "queue1", null, false);
+// createQueue(2, "queues.testaddress", "queue2", null, false);
+// createQueue(3, "queues.testaddress", "queue3", null, false);
+// createQueue(4, "queues.testaddress", "queue4", null, false);
+
+ createQueue(0, "queues.testaddress", "queue5", null, false);
+ createQueue(1, "queues.testaddress", "queue6", null, false);
+// createQueue(2, "queues.testaddress", "queue7", null, false);
+// createQueue(3, "queues.testaddress", "queue8", null, false);
+// createQueue(4, "queues.testaddress", "queue9", null, false);
+
+ createQueue(0, "queues.testaddress", "queue10", null, false);
+ createQueue(1, "queues.testaddress", "queue11", null, false);
+// createQueue(2, "queues.testaddress", "queue12", null,
false);
+// createQueue(3, "queues.testaddress", "queue13", null,
false);
+// createQueue(4, "queues.testaddress", "queue14", null,
false);
+
+ createQueue(0, "queues.testaddress", "queue15", null, false);
+ createQueue(1, "queues.testaddress", "queue15", null, false);
+// createQueue(2, "queues.testaddress", "queue15", null,
false);
+// createQueue(3, "queues.testaddress", "queue15", null,
false);
+// createQueue(4, "queues.testaddress", "queue15", null,
false);
+
+// createQueue(2, "queues.testaddress", "queue16", null,
false);
+// createQueue(3, "queues.testaddress", "queue16", null,
false);
+// createQueue(4, "queues.testaddress", "queue16", null,
false);
+
+ createQueue(0, "queues.testaddress", "queue17", null, false);
+ createQueue(1, "queues.testaddress", "queue17", null, false);
+// createQueue(4, "queues.testaddress", "queue17", null,
false);
+
+// createQueue(3, "queues.testaddress", "queue18", null,
false);
+// createQueue(4, "queues.testaddress", "queue18", null,
false);
+
+ addConsumer(0, 0, "queue0", null);
+ addConsumer(1, 1, "queue1", null);
+// addConsumer(2, 2, "queue2", null);
+// addConsumer(3, 3, "queue3", null);
+// addConsumer(4, 4, "queue4", null);
+
+ addConsumer(5, 0, "queue5", null);
+ addConsumer(6, 1, "queue6", null);
+// addConsumer(7, 2, "queue7", null);
+// addConsumer(8, 3, "queue8", null);
+// addConsumer(9, 4, "queue9", null);
+
+ addConsumer(10, 0, "queue10", null);
+ addConsumer(11, 1, "queue11", null);
+// addConsumer(12, 2, "queue12", null);
+// addConsumer(13, 3, "queue13", null);
+// addConsumer(14, 4, "queue14", null);
+
+ addConsumer(15, 0, "queue15", null);
+ addConsumer(16, 1, "queue15", null);
+// addConsumer(17, 2, "queue15", null);
+// addConsumer(18, 3, "queue15", null);
+// addConsumer(19, 4, "queue15", null);
+
+// addConsumer(20, 2, "queue16", null);
+// addConsumer(21, 3, "queue16", null);
+// addConsumer(22, 4, "queue16", null);
+
+ addConsumer(23, 0, "queue17", null);
+ addConsumer(24, 1, "queue17", null);
+// addConsumer(25, 4, "queue17", null);
+
+// addConsumer(26, 3, "queue18", null);
+// addConsumer(27, 4, "queue18", null);
+
+ waitForBindings(0, "queues.testaddress", 5, 5, true);
+ waitForBindings(1, "queues.testaddress", 5, 5, true);
+// waitForBindings(2, "queues.testaddress", 5, 5, true);
+// waitForBindings(3, "queues.testaddress", 6, 6, true);
+// waitForBindings(4, "queues.testaddress", 7, 7, true);
+
+ waitForBindings(0, "queues.testaddress", 5 , 5, false);
+// waitForBindings(0, "queues.testaddress", 23, 23, false);
+ waitForBindings(1, "queues.testaddress", 5, 5, false);
+// waitForBindings(1, "queues.testaddress", 23, 23, false);
+// waitForBindings(2, "queues.testaddress", 23, 23, false);
+// waitForBindings(3, "queues.testaddress", 22, 22, false);
+// waitForBindings(4, "queues.testaddress", 21, 21, false);
+
+ send(0, "queues.testaddress", 10, false, null);
+
+// verifyReceiveAll(10, 0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14);
+ verifyReceiveAll(10, 0, 1, 5, 6, 10, 11);
+
+ verifyReceiveRoundRobinInSomeOrder(10, 15, 16);
+// verifyReceiveRoundRobinInSomeOrder(10, 15, 16, 17, 18, 19);
+
+// verifyReceiveRoundRobinInSomeOrder(10, 20, 21, 22);
+
+// verifyReceiveRoundRobinInSomeOrder(10, 23, 24, 25);
+ verifyReceiveRoundRobinInSomeOrder(10, 23, 24);
+
+// verifyReceiveRoundRobinInSomeOrder(10, 26, 27);
+
+ removeConsumer(0);
+ removeConsumer(5);
+ removeConsumer(10);
+ removeConsumer(15);
+ removeConsumer(23);
+// removeConsumer(3);
+// removeConsumer(8);
+// removeConsumer(13);
+// removeConsumer(18);
+// removeConsumer(21);
+// removeConsumer(26);
+
+ closeSessionFactory(0);
+// closeSessionFactory(3);
+
+ stopServers(0, 5);
+// stopServers(0, 3, 5, 8);
+
+ startServers(5, 0);
+// startServers(5, 8, 0, 3);
+
+ Thread.sleep(2000);
+
+ setupSessionFactory(0, isNetty());
+// setupSessionFactory(3, isNetty());
+
+ createQueue(0, "queues.testaddress", "queue0", null, false);
+// createQueue(3, "queues.testaddress", "queue3", null, false);
+
+ createQueue(0, "queues.testaddress", "queue5", null, false);
+ // createQueue(3, "queues.testaddress", "queue8", null, false);
+
+ createQueue(0, "queues.testaddress", "queue10", null, false);
+// createQueue(3, "queues.testaddress", "queue13", null,
false);
+
+ createQueue(0, "queues.testaddress", "queue15", null, false);
+// createQueue(3, "queues.testaddress", "queue15", null,
false);
+
+// createQueue(3, "queues.testaddress", "queue16", null,
false);
+
+ createQueue(0, "queues.testaddress", "queue17", null, false);
+
+// createQueue(3, "queues.testaddress", "queue18", null,
false);
+
+ addConsumer(0, 0, "queue0", null);
+// addConsumer(3, 3, "queue3", null);
+
+ addConsumer(5, 0, "queue5", null);
+// addConsumer(8, 3, "queue8", null);
+
+ addConsumer(10, 0, "queue10", null);
+// addConsumer(13, 3, "queue13", null);
+
+ addConsumer(15, 0, "queue15", null);
+// addConsumer(18, 3, "queue15", null);
+
+// addConsumer(21, 3, "queue16", null);
+
+ addConsumer(23, 0, "queue17", null);
+
+// addConsumer(26, 3, "queue18", null);
+
+ waitForBindings(0, "queues.testaddress", 5, 5, true);
+ waitForBindings(1, "queues.testaddress", 5, 5, true);
+// waitForBindings(2, "queues.testaddress", 5, 5, true);
+// waitForBindings(3, "queues.testaddress", 6, 6, true);
+// waitForBindings(4, "queues.testaddress", 7, 7, true);
+
+ waitForBindings(0, "queues.testaddress", 5, 5, false);
+ waitForBindings(1, "queues.testaddress", 5, 5, false);
+// waitForBindings(0, "queues.testaddress", 23, 23, false);
+// waitForBindings(1, "queues.testaddress", 23, 23, false);
+// waitForBindings(2, "queues.testaddress", 23, 23, false);
+// waitForBindings(3, "queues.testaddress", 22, 22, false);
+/// waitForBindings(4, "queues.testaddress", 21, 21, false);
+
+ send(0, "queues.testaddress", 10, false, null);
+
+ verifyReceiveAll(10, 0, 1, 5, 6, 10, 11);
+
+ verifyReceiveRoundRobinInSomeOrder(10, 15, 16);
+ verifyReceiveRoundRobinInSomeOrder(10, 23, 24);
+
+ /*
+ verifyReceiveAll(10, 0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14);
+
+ verifyReceiveRoundRobinInSomeOrder(10, 15, 16, 17, 18, 19);
+
+ verifyReceiveRoundRobinInSomeOrder(10, 20, 21, 22);
+
+ verifyReceiveRoundRobinInSomeOrder(10, 23, 24, 25);
+
+ verifyReceiveRoundRobinInSomeOrder(10, 26, 27);
+ */
+ closeAllConsumers();
+
+ closeAllSessionFactories();
+ }
+
+ @Override
+ public void testStartStopServers() throws Exception
+ {
+ setupCluster();
+
+ startServers();
+
+ log.info("setup session factories: ");
+
+ setupSessionFactory(0, isNetty());
+ setupSessionFactory(1, isNetty());
+ setupSessionFactory(2, isNetty());
+ setupSessionFactory(3, isNetty());
+ setupSessionFactory(4, isNetty());
+
+ createQueue(0, "queues.testaddress", "queue0", null, false);
+ createQueue(1, "queues.testaddress", "queue1", null, false);
+ createQueue(2, "queues.testaddress", "queue2", null, false);
+ createQueue(3, "queues.testaddress", "queue3", null, false);
+ createQueue(4, "queues.testaddress", "queue4", null, false);
+
+ createQueue(0, "queues.testaddress", "queue5", null, false);
+ createQueue(1, "queues.testaddress", "queue6", null, false);
+ createQueue(2, "queues.testaddress", "queue7", null, false);
+ createQueue(3, "queues.testaddress", "queue8", null, false);
+ createQueue(4, "queues.testaddress", "queue9", null, false);
+
+ createQueue(0, "queues.testaddress", "queue10", null, false);
+ createQueue(1, "queues.testaddress", "queue11", null, false);
+ createQueue(2, "queues.testaddress", "queue12", null, false);
+ createQueue(3, "queues.testaddress", "queue13", null, false);
+ createQueue(4, "queues.testaddress", "queue14", null, false);
+
+ createQueue(0, "queues.testaddress", "queue15", null, false);
+ createQueue(1, "queues.testaddress", "queue15", null, false);
+ createQueue(2, "queues.testaddress", "queue15", null, false);
+ createQueue(3, "queues.testaddress", "queue15", null, false);
+ createQueue(4, "queues.testaddress", "queue15", null, false);
+
+ createQueue(2, "queues.testaddress", "queue16", null, false);
+ createQueue(3, "queues.testaddress", "queue16", null, false);
+ createQueue(4, "queues.testaddress", "queue16", null, false);
+
+ createQueue(0, "queues.testaddress", "queue17", null, false);
+ createQueue(1, "queues.testaddress", "queue17", null, false);
+ createQueue(4, "queues.testaddress", "queue17", null, false);
+
+ createQueue(3, "queues.testaddress", "queue18", null, false);
+ createQueue(4, "queues.testaddress", "queue18", null, false);
+
+ addConsumer(0, 0, "queue0", null);
+ addConsumer(1, 1, "queue1", null);
+ addConsumer(2, 2, "queue2", null);
+ addConsumer(3, 3, "queue3", null);
+ addConsumer(4, 4, "queue4", null);
+
+ addConsumer(5, 0, "queue5", null);
+ addConsumer(6, 1, "queue6", null);
+ addConsumer(7, 2, "queue7", null);
+ addConsumer(8, 3, "queue8", null);
+ addConsumer(9, 4, "queue9", null);
+
+ addConsumer(10, 0, "queue10", null);
+ addConsumer(11, 1, "queue11", null);
+ addConsumer(12, 2, "queue12", null);
+ addConsumer(13, 3, "queue13", null);
+ addConsumer(14, 4, "queue14", null);
+
+ addConsumer(15, 0, "queue15", null);
+ addConsumer(16, 1, "queue15", null);
+ addConsumer(17, 2, "queue15", null);
+ addConsumer(18, 3, "queue15", null);
+ addConsumer(19, 4, "queue15", null);
+
+ addConsumer(20, 2, "queue16", null);
+ addConsumer(21, 3, "queue16", null);
+ addConsumer(22, 4, "queue16", null);
+
+ addConsumer(23, 0, "queue17", null);
+ addConsumer(24, 1, "queue17", null);
+ addConsumer(25, 4, "queue17", null);
+
+ addConsumer(26, 3, "queue18", null);
+ addConsumer(27, 4, "queue18", null);
+
+ log.info("wait for bindings...");
+
+ waitForBindings(0, "queues.testaddress", 5, 5, true);
+ waitForBindings(1, "queues.testaddress", 5, 5, true);
+ waitForBindings(2, "queues.testaddress", 5, 5, true);
+ waitForBindings(3, "queues.testaddress", 6, 6, true);
+ waitForBindings(4, "queues.testaddress", 7, 7, true);
+
+ waitForBindings(0, "queues.testaddress", 23, 23, false);
+ waitForBindings(1, "queues.testaddress", 23, 23, false);
+ waitForBindings(2, "queues.testaddress", 23, 23, false);
+ waitForBindings(3, "queues.testaddress", 22, 22, false);
+ waitForBindings(4, "queues.testaddress", 21, 21, false);
+
+ log.info("send and receive messages");
+
+ send(0, "queues.testaddress", 10, false, null);
+
+ verifyReceiveAll(10, 0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14);
+
+ verifyReceiveRoundRobinInSomeOrder(10, 15, 16, 17, 18, 19);
+
+ verifyReceiveRoundRobinInSomeOrder(10, 20, 21, 22);
+
+ verifyReceiveRoundRobinInSomeOrder(10, 23, 24, 25);
+
+ verifyReceiveRoundRobinInSomeOrder(10, 26, 27);
+
+ removeConsumer(0);
+ removeConsumer(5);
+ removeConsumer(10);
+ removeConsumer(15);
+ removeConsumer(23);
+ removeConsumer(3);
+ removeConsumer(8);
+ removeConsumer(13);
+ removeConsumer(18);
+ removeConsumer(21);
+ removeConsumer(26);
+
+ closeSessionFactory(0);
+ closeSessionFactory(3);
+
+ log.info("stop servers");
+
+ stopServers(0, 3, 5, 8);
+
+ log.info("restart servers");
+
+ startServers(5, 8, 0, 3);
+
+ Thread.sleep(2000);
+
+ setupSessionFactory(0, isNetty());
+ setupSessionFactory(3, isNetty());
+
+ createQueue(0, "queues.testaddress", "queue0", null, false);
+ createQueue(3, "queues.testaddress", "queue3", null, false);
+
+ createQueue(0, "queues.testaddress", "queue5", null, false);
+ createQueue(3, "queues.testaddress", "queue8", null, false);
+
+ createQueue(0, "queues.testaddress", "queue10", null, false);
+ createQueue(3, "queues.testaddress", "queue13", null, false);
+
+ createQueue(0, "queues.testaddress", "queue15", null, false);
+ createQueue(3, "queues.testaddress", "queue15", null, false);
+
+ createQueue(3, "queues.testaddress", "queue16", null, false);
+
+ createQueue(0, "queues.testaddress", "queue17", null, false);
+
+ createQueue(3, "queues.testaddress", "queue18", null, false);
+
+ addConsumer(0, 0, "queue0", null);
+ addConsumer(3, 3, "queue3", null);
+
+ addConsumer(5, 0, "queue5", null);
+ addConsumer(8, 3, "queue8", null);
+
+ addConsumer(10, 0, "queue10", null);
+ addConsumer(13, 3, "queue13", null);
+
+ addConsumer(15, 0, "queue15", null);
+ addConsumer(18, 3, "queue15", null);
+
+ addConsumer(21, 3, "queue16", null);
+
+ addConsumer(23, 0, "queue17", null);
+
+ addConsumer(26, 3, "queue18", null);
+
+ waitForBindings(0, "queues.testaddress", 5, 5, true);
+ waitForBindings(1, "queues.testaddress", 5, 5, true);
+ waitForBindings(2, "queues.testaddress", 5, 5, true);
+ waitForBindings(3, "queues.testaddress", 6, 6, true);
+ waitForBindings(4, "queues.testaddress", 7, 7, true);
+
+ waitForBindings(0, "queues.testaddress", 23, 23, false);
+ waitForBindings(1, "queues.testaddress", 23, 23, false);
+ waitForBindings(2, "queues.testaddress", 23, 23, false);
+ waitForBindings(3, "queues.testaddress", 22, 22, false);
+ waitForBindings(4, "queues.testaddress", 21, 21, false);
+
+ send(0, "queues.testaddress", 10, false, null);
+
+ verifyReceiveAll(10, 0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14);
+
+ verifyReceiveRoundRobinInSomeOrder(10, 15, 16, 17, 18, 19);
+
+ verifyReceiveRoundRobinInSomeOrder(10, 20, 21, 22);
+
+ verifyReceiveRoundRobinInSomeOrder(10, 23, 24, 25);
+
+ verifyReceiveRoundRobinInSomeOrder(10, 26, 27);
+
+ closeAllConsumers();
+
+ closeAllSessionFactories();
+ }
+
+ @Override
+ protected void setupCluster(final boolean forwardWhenNoConsumers) throws Exception
+ {
+ // The lives
+ setupClusterConnectionWithBackups("cluster0",
+ "queues",
+ forwardWhenNoConsumers,
+ 1,
+ isNetty(),
+ 0,
+ new int[] { 1, 2, 3, 4 },
+ new int[] { 6, 7, 8, 9 });
+
+ setupClusterConnectionWithBackups("cluster1",
+ "queues",
+ forwardWhenNoConsumers,
+ 1,
+ isNetty(),
+ 1,
+ new int[] { 0, 2, 3, 4 },
+ new int[] { 5, 7, 8, 9 });
+
+ setupClusterConnectionWithBackups("cluster2",
+ "queues",
+ forwardWhenNoConsumers,
+ 1,
+ isNetty(),
+ 2,
+ new int[] { 0, 1, 3, 4 },
+ new int[] { 5, 6, 8, 9 });
+
+ setupClusterConnectionWithBackups("cluster3",
+ "queues",
+ forwardWhenNoConsumers,
+ 1,
+ isNetty(),
+ 3,
+ new int[] { 0, 1, 2, 4 },
+ new int[] { 5, 6, 7, 9 });
+
+ setupClusterConnectionWithBackups("cluster4",
+ "queues",
+ forwardWhenNoConsumers,
+ 1,
+ isNetty(),
+ 4,
+ new int[] { 0, 1, 2, 3 },
+ new int[] { 5, 6, 7, 8 });
+
+ // The backups
+
+ setupClusterConnectionWithBackups("cluster0",
+ "queues",
+ forwardWhenNoConsumers,
+ 1,
+ isNetty(),
+ 5,
+ new int[] { 1, 2, 3, 4 },
+ new int[] { 6, 7, 8, 9 });
+
+ setupClusterConnectionWithBackups("cluster1",
+ "queues",
+ forwardWhenNoConsumers,
+ 1,
+ isNetty(),
+ 6,
+ new int[] { 0, 2, 3, 4 },
+ new int[] { 5, 7, 8, 9 });
+
+ setupClusterConnectionWithBackups("cluster2",
+ "queues",
+ forwardWhenNoConsumers,
+ 1,
+ isNetty(),
+ 7,
+ new int[] { 0, 1, 3, 4 },
+ new int[] { 5, 6, 8, 9 });
+
+ setupClusterConnectionWithBackups("cluster3",
+ "queues",
+ forwardWhenNoConsumers,
+ 1,
+ isNetty(),
+ 8,
+ new int[] { 0, 1, 2, 4 },
+ new int[] { 5, 6, 7, 9 });
+
+ setupClusterConnectionWithBackups("cluster4",
+ "queues",
+ forwardWhenNoConsumers,
+ 1,
+ isNetty(),
+ 9,
+ new int[] { 0, 1, 2, 3 },
+ new int[] { 5, 6, 7, 8 });
+ }
+
+ @Override
+ protected void setupServers() throws Exception
+ {
+ // The backups
+ setupServer(5, isFileStorage(), isNetty(), true);
+ setupServer(6, isFileStorage(), isNetty(), true);
+ setupServer(7, isFileStorage(), isNetty(), true);
+ setupServer(8, isFileStorage(), isNetty(), true);
+ setupServer(9, isFileStorage(), isNetty(), true);
+
+ // The lives
+ setupServer(0, isFileStorage(), isNetty(), 5);
+ setupServer(1, isFileStorage(), isNetty(), 6);
+ setupServer(2, isFileStorage(), isNetty(), 7);
+ setupServer(3, isFileStorage(), isNetty(), 8);
+ setupServer(4, isFileStorage(), isNetty(), 9);
+ }
+
+ @Override
+ protected void startServers() throws Exception
+ {
+ // Need to set backup, since when restarting backup after it has failed over,
backup will have been set to false
+
+ getServer(5).getConfiguration().setBackup(true);
+ getServer(6).getConfiguration().setBackup(true);
+ getServer(7).getConfiguration().setBackup(true);
+ getServer(8).getConfiguration().setBackup(true);
+ getServer(9).getConfiguration().setBackup(true);
+
+ startServers(5, 6, 7, 8, 9, 0, 1, 2, 3, 4);
+ }
+
+ @Override
+ protected void stopServers() throws Exception
+ {
+ closeAllConsumers();
+
+ closeAllSessionFactories();
+
+ stopServers(0, 1, 2, 3, 4, 5, 6, 7, 8, 9);
+ }
+
+}