[hornetq-commits] JBoss hornetq SVN: r8102 - in branches/hornetq_grouping: examples/javaee and 2 other directories.
do-not-reply at jboss.org
do-not-reply at jboss.org
Wed Oct 14 05:13:40 EDT 2009
Author: ataylor
Date: 2009-10-14 05:13:40 -0400 (Wed, 14 Oct 2009)
New Revision: 8102
Modified:
branches/hornetq_grouping/examples/javaee/hornetq-javaee-examples.iml
branches/hornetq_grouping/hornetq.iml
branches/hornetq_grouping/src/main/org/hornetq/core/postoffice/impl/BindingsImpl.java
branches/hornetq_grouping/tests/src/org/hornetq/tests/integration/cluster/distribution/ClusterTestBase.java
branches/hornetq_grouping/tests/src/org/hornetq/tests/integration/cluster/distribution/ClusteredGroupingTest.java
Log:
fixes
Modified: branches/hornetq_grouping/examples/javaee/hornetq-javaee-examples.iml
===================================================================
--- branches/hornetq_grouping/examples/javaee/hornetq-javaee-examples.iml 2009-10-14 08:59:08 UTC (rev 8101)
+++ branches/hornetq_grouping/examples/javaee/hornetq-javaee-examples.iml 2009-10-14 09:13:40 UTC (rev 8102)
@@ -1,7 +1,6 @@
<?xml version="1.0" encoding="UTF-8"?>
<module relativePaths="true" type="JAVA_MODULE" version="4">
<component name="NewModuleRootManager" inherit-compiler-output="true">
- <output url="file://$MODULE_DIR$/output/classes" />
<exclude-output />
<content url="file://$MODULE_DIR$">
<sourceFolder url="file://$MODULE_DIR$/ejb-jms-transaction/src" isTestSource="false" />
Modified: branches/hornetq_grouping/hornetq.iml
===================================================================
--- branches/hornetq_grouping/hornetq.iml 2009-10-14 08:59:08 UTC (rev 8101)
+++ branches/hornetq_grouping/hornetq.iml 2009-10-14 09:13:40 UTC (rev 8102)
@@ -1,7 +1,6 @@
<?xml version="1.0" encoding="UTF-8"?>
<module relativePaths="true" type="JAVA_MODULE" version="4">
<component name="NewModuleRootManager" LANGUAGE_LEVEL="JDK_1_5" inherit-compiler-output="true">
- <output url="file://$MODULE_DIR$/output/classes" />
<exclude-output />
<content url="file://$MODULE_DIR$">
<sourceFolder url="file://$MODULE_DIR$/build/src" isTestSource="false" />
Modified: branches/hornetq_grouping/src/main/org/hornetq/core/postoffice/impl/BindingsImpl.java
===================================================================
--- branches/hornetq_grouping/src/main/org/hornetq/core/postoffice/impl/BindingsImpl.java 2009-10-14 08:59:08 UTC (rev 8101)
+++ branches/hornetq_grouping/src/main/org/hornetq/core/postoffice/impl/BindingsImpl.java 2009-10-14 09:13:40 UTC (rev 8102)
@@ -166,7 +166,7 @@
{
bindable.route(message, tx);
}
-
+
return true;
}
@@ -284,7 +284,7 @@
{
routed = routeFromCluster(message, tx);
}
- else if(groupingGroupingHandler != null && message.getProperty(MessageImpl.HDR_GROUP_ID)!= null)
+ else if (groupingGroupingHandler != null && message.getProperty(MessageImpl.HDR_GROUP_ID) != null)
{
routeUsingStrictOrdering(message, tx, groupingGroupingHandler);
}
@@ -305,127 +305,133 @@
continue;
}
- Integer ipos = routingNamePositions.get(routingName);
+ Binding theBinding = getNextBinding(message, routingName, bindings);
- int pos = ipos != null ? ipos.intValue() : 0;
- int length = bindings.size();
-
- int startPos = pos;
-
- Binding theBinding = null;
-
- int lastLowPriorityBinding = -1;
-
- while (true)
+ if (theBinding != null)
{
- Binding binding;
- try
- {
- binding = bindings.get(pos);
- }
- catch (IndexOutOfBoundsException e)
- {
- // This can occur if binding is removed while in route
- if (!bindings.isEmpty())
- {
- pos = 0;
- startPos = 0;
- length = bindings.size();
+ theBinding.willRoute(message);
- continue;
- }
- else
- {
- break;
- }
- }
+ chosen.add(theBinding.getBindable());
+ }
+ }
- Filter filter = binding.getFilter();
+ // TODO refactor to do this in one iteration
- if (filter == null || filter.match(message))
+ for (Bindable bindable : chosen)
{
- // bindings.length == 1 ==> only a local queue so we don't check for matching consumers (it's an
- // unnecessary overhead)
- if (length == 1 || routeWhenNoConsumers || binding.isHighAcceptPriority(message))
- {
- theBinding = binding;
+ bindable.preroute(message, tx);
+ }
- pos = incrementPos(pos, length);
+ for (Bindable bindable : chosen)
+ {
+ bindable.route(message, tx);
- break;
- }
- else
- {
- if (lastLowPriorityBinding == -1)
- {
- lastLowPriorityBinding = pos;
- }
- }
+ routed = true;
}
+ }
+ }
- pos = incrementPos(pos, length);
+ return routed;
+ }
- if (pos == startPos)
- {
- if (lastLowPriorityBinding != -1)
- {
- try
- {
- theBinding = bindings.get(pos);
- }
- catch (IndexOutOfBoundsException e)
- {
- // This can occur if binding is removed while in route
- if (!bindings.isEmpty())
- {
- pos = 0;
+ private Binding getNextBinding(ServerMessage message, SimpleString routingName, List<Binding> bindings)
+ {
+ Integer ipos = routingNamePositions.get(routingName);
- lastLowPriorityBinding = -1;
+ int pos = ipos != null ? ipos : 0;
- continue;
- }
- else
- {
- break;
- }
- }
+ int length = bindings.size();
- pos = lastLowPriorityBinding;
+ int startPos = pos;
- pos = incrementPos(pos, length);
- }
+ Binding theBinding = null;
+
+ int lastLowPriorityBinding = -1;
+
+ while (true)
+ {
+ Binding binding;
+ try
+ {
+ binding = bindings.get(pos);
+ }
+ catch (IndexOutOfBoundsException e)
+ {
+ // This can occur if binding is removed while in route
+ if (!bindings.isEmpty())
+ {
+ pos = 0;
+ startPos = 0;
+ length = bindings.size();
+
+ continue;
+ }
+ else
+ {
break;
}
}
- if (theBinding != null)
+ Filter filter = binding.getFilter();
+
+ if (filter == null || filter.match(message))
{
- theBinding.willRoute(message);
+ // bindings.length == 1 ==> only a local queue so we don't check for matching consumers (it's an
+ // unnecessary overhead)
+ if (length == 1 || routeWhenNoConsumers || binding.isHighAcceptPriority(message))
+ {
+ theBinding = binding;
- chosen.add(theBinding.getBindable());
- }
+ pos = incrementPos(pos, length);
- routingNamePositions.put(routingName, pos);
+ break;
+ }
+ else
+ {
+ if (lastLowPriorityBinding == -1)
+ {
+ lastLowPriorityBinding = pos;
+ }
+ }
}
- // TODO refactor to do this in one iteration
+ pos = incrementPos(pos, length);
- for (Bindable bindable : chosen)
+ if (pos == startPos)
+ {
+ if (lastLowPriorityBinding != -1)
{
- bindable.preroute(message, tx);
- }
+ try
+ {
+ theBinding = bindings.get(pos);
+ }
+ catch (IndexOutOfBoundsException e)
+ {
+ // This can occur if binding is removed while in route
+ if (!bindings.isEmpty())
+ {
+ pos = 0;
- for (Bindable bindable : chosen)
- {
- bindable.route(message, tx);
-
- routed = true;
+ lastLowPriorityBinding = -1;
+
+ continue;
+ }
+ else
+ {
+ break;
+ }
+ }
+
+ pos = lastLowPriorityBinding;
+
+ pos = incrementPos(pos, length);
}
+ break;
}
}
-
- return routed;
+ routingNamePositions.put(routingName, pos);
+ return theBinding;
}
private void routeUsingStrictOrdering(ServerMessage message, Transaction tx, GroupingHandler groupingGroupingHandler)
@@ -433,36 +439,27 @@
{
SimpleString groupId = (SimpleString) message.getProperty(MessageImpl.HDR_GROUP_ID);
Response resp = groupingGroupingHandler.propose(new Proposal(groupId, null));
- if(resp == null)
+ if (resp == null)
{
for (Map.Entry<SimpleString, List<Binding>> entry : routingNameBindingMap.entrySet())
{
SimpleString routingName = entry.getKey();
List<Binding> bindings = entry.getValue();
- Binding chosen = null;
- Binding lowestPriorityBinding = null;
- int lowestPriority = Integer.MAX_VALUE;
- for (Binding binding : bindings)
+
+ if (bindings == null)
{
- boolean bindingIsHighAcceptPriority = binding.isHighAcceptPriority(message);
- int distance = binding.getDistance();
- if((distance < lowestPriority))
- {
- lowestPriorityBinding = binding;
- lowestPriority = distance;
- if(bindingIsHighAcceptPriority)
- {
- chosen = binding;
- }
- }
+ // The value can become null if it's concurrently removed while we're iterating - this is expected
+ // ConcurrentHashMap behaviour!
+ continue;
}
- if(chosen == null)
- {
- chosen = lowestPriorityBinding;
- }
+
+
+ Binding chosen = getNextBinding(message, routingName, bindings);
+
resp = groupingGroupingHandler.propose(new Proposal(groupId, chosen.getClusterName()));
- if(!resp.getChosen().equals(chosen.getClusterName()))
+
+ if (!resp.getChosen().equals(chosen.getClusterName()))
{
for (Binding binding : bindings)
{
@@ -474,7 +471,7 @@
}
}
- if( chosen != null )
+ if (chosen != null)
{
chosen.willRoute(message);
chosen.getBindable().preroute(message, tx);
@@ -492,13 +489,13 @@
Binding chosen = null;
for (Binding binding : bindings)
{
- if(binding.getClusterName().equals(resp.getChosen()))
+ if (binding.getClusterName().equals(resp.getChosen()))
{
chosen = binding;
break;
}
}
- if( chosen != null)
+ if (chosen != null)
{
chosen.willRoute(message);
chosen.getBindable().preroute(message, tx);
Modified: branches/hornetq_grouping/tests/src/org/hornetq/tests/integration/cluster/distribution/ClusterTestBase.java
===================================================================
--- branches/hornetq_grouping/tests/src/org/hornetq/tests/integration/cluster/distribution/ClusterTestBase.java 2009-10-14 08:59:08 UTC (rev 8101)
+++ branches/hornetq_grouping/tests/src/org/hornetq/tests/integration/cluster/distribution/ClusterTestBase.java 2009-10-14 09:13:40 UTC (rev 8102)
@@ -51,6 +51,7 @@
import org.hornetq.core.server.group.GroupingHandler;
import org.hornetq.core.server.cluster.ClusterConnection;
import org.hornetq.core.server.cluster.RemoteQueueBinding;
+import org.hornetq.core.message.impl.MessageImpl;
import org.hornetq.integration.transports.netty.TransportConstants;
import org.hornetq.tests.util.ServiceTestBase;
import org.hornetq.utils.Pair;
@@ -481,6 +482,117 @@
verifyReceiveAllInRangeNotBefore(false, -1, msgStart, msgEnd, consumerIDs);
}
+ protected void verifyReceiveAllWithGroupIDRoundRobin(
+ int msgStart,
+ int msgEnd,
+ int... consumerIDs) throws Exception
+ {
+ verifyReceiveAllWithGroupIDRoundRobin(true, -1, msgStart, msgEnd, consumerIDs);
+ }
+
+ protected int verifyReceiveAllOnSingleConsumer(int msgStart,
+ int msgEnd,
+ int... consumerIDs) throws Exception
+ {
+ return verifyReceiveAllOnSingleConsumer(true, msgStart, msgEnd, consumerIDs);
+ }
+
+ protected void verifyReceiveAllWithGroupIDRoundRobin(boolean ack,
+ long firstReceiveTime,
+ int msgStart,
+ int msgEnd,
+ int... consumerIDs) throws Exception
+ {
+ boolean outOfOrder = false;
+ HashMap<SimpleString, Integer> groupIdsReceived = new HashMap<SimpleString, Integer>();
+ for (int i = 0; i < consumerIDs.length; i++)
+ {
+ ConsumerHolder holder = consumers[consumerIDs[i]];
+
+ if (holder == null)
+ {
+ throw new IllegalArgumentException("No consumer at " + consumerIDs[i]);
+ }
+
+ for (int j = msgStart; j < msgEnd; j++)
+ {
+ ClientMessage message = holder.consumer.receive(2000);
+
+ if (message == null)
+ {
+ log.info("*** dumping consumers:");
+
+ dumpConsumers();
+
+ assertNotNull("consumer " + consumerIDs[i] + " did not receive message " + j, message);
+ }
+
+ if (ack)
+ {
+ message.acknowledge();
+ }
+
+ if (firstReceiveTime != -1)
+ {
+ assertTrue("Message received too soon", System.currentTimeMillis() >= firstReceiveTime);
+ }
+
+ SimpleString id = (SimpleString) message.getProperty(MessageImpl.HDR_GROUP_ID);
+ if(groupIdsReceived.get(id) == null)
+ {
+ groupIdsReceived.put(id, i);
+ }
+ else if (groupIdsReceived.get(id) != i)
+ {
+ fail("consumer " + groupIdsReceived.get(id) + " already bound to groupid " + id + " received on consumer " + i);
+ }
+
+ }
+
+ }
+
+
+ }
+
+ protected int verifyReceiveAllOnSingleConsumer(boolean ack,
+ int msgStart,
+ int msgEnd,
+ int... consumerIDs) throws Exception
+ {
+ int groupIdsReceived = -1;
+ for (int i = 0; i < consumerIDs.length; i++)
+ {
+ ConsumerHolder holder = consumers[consumerIDs[i]];
+
+ if (holder == null)
+ {
+ throw new IllegalArgumentException("No consumer at " + consumerIDs[i]);
+ }
+ ClientMessage message = holder.consumer.receive(2000);
+ if (message != null)
+ {
+ groupIdsReceived = i;
+ for (int j = msgStart + 1; j < msgEnd; j++)
+ {
+ message = holder.consumer.receive(2000);
+
+ if (message == null)
+ {
+ fail("consumer " + i + " did not receive all messages");
+ }
+
+ if (ack)
+ {
+ message.acknowledge();
+ }
+ }
+ }
+
+ }
+ return groupIdsReceived;
+
+ }
+
protected void verifyReceiveAllInRangeNotBefore(boolean ack,
long firstReceiveTime,
int msgStart,
Modified: branches/hornetq_grouping/tests/src/org/hornetq/tests/integration/cluster/distribution/ClusteredGroupingTest.java
===================================================================
--- branches/hornetq_grouping/tests/src/org/hornetq/tests/integration/cluster/distribution/ClusteredGroupingTest.java 2009-10-14 08:59:08 UTC (rev 8101)
+++ branches/hornetq_grouping/tests/src/org/hornetq/tests/integration/cluster/distribution/ClusteredGroupingTest.java 2009-10-14 09:13:40 UTC (rev 8102)
@@ -228,17 +228,15 @@
createQueue(1, "queues.testaddress", "queue0", null, false);
createQueue(2, "queues.testaddress", "queue0", null, false);
- addConsumer(0, 0, "queue0", null);
addConsumer(1, 1, "queue0", null);
- addConsumer(2, 2, "queue0", null);
- waitForBindings(0, "queues.testaddress", 1, 1, true);
+ waitForBindings(0, "queues.testaddress", 1, 0, true);
waitForBindings(1, "queues.testaddress", 1, 1, true);
- waitForBindings(2, "queues.testaddress", 1, 1, true);
+ waitForBindings(2, "queues.testaddress", 1, 0, true);
- waitForBindings(0, "queues.testaddress", 2, 2, false);
- waitForBindings(1, "queues.testaddress", 2, 2, false);
- waitForBindings(2, "queues.testaddress", 2, 2, false);
+ waitForBindings(0, "queues.testaddress", 2, 1, false);
+ waitForBindings(1, "queues.testaddress", 2, 0, false);
+ waitForBindings(2, "queues.testaddress", 2, 1, false);
sendInRange(1, "queues.testaddress", 0, 10, false, MessageImpl.HDR_GROUP_ID, new SimpleString("id1"));
@@ -324,7 +322,9 @@
}
}
- public void testGroupingSendTo3queuesNoConsumersDeliveredToLocalQueue() throws Exception
+
+
+ public void testGroupingRoundRobin() throws Exception
{
setupServer(0, isFileStorage(), isNetty());
setupServer(1, isFileStorage(), isNetty());
@@ -352,26 +352,21 @@
createQueue(1, "queues.testaddress", "queue0", null, false);
createQueue(2, "queues.testaddress", "queue0", null, false);
- waitForBindings(0, "queues.testaddress", 1, 0, true);
- waitForBindings(1, "queues.testaddress", 1, 0, true);
- waitForBindings(2, "queues.testaddress", 1, 0, true);
+ addConsumer(0, 0, "queue0", null);
+ addConsumer(1, 1, "queue0", null);
+ addConsumer(2, 2, "queue0", null);
- waitForBindings(0, "queues.testaddress", 2, 0, false);
- waitForBindings(1, "queues.testaddress", 2, 0, false);
- waitForBindings(2, "queues.testaddress", 2, 0, false);
+ waitForBindings(0, "queues.testaddress", 1, 1, true);
+ waitForBindings(1, "queues.testaddress", 1, 1, true);
+ waitForBindings(2, "queues.testaddress", 1, 1, true);
- sendInRange(1, "queues.testaddress", 0, 10, false, MessageImpl.HDR_GROUP_ID, new SimpleString("id1"));
- sendInRange(2, "queues.testaddress", 10, 20, false, MessageImpl.HDR_GROUP_ID, new SimpleString("id1"));
+ sendInRange(0, "queues.testaddress", 0, 10, false, MessageImpl.HDR_GROUP_ID, new SimpleString("id1"));
+ sendInRange(0, "queues.testaddress", 10, 20, false, MessageImpl.HDR_GROUP_ID, new SimpleString("id2"));
+ sendInRange(0, "queues.testaddress", 20, 30, false, MessageImpl.HDR_GROUP_ID, new SimpleString("id3"));
+ verifyReceiveAllWithGroupIDRoundRobin(0, 10, 0, 1, 2);
- sendInRange(0, "queues.testaddress", 20, 30, false, MessageImpl.HDR_GROUP_ID, new SimpleString("id1"));
- addConsumer(0, 0, "queue0", null);
- addConsumer(1, 1, "queue0", null);
- addConsumer(2, 2, "queue0", null);
-
- verifyReceiveAllInRange(0, 30, 1 );
-
System.out.println("*****************************************************************************");
}
finally
@@ -384,6 +379,7 @@
}
}
+
public void testGroupingSendTo3queuesQueueRemoved() throws Exception
{
setupServer(0, isFileStorage(), isNetty());
@@ -498,49 +494,257 @@
setupSessionFactory(1, isNetty());
setupSessionFactory(2, isNetty());
- createQueue(0, "queues.testaddress", "queue0", null, false);
- createQueue(1, "queues.testaddress", "queue0", null, false);
- createQueue(2, "queues.testaddress", "queue0", null, false);
+ createQueue(0, "queues.testaddress", "queue0", null, true);
+ createQueue(1, "queues.testaddress", "queue0", null, true);
+ createQueue(2, "queues.testaddress", "queue0", null, true);
- addConsumer(0, 0, "queue0", null);
+ addConsumer(0, 1, "queue0", null);
+
+ waitForBindings(0, "queues.testaddress", 1, 0, true);
+ waitForBindings(1, "queues.testaddress", 1, 1, true);
+ waitForBindings(2, "queues.testaddress", 1, 0, true);
+
+ waitForBindings(0, "queues.testaddress", 2, 1, false);
+ waitForBindings(1, "queues.testaddress", 2, 0, false);
+ waitForBindings(2, "queues.testaddress", 2, 1, false);
+
+ sendInRange(1, "queues.testaddress", 0, 10, false, MessageImpl.HDR_GROUP_ID, new SimpleString("id1"));
+
+ verifyReceiveAllInRange(0, 10, 0);
+
+ stopServers(1);
+
+ sendInRange(2, "queues.testaddress", 10, 20, false, MessageImpl.HDR_GROUP_ID, new SimpleString("id1"));
+
+
+ sendInRange(0, "queues.testaddress", 20, 30, false, MessageImpl.HDR_GROUP_ID, new SimpleString("id1"));
+
+ startServers(1);
+
addConsumer(1, 1, "queue0", null);
- addConsumer(2, 2, "queue0", null);
+ waitForBindings(1, "queues.testaddress", 1, 1, true);
+
- waitForBindings(0, "queues.testaddress", 1, 1, true);
+ verifyReceiveAllInRange(10, 20, 1);
+ verifyReceiveAllInRange(20, 30, 1);
+
+ System.out.println("*****************************************************************************");
+ }
+ finally
+ {
+ //closeAllConsumers();
+
+ closeAllSessionFactories();
+
+ stopServers(0, 1, 2);
+ }
+ }
+
+ public void testGroupingSendTo3queuesPinnedNodeGoesDownSendBeforeStop() throws Exception
+ {
+ setupServer(0, isFileStorage(), isNetty());
+ setupServer(1, isFileStorage(), isNetty());
+ setupServer(2, isFileStorage(), isNetty());
+
+ setupClusterConnection("cluster0", "queues", false, 1, isNetty(), 0, 1, 2);
+
+ setupClusterConnection("cluster1", "queues", false, 1, isNetty(), 1, 0, 2);
+
+ setupClusterConnection("cluster2", "queues", false, 1, isNetty(), 2, 0, 1);
+
+ startServers(0, 1, 2);
+
+ try
+ {
+ setUpGroupArbitrator(GroupingHandlerConfiguration.TYPE.LOCAL, 0);
+ setUpGroupArbitrator(GroupingHandlerConfiguration.TYPE.REMOTE, 1);
+ setUpGroupArbitrator(GroupingHandlerConfiguration.TYPE.REMOTE, 2);
+
+ setupSessionFactory(0, isNetty());
+ setupSessionFactory(1, isNetty());
+ setupSessionFactory(2, isNetty());
+
+ createQueue(0, "queues.testaddress", "queue0", null, true);
+ createQueue(1, "queues.testaddress", "queue0", null, true);
+ createQueue(2, "queues.testaddress", "queue0", null, true);
+
+ addConsumer(0, 1, "queue0", null);
+
+ waitForBindings(0, "queues.testaddress", 1, 0, true);
+ waitForBindings(1, "queues.testaddress", 1, 1, true);
+ waitForBindings(2, "queues.testaddress", 1, 0, true);
+
+ waitForBindings(0, "queues.testaddress", 2, 1, false);
+ waitForBindings(1, "queues.testaddress", 2, 0, false);
+ waitForBindings(2, "queues.testaddress", 2, 1, false);
+
+ sendInRange(1, "queues.testaddress", 0, 10, true, MessageImpl.HDR_GROUP_ID, new SimpleString("id1"));
+
+ verifyReceiveAllInRange(true, 0, 10, 0);
+
+ closeAllConsumers();
+
+ sendInRange(2, "queues.testaddress", 10, 20, true, MessageImpl.HDR_GROUP_ID, new SimpleString("id1"));
+
+
+ sendInRange(0, "queues.testaddress", 20, 30, true, MessageImpl.HDR_GROUP_ID, new SimpleString("id1"));
+
+ stopServers(1);
+
+ startServers(1);
+
+ addConsumer(1, 1, "queue0", null);
+
+ waitForBindings(1, "queues.testaddress", 1, 1, true);
+
+
+ verifyReceiveAllInRange(10, 30, 1);
+
+
+ System.out.println("*****************************************************************************");
+ }
+ finally
+ {
+ //closeAllConsumers();
+
+ closeAllSessionFactories();
+
+ stopServers(0, 1, 2);
+ }
+ }
+
+
+ public void testGroupingSendTo3queuesPinnedNodeGoesDownSendAfterRestart() throws Exception
+ {
+ setupServer(0, isFileStorage(), isNetty());
+ setupServer(1, isFileStorage(), isNetty());
+ setupServer(2, isFileStorage(), isNetty());
+
+ setupClusterConnection("cluster0", "queues", false, 1, isNetty(), 0, 1, 2);
+
+ setupClusterConnection("cluster1", "queues", false, 1, isNetty(), 1, 0, 2);
+
+ setupClusterConnection("cluster2", "queues", false, 1, isNetty(), 2, 0, 1);
+
+ startServers(0, 1, 2);
+
+ try
+ {
+ setUpGroupArbitrator(GroupingHandlerConfiguration.TYPE.LOCAL, 0);
+ setUpGroupArbitrator(GroupingHandlerConfiguration.TYPE.REMOTE, 1);
+ setUpGroupArbitrator(GroupingHandlerConfiguration.TYPE.REMOTE, 2);
+
+ setupSessionFactory(0, isNetty());
+ setupSessionFactory(1, isNetty());
+ setupSessionFactory(2, isNetty());
+
+ createQueue(0, "queues.testaddress", "queue0", null, true);
+ createQueue(1, "queues.testaddress", "queue0", null, true);
+ createQueue(2, "queues.testaddress", "queue0", null, true);
+
+ addConsumer(0, 1, "queue0", null);
+
+ waitForBindings(0, "queues.testaddress", 1, 0, true);
waitForBindings(1, "queues.testaddress", 1, 1, true);
- waitForBindings(2, "queues.testaddress", 1, 1, true);
+ waitForBindings(2, "queues.testaddress", 1, 0, true);
- waitForBindings(0, "queues.testaddress", 2, 2, false);
- waitForBindings(1, "queues.testaddress", 2, 2, false);
- waitForBindings(2, "queues.testaddress", 2, 2, false);
+ waitForBindings(0, "queues.testaddress", 2, 1, false);
+ waitForBindings(1, "queues.testaddress", 2, 0, false);
+ waitForBindings(2, "queues.testaddress", 2, 1, false);
sendInRange(1, "queues.testaddress", 0, 10, false, MessageImpl.HDR_GROUP_ID, new SimpleString("id1"));
- verifyReceiveAllInRange(0, 10, 1);
+ verifyReceiveAllInRange(0, 10, 0);
- stopClusterConnections(1);
-
stopServers(1);
+
+
+ startServers(1);
+
sendInRange(2, "queues.testaddress", 10, 20, false, MessageImpl.HDR_GROUP_ID, new SimpleString("id1"));
sendInRange(0, "queues.testaddress", 20, 30, false, MessageImpl.HDR_GROUP_ID, new SimpleString("id1"));
- startServers(1);
- setupSessionFactory(1, isNetty());
- createQueue(1, "queues.testaddress", "queue0", null, false);
+
addConsumer(1, 1, "queue0", null);
waitForBindings(1, "queues.testaddress", 1, 1, true);
- waitForBindings(1, "queues.testaddress", 2, 2, false);
+
+
verifyReceiveAllInRange(10, 20, 1);
verifyReceiveAllInRange(20, 30, 1);
System.out.println("*****************************************************************************");
}
- catch(Exception e)
+ finally
{
- e.printStackTrace();
+ //closeAllConsumers();
+
+ closeAllSessionFactories();
+
+ stopServers(0, 1, 2);
}
+ }
+
+ public void testGroupingSendTo3queuesSendingNodeGoesDown() throws Exception
+ {
+ setupServer(0, isFileStorage(), isNetty());
+ setupServer(1, isFileStorage(), isNetty());
+ setupServer(2, isFileStorage(), isNetty());
+
+ setupClusterConnection("cluster0", "queues", false, 1, isNetty(), 0, 1, 2);
+
+ setupClusterConnection("cluster1", "queues", false, 1, isNetty(), 1, 0, 2);
+
+ setupClusterConnection("cluster2", "queues", false, 1, isNetty(), 2, 0, 1);
+
+ startServers(0, 1, 2);
+
+ try
+ {
+ setUpGroupArbitrator(GroupingHandlerConfiguration.TYPE.LOCAL, 0);
+ setUpGroupArbitrator(GroupingHandlerConfiguration.TYPE.REMOTE, 1);
+ setUpGroupArbitrator(GroupingHandlerConfiguration.TYPE.REMOTE, 2);
+
+ setupSessionFactory(0, isNetty());
+ setupSessionFactory(1, isNetty());
+ setupSessionFactory(2, isNetty());
+
+ createQueue(0, "queues.testaddress", "queue0", null, true);
+ createQueue(1, "queues.testaddress", "queue0", null, true);
+ createQueue(2, "queues.testaddress", "queue0", null, true);
+
+ addConsumer(0, 1, "queue0", null);
+
+ waitForBindings(0, "queues.testaddress", 1, 0, true);
+ waitForBindings(1, "queues.testaddress", 1, 1, true);
+ waitForBindings(2, "queues.testaddress", 1, 0, true);
+
+ waitForBindings(0, "queues.testaddress", 2, 1, false);
+ waitForBindings(1, "queues.testaddress", 2, 0, false);
+ waitForBindings(2, "queues.testaddress", 2, 1, false);
+
+ sendInRange(1, "queues.testaddress", 0, 10, false, MessageImpl.HDR_GROUP_ID, new SimpleString("id1"));
+
+ verifyReceiveAllInRange(0, 10, 0);
+
+ stopServers(0);
+
+ sendInRange(2, "queues.testaddress", 10, 20, false, MessageImpl.HDR_GROUP_ID, new SimpleString("id1"));
+
+
+
+
+ startServers(0);
+
+ waitForBindings(0, "queues.testaddress", 1, 0, true);
+ sendInRange(0, "queues.testaddress", 20, 30, false, MessageImpl.HDR_GROUP_ID, new SimpleString("id1"));
+
+ verifyReceiveAllInRange(10, 20, 0);
+ verifyReceiveAllInRange(20, 30, 0);
+
+ System.out.println("*****************************************************************************");
+ }
finally
{
//closeAllConsumers();
@@ -551,6 +755,71 @@
}
}
+ public void testGroupingMultipleQueuesOnAddress() throws Exception
+ {
+ setupServer(0, isFileStorage(), isNetty());
+ setupServer(1, isFileStorage(), isNetty());
+ setupServer(2, isFileStorage(), isNetty());
+
+ setupClusterConnection("cluster0", "queues", false, 1, isNetty(), 0, 1, 2);
+
+ setupClusterConnection("cluster1", "queues", false, 1, isNetty(), 1, 0, 2);
+
+ setupClusterConnection("cluster2", "queues", false, 1, isNetty(), 2, 0, 1);
+
+ startServers(0, 1, 2);
+
+ try
+ {
+ setUpGroupArbitrator(GroupingHandlerConfiguration.TYPE.LOCAL, 0);
+ setUpGroupArbitrator(GroupingHandlerConfiguration.TYPE.REMOTE, 1);
+ setUpGroupArbitrator(GroupingHandlerConfiguration.TYPE.REMOTE, 2);
+
+ setupSessionFactory(0, isNetty());
+ setupSessionFactory(1, isNetty());
+ setupSessionFactory(2, isNetty());
+
+ createQueue(0, "queues.testaddress", "queue0", null, false);
+ createQueue(1, "queues.testaddress", "queue0", null, false);
+ createQueue(2, "queues.testaddress", "queue0", null, false);
+
+ createQueue(0, "queues.testaddress", "queue1", null, false);
+ createQueue(1, "queues.testaddress", "queue1", null, false);
+ createQueue(2, "queues.testaddress", "queue1", null, false);
+
+ addConsumer(0, 0, "queue0", null);
+ addConsumer(1, 1, "queue0", null);
+ addConsumer(2, 2, "queue0", null);
+
+ addConsumer(3, 0, "queue0", null);
+ addConsumer(4, 1, "queue0", null);
+ addConsumer(5, 2, "queue0", null);
+
+ waitForBindings(0, "queues.testaddress", 2, 2, true);
+ waitForBindings(1, "queues.testaddress", 2, 2, true);
+ waitForBindings(2, "queues.testaddress", 2, 2, true);
+
+ waitForBindings(0, "queues.testaddress", 4, 4, false);
+ waitForBindings(1, "queues.testaddress", 4, 4, false);
+ waitForBindings(2, "queues.testaddress", 4, 4, false);
+
+
+ sendWithProperty(0, "queues.testaddress", 10, false, MessageImpl.HDR_GROUP_ID, new SimpleString("id1"));
+
+ verifyReceiveAll(10, 0);
+
+ System.out.println("*****************************************************************************");
+ }
+ finally
+ {
+ closeAllConsumers();
+
+ closeAllSessionFactories();
+
+ stopServers(0, 1, 2);
+ }
+ }
+
public boolean isNetty()
{
return true;
@@ -558,7 +827,7 @@
public boolean isFileStorage()
{
- return false;
+ return true;
}
}
More information about the hornetq-commits
mailing list