Author: ataylor
Date: 2009-10-14 09:42:12 -0400 (Wed, 14 Oct 2009)
New Revision: 8110
Modified:
branches/hornetq_grouping/tests/src/org/hornetq/tests/integration/cluster/distribution/ClusterTestBase.java
branches/hornetq_grouping/tests/src/org/hornetq/tests/integration/cluster/distribution/ClusteredGroupingTest.java
Log:
new test
Modified:
branches/hornetq_grouping/tests/src/org/hornetq/tests/integration/cluster/distribution/ClusterTestBase.java
===================================================================
---
branches/hornetq_grouping/tests/src/org/hornetq/tests/integration/cluster/distribution/ClusterTestBase.java 2009-10-14
12:59:50 UTC (rev 8109)
+++
branches/hornetq_grouping/tests/src/org/hornetq/tests/integration/cluster/distribution/ClusterTestBase.java 2009-10-14
13:42:12 UTC (rev 8110)
@@ -444,7 +444,6 @@
message.putStringProperty(key, val);
message.putIntProperty(COUNT_PROP, i);
- System.out.println("i = " + i);
producer.send(message);
}
@@ -511,7 +510,6 @@
int msgEnd,
int... consumerIDs) throws Exception
{
- boolean outOfOrder = false;
HashMap<SimpleString, Integer> groupIdsReceived = new
HashMap<SimpleString, Integer>();
for (int i = 0; i < consumerIDs.length; i++)
{
@@ -546,6 +544,7 @@
}
SimpleString id = (SimpleString)
message.getProperty(MessageImpl.HDR_GROUP_ID);
+ System.out.println("received " + id + " on consumer "
+ i);
if(groupIdsReceived.get(id) == null)
{
groupIdsReceived.put(id, i);
Modified:
branches/hornetq_grouping/tests/src/org/hornetq/tests/integration/cluster/distribution/ClusteredGroupingTest.java
===================================================================
---
branches/hornetq_grouping/tests/src/org/hornetq/tests/integration/cluster/distribution/ClusteredGroupingTest.java 2009-10-14
12:59:50 UTC (rev 8109)
+++
branches/hornetq_grouping/tests/src/org/hornetq/tests/integration/cluster/distribution/ClusteredGroupingTest.java 2009-10-14
13:42:12 UTC (rev 8110)
@@ -21,6 +21,9 @@
import org.hornetq.core.management.Notification;
import org.hornetq.utils.SimpleString;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+
/**
* @author <a href="mailto:andy.taylor@jboss.org">Andy Taylor</a>
*/
@@ -128,7 +131,7 @@
public void onNotification(Notification notification)
{
-
+
}
}, 0);
setUpGroupHandler(GroupingHandlerConfiguration.TYPE.REMOTE, 1, 1);
@@ -421,7 +424,6 @@
}
-
public void testGroupingRoundRobin() throws Exception
{
setupServer(0, isFileStorage(), isNetty());
@@ -612,8 +614,8 @@
addConsumer(1, 1, "queue0", null);
waitForBindings(1, "queues.testaddress", 1, 1, true);
-
+
verifyReceiveAllInRange(10, 20, 1);
System.out.println("*****************************************************************************");
@@ -629,77 +631,77 @@
}
public void testGroupingSendTo3queuesPinnedNodeGoesDownSendBeforeStop() throws
Exception
- {
- setupServer(0, isFileStorage(), isNetty());
- setupServer(1, isFileStorage(), isNetty());
- setupServer(2, isFileStorage(), isNetty());
+ {
+ setupServer(0, isFileStorage(), isNetty());
+ setupServer(1, isFileStorage(), isNetty());
+ setupServer(2, isFileStorage(), isNetty());
- setupClusterConnection("cluster0", "queues", false, 1,
isNetty(), 0, 1, 2);
+ setupClusterConnection("cluster0", "queues", false, 1,
isNetty(), 0, 1, 2);
- setupClusterConnection("cluster1", "queues", false, 1,
isNetty(), 1, 0, 2);
+ setupClusterConnection("cluster1", "queues", false, 1,
isNetty(), 1, 0, 2);
- setupClusterConnection("cluster2", "queues", false, 1,
isNetty(), 2, 0, 1);
+ setupClusterConnection("cluster2", "queues", false, 1,
isNetty(), 2, 0, 1);
- startServers(0, 1, 2);
+ startServers(0, 1, 2);
- try
- {
- setUpGroupHandler(GroupingHandlerConfiguration.TYPE.LOCAL, 0);
- setUpGroupHandler(GroupingHandlerConfiguration.TYPE.REMOTE, 1);
- setUpGroupHandler(GroupingHandlerConfiguration.TYPE.REMOTE, 2);
+ try
+ {
+ setUpGroupHandler(GroupingHandlerConfiguration.TYPE.LOCAL, 0);
+ setUpGroupHandler(GroupingHandlerConfiguration.TYPE.REMOTE, 1);
+ setUpGroupHandler(GroupingHandlerConfiguration.TYPE.REMOTE, 2);
- setupSessionFactory(0, isNetty());
- setupSessionFactory(1, isNetty());
- setupSessionFactory(2, isNetty());
+ setupSessionFactory(0, isNetty());
+ setupSessionFactory(1, isNetty());
+ setupSessionFactory(2, isNetty());
- createQueue(0, "queues.testaddress", "queue0", null,
true);
- createQueue(1, "queues.testaddress", "queue0", null,
true);
- createQueue(2, "queues.testaddress", "queue0", null,
true);
+ createQueue(0, "queues.testaddress", "queue0", null, true);
+ createQueue(1, "queues.testaddress", "queue0", null, true);
+ createQueue(2, "queues.testaddress", "queue0", null, true);
- addConsumer(0, 1, "queue0", null);
+ addConsumer(0, 1, "queue0", null);
- waitForBindings(0, "queues.testaddress", 1, 0, true);
- waitForBindings(1, "queues.testaddress", 1, 1, true);
- waitForBindings(2, "queues.testaddress", 1, 0, true);
+ waitForBindings(0, "queues.testaddress", 1, 0, true);
+ waitForBindings(1, "queues.testaddress", 1, 1, true);
+ waitForBindings(2, "queues.testaddress", 1, 0, true);
- waitForBindings(0, "queues.testaddress", 2, 1, false);
- waitForBindings(1, "queues.testaddress", 2, 0, false);
- waitForBindings(2, "queues.testaddress", 2, 1, false);
+ waitForBindings(0, "queues.testaddress", 2, 1, false);
+ waitForBindings(1, "queues.testaddress", 2, 0, false);
+ waitForBindings(2, "queues.testaddress", 2, 1, false);
- sendInRange(1, "queues.testaddress", 0, 10, true,
MessageImpl.HDR_GROUP_ID, new SimpleString("id1"));
+ sendInRange(1, "queues.testaddress", 0, 10, true,
MessageImpl.HDR_GROUP_ID, new SimpleString("id1"));
- verifyReceiveAllInRange(true, 0, 10, 0);
+ verifyReceiveAllInRange(true, 0, 10, 0);
- closeAllConsumers();
+ closeAllConsumers();
- sendInRange(2, "queues.testaddress", 10, 20, true,
MessageImpl.HDR_GROUP_ID, new SimpleString("id1"));
+ sendInRange(2, "queues.testaddress", 10, 20, true,
MessageImpl.HDR_GROUP_ID, new SimpleString("id1"));
- sendInRange(0, "queues.testaddress", 20, 30, true,
MessageImpl.HDR_GROUP_ID, new SimpleString("id1"));
+ sendInRange(0, "queues.testaddress", 20, 30, true,
MessageImpl.HDR_GROUP_ID, new SimpleString("id1"));
- stopServers(1);
+ stopServers(1);
- startServers(1);
+ startServers(1);
- addConsumer(1, 1, "queue0", null);
+ addConsumer(1, 1, "queue0", null);
- waitForBindings(1, "queues.testaddress", 1, 1, true);
+ waitForBindings(1, "queues.testaddress", 1, 1, true);
- verifyReceiveAllInRange(10, 30, 1);
+ verifyReceiveAllInRange(10, 30, 1);
-
System.out.println("*****************************************************************************");
- }
- finally
- {
- //closeAllConsumers();
+
System.out.println("*****************************************************************************");
+ }
+ finally
+ {
+ //closeAllConsumers();
- closeAllSessionFactories();
+ closeAllSessionFactories();
- stopServers(0, 1, 2);
- }
+ stopServers(0, 1, 2);
}
+ }
public void testGroupingSendTo3queuesPinnedNodeGoesDownSendAfterRestart() throws
Exception
@@ -747,7 +749,6 @@
stopServers(1);
-
startServers(1);
sendInRange(2, "queues.testaddress", 10, 20, false,
MessageImpl.HDR_GROUP_ID, new SimpleString("id1"));
@@ -822,8 +823,6 @@
sendInRange(2, "queues.testaddress", 10, 20, false,
MessageImpl.HDR_GROUP_ID, new SimpleString("id1"));
-
-
startServers(0);
waitForBindings(0, "queues.testaddress", 1, 0, true);
@@ -845,70 +844,137 @@
}
public void testGroupingMultipleQueuesOnAddress() throws Exception
+ {
+ setupServer(0, isFileStorage(), isNetty());
+ setupServer(1, isFileStorage(), isNetty());
+ setupServer(2, isFileStorage(), isNetty());
+
+ setupClusterConnection("cluster0", "queues", false, 1,
isNetty(), 0, 1, 2);
+
+ setupClusterConnection("cluster1", "queues", false, 1,
isNetty(), 1, 0, 2);
+
+ setupClusterConnection("cluster2", "queues", false, 1,
isNetty(), 2, 0, 1);
+
+ startServers(0, 1, 2);
+
+ try
{
- setupServer(0, isFileStorage(), isNetty());
- setupServer(1, isFileStorage(), isNetty());
- setupServer(2, isFileStorage(), isNetty());
+ setUpGroupHandler(GroupingHandlerConfiguration.TYPE.LOCAL, 0);
+ setUpGroupHandler(GroupingHandlerConfiguration.TYPE.REMOTE, 1);
+ setUpGroupHandler(GroupingHandlerConfiguration.TYPE.REMOTE, 2);
- setupClusterConnection("cluster0", "queues", false, 1,
isNetty(), 0, 1, 2);
+ setupSessionFactory(0, isNetty());
+ setupSessionFactory(1, isNetty());
+ setupSessionFactory(2, isNetty());
- setupClusterConnection("cluster1", "queues", false, 1,
isNetty(), 1, 0, 2);
+ createQueue(0, "queues.testaddress", "queue0", null,
false);
+ createQueue(1, "queues.testaddress", "queue0", null,
false);
+ createQueue(2, "queues.testaddress", "queue0", null,
false);
- setupClusterConnection("cluster2", "queues", false, 1,
isNetty(), 2, 0, 1);
+ createQueue(0, "queues.testaddress", "queue1", null,
false);
+ createQueue(1, "queues.testaddress", "queue1", null,
false);
+ createQueue(2, "queues.testaddress", "queue1", null,
false);
- startServers(0, 1, 2);
+ addConsumer(0, 0, "queue0", null);
+ addConsumer(1, 1, "queue0", null);
+ addConsumer(2, 2, "queue0", null);
- try
- {
- setUpGroupHandler(GroupingHandlerConfiguration.TYPE.LOCAL, 0);
- setUpGroupHandler(GroupingHandlerConfiguration.TYPE.REMOTE, 1);
- setUpGroupHandler(GroupingHandlerConfiguration.TYPE.REMOTE, 2);
+ addConsumer(3, 0, "queue0", null);
+ addConsumer(4, 1, "queue0", null);
+ addConsumer(5, 2, "queue0", null);
- setupSessionFactory(0, isNetty());
- setupSessionFactory(1, isNetty());
- setupSessionFactory(2, isNetty());
+ waitForBindings(0, "queues.testaddress", 2, 2, true);
+ waitForBindings(1, "queues.testaddress", 2, 2, true);
+ waitForBindings(2, "queues.testaddress", 2, 2, true);
- createQueue(0, "queues.testaddress", "queue0", null,
false);
- createQueue(1, "queues.testaddress", "queue0", null,
false);
- createQueue(2, "queues.testaddress", "queue0", null,
false);
+ waitForBindings(0, "queues.testaddress", 4, 4, false);
+ waitForBindings(1, "queues.testaddress", 4, 4, false);
+ waitForBindings(2, "queues.testaddress", 4, 4, false);
- createQueue(0, "queues.testaddress", "queue1", null,
false);
- createQueue(1, "queues.testaddress", "queue1", null,
false);
- createQueue(2, "queues.testaddress", "queue1", null,
false);
- addConsumer(0, 0, "queue0", null);
- addConsumer(1, 1, "queue0", null);
- addConsumer(2, 2, "queue0", null);
-
- addConsumer(3, 0, "queue0", null);
- addConsumer(4, 1, "queue0", null);
- addConsumer(5, 2, "queue0", null);
+ sendWithProperty(0, "queues.testaddress", 10, false,
MessageImpl.HDR_GROUP_ID, new SimpleString("id1"));
- waitForBindings(0, "queues.testaddress", 2, 2, true);
- waitForBindings(1, "queues.testaddress", 2, 2, true);
- waitForBindings(2, "queues.testaddress", 2, 2, true);
+ verifyReceiveAll(10, 0);
- waitForBindings(0, "queues.testaddress", 4, 4, false);
- waitForBindings(1, "queues.testaddress", 4, 4, false);
- waitForBindings(2, "queues.testaddress", 4, 4, false);
+
System.out.println("*****************************************************************************");
+ }
+ finally
+ {
+ closeAllConsumers();
+ closeAllSessionFactories();
- sendWithProperty(0, "queues.testaddress", 10, false,
MessageImpl.HDR_GROUP_ID, new SimpleString("id1"));
+ stopServers(0, 1, 2);
+ }
+ }
- verifyReceiveAll(10, 0);
+ public void testGroupingMultipleSending() throws Exception
+ {
+ setupServer(0, isFileStorage(), isNetty());
+ setupServer(1, isFileStorage(), isNetty());
+ setupServer(2, isFileStorage(), isNetty());
-
System.out.println("*****************************************************************************");
+ setupClusterConnection("cluster0", "queues", false, 1,
isNetty(), 0, 1, 2);
+
+ setupClusterConnection("cluster1", "queues", false, 1,
isNetty(), 1, 0, 2);
+
+ setupClusterConnection("cluster2", "queues", false, 1,
isNetty(), 2, 0, 1);
+
+ startServers(0, 1, 2);
+
+ try
+ {
+ setUpGroupHandler(GroupingHandlerConfiguration.TYPE.LOCAL, 0);
+ setUpGroupHandler(GroupingHandlerConfiguration.TYPE.REMOTE, 1);
+ setUpGroupHandler(GroupingHandlerConfiguration.TYPE.REMOTE, 2);
+
+ setupSessionFactory(0, isNetty());
+ setupSessionFactory(1, isNetty());
+ setupSessionFactory(2, isNetty());
+
+ createQueue(0, "queues.testaddress", "queue0", null,
false);
+ createQueue(1, "queues.testaddress", "queue0", null,
false);
+ createQueue(2, "queues.testaddress", "queue0", null,
false);
+
+ addConsumer(0, 0, "queue0", null);
+ addConsumer(1, 1, "queue0", null);
+ addConsumer(2, 2, "queue0", null);
+
+ waitForBindings(0, "queues.testaddress", 1, 1, true);
+ waitForBindings(1, "queues.testaddress", 1, 1, true);
+ waitForBindings(2, "queues.testaddress", 1, 1, true);
+
+ waitForBindings(0, "queues.testaddress", 2, 2, false);
+ waitForBindings(1, "queues.testaddress", 2, 2, false);
+ waitForBindings(2, "queues.testaddress", 2, 2, false);
+
+ CountDownLatch latch = new CountDownLatch(1);
+ Thread[] threads = new Thread[9];
+ int range = 0;
+ for(int i = 0 ; i < 9; i++,range+=10)
+ {
+ threads[i] = new Thread(new ThreadSender(range, range+10, 1, new
SimpleString("id" + i), latch, i < 8));
}
- finally
+ for (Thread thread : threads)
{
- closeAllConsumers();
+ thread.start();
+ }
- closeAllSessionFactories();
+ verifyReceiveAllWithGroupIDRoundRobin(0, 30, 0, 1, 2);
- stopServers(0, 1, 2);
- }
+
System.out.println("*****************************************************************************");
}
+ finally
+ {
+ closeAllConsumers();
+ closeAllSessionFactories();
+
+ stopServers(0, 1, 2);
+ }
+ }
+
+
public boolean isNetty()
{
return true;
@@ -918,5 +984,52 @@
{
return true;
}
+
+ class ThreadSender implements Runnable
+ {
+ private int msgStart;
+ private int msgEnd;
+ private SimpleString id;
+ private CountDownLatch latch;
+ private boolean wait;
+ private int node;
+
+ public ThreadSender(int msgStart, int msgEnd, int node, SimpleString id,
CountDownLatch latch, boolean wait)
+ {
+ this.msgStart = msgStart;
+ this.msgEnd = msgEnd;
+ this.node = node;
+ this.id = id;
+ this.latch = latch;
+ this.wait = wait;
+ }
+
+ public void run()
+ {
+ if (wait)
+ {
+ try
+ {
+ latch.await(5, TimeUnit.SECONDS);
+ }
+ catch (InterruptedException e)
+ {
+ e.printStackTrace(); //To change body of catch statement use File |
Settings | File Templates.
+ }
+ }
+ else
+ {
+ latch.countDown();
+ }
+ try
+ {
+ sendInRange(node, "queues.testaddress", msgStart, msgEnd, false,
MessageImpl.HDR_GROUP_ID, id);
+ }
+ catch (Exception e)
+ {
+ e.printStackTrace(); //To change body of catch statement use File | Settings
| File Templates.
+ }
+ }
+ }
}