Author: clebert.suconic(a)jboss.com
Date: 2011-03-18 00:54:45 -0400 (Fri, 18 Mar 2011)
New Revision: 10343
Modified:
branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/integration/cluster/distribution/ClusterTestBase.java
branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/integration/cluster/distribution/MessageRedistributionTest.java
Log:
https://issues.jboss.org/browse/JBPAPP-6130 Adding failing test
Modified:
branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/integration/cluster/distribution/ClusterTestBase.java
===================================================================
---
branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/integration/cluster/distribution/ClusterTestBase.java 2011-03-18
00:11:19 UTC (rev 10342)
+++
branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/integration/cluster/distribution/ClusterTestBase.java 2011-03-18
04:54:45 UTC (rev 10343)
@@ -175,7 +175,7 @@
private static final int MAX_SERVERS = 10;
- private ConsumerHolder[] consumers;
+ protected ConsumerHolder[] consumers;
protected HornetQServer[] servers;
@@ -1113,6 +1113,11 @@
protected int[] getReceivedOrder(final int consumerID) throws Exception
{
+ return getReceivedOrder(consumerID, false);
+ }
+
+ protected int[] getReceivedOrder(final int consumerID, final boolean ack) throws
Exception
+ {
ConsumerHolder consumer = consumers[consumerID];
if (consumer == null)
Modified:
branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/integration/cluster/distribution/MessageRedistributionTest.java
===================================================================
---
branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/integration/cluster/distribution/MessageRedistributionTest.java 2011-03-18
00:11:19 UTC (rev 10342)
+++
branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/integration/cluster/distribution/MessageRedistributionTest.java 2011-03-18
04:54:45 UTC (rev 10343)
@@ -14,6 +14,9 @@
package org.hornetq.tests.integration.cluster.distribution;
import org.hornetq.core.logging.Logger;
+import org.hornetq.core.server.cluster.ClusterConnection;
+import org.hornetq.core.server.cluster.MessageFlowRecord;
+import org.hornetq.core.server.cluster.impl.ClusterConnectionImpl;
import org.hornetq.core.server.impl.QueueImpl;
import org.hornetq.core.settings.impl.AddressSettings;
@@ -104,7 +107,96 @@
MessageRedistributionTest.log.info("Test done");
}
+ //
https://issues.jboss.org/browse/JBPAPP-6130
+ //
https://issues.jboss.org/browse/HORNETQ-654
+ public void testRedistributionWhenConsumerIsClosedAndRestart() throws Exception
+ {
+ setupCluster(false);
+ MessageRedistributionTest.log.info("Doing test");
+
+ startServers(0, 1, 2);
+
+ setupSessionFactory(0, isNetty());
+ setupSessionFactory(1, isNetty());
+ setupSessionFactory(2, isNetty());
+
+ createQueue(0, "queues.testaddress", "queue0", null, true);
+ createQueue(1, "queues.testaddress", "queue0", null, true);
+ createQueue(2, "queues.testaddress", "queue0", null, true);
+
+ addConsumer(0, 0, "queue0", null);
+ addConsumer(1, 1, "queue0", null);
+ addConsumer(2, 2, "queue0", null);
+
+ waitForBindings(0, "queues.testaddress", 1, 1, true);
+ waitForBindings(1, "queues.testaddress", 1, 1, true);
+ waitForBindings(2, "queues.testaddress", 1, 1, true);
+
+ waitForBindings(0, "queues.testaddress", 2, 2, false);
+ waitForBindings(1, "queues.testaddress", 2, 2, false);
+ waitForBindings(2, "queues.testaddress", 2, 2, false);
+
+ send(0, "queues.testaddress", 9, true, null);
+
+ getReceivedOrder(0, true);
+ int[] ids1 = getReceivedOrder(1, false);
+ getReceivedOrder(2, true);
+
+ for (ClusterConnection conn :
servers[1].getClusterManager().getClusterConnections())
+ {
+ ClusterConnectionImpl impl = (ClusterConnectionImpl)conn;
+ for (MessageFlowRecord record : impl.getRecords().values())
+ {
+ if (record.getBridge() != null)
+ {
+ System.out.println("stop record bridge");
+ record.getBridge().stop();
+ }
+ }
+ }
+
+ removeConsumer(1);
+
+ for (int i = 0; i <= 2; i++)
+ {
+ servers[i].stop();
+ servers[i] = null;
+ }
+
+ setupServers();
+
+ setupCluster(false);
+
+ startServers(0, 1, 2);
+
+ for (int i = 0 ; i <= 2; i++)
+ {
+ consumers[i] = null;
+ sfs[i] = null;
+ }
+
+ setupSessionFactory(0, isNetty());
+ setupSessionFactory(1, isNetty());
+ setupSessionFactory(2, isNetty());
+
+ addConsumer(0, 0, "queue0", null);
+ addConsumer(1, 1, "queue0", null);
+ addConsumer(2, 2, "queue0", null);
+
+ waitForBindings(0, "queues.testaddress", 1, 1, true);
+ waitForBindings(1, "queues.testaddress", 1, 1, true);
+ waitForBindings(2, "queues.testaddress", 1, 1, true);
+
+ waitForBindings(0, "queues.testaddress", 2, 2, false);
+ waitForBindings(1, "queues.testaddress", 2, 2, false);
+ waitForBindings(2, "queues.testaddress", 2, 2, false);
+
+ verifyReceiveRoundRobinInSomeOrderWithCounts(false, ids1, 0, 2);
+
+ MessageRedistributionTest.log.info("Test done");
+ }
+
public void testRedistributionWhenConsumerIsClosedNotConsumersOnAllNodes() throws
Exception
{
setupCluster(false);
@@ -494,17 +586,16 @@
removeConsumer(1);
addConsumer(0, 0, QUEUE, null);
-
+
waitForMessages(1, ADDRESS, 0);
waitForMessages(0, ADDRESS, 20);
-
+
removeConsumer(0);
addConsumer(1, 1, QUEUE, null);
-
+
waitForMessages(1, ADDRESS, 20);
waitForMessages(0, ADDRESS, 0);
-
-
+
verifyReceiveAll(20, 1);
stop();
@@ -663,7 +754,7 @@
verifyReceiveAll(QueueImpl.REDISTRIBUTOR_BATCH_SIZE * 2, 1);
}
-
+
/*
* Start one node with no consumers and send some messages
* Start another node add a consumer and verify all messages are redistribute
@@ -676,26 +767,26 @@
startServers(0);
setupSessionFactory(0, isNetty());
-
+
createQueue(0, "queues.testaddress", "queue0", null, false);
-
+
waitForBindings(0, "queues.testaddress", 1, 0, true);
-
+
send(0, "queues.testaddress", 20, false, null);
-
- //Now bring up node 1
-
+
+ // Now bring up node 1
+
startServers(1);
setupSessionFactory(1, isNetty());
createQueue(1, "queues.testaddress", "queue0", null, false);
-
+
waitForBindings(1, "queues.testaddress", 1, 0, true);
waitForBindings(0, "queues.testaddress", 1, 0, false);
-
+
addConsumer(0, 1, "queue0", null);
-
+
verifyReceiveAll(20, 0);
verifyNotReceive(0);
}