[jboss-cvs] JBoss Messaging SVN: r2829 - in trunk: src/main/org/jboss/jms/client/state and 3 other directories.
jboss-cvs-commits at lists.jboss.org
jboss-cvs-commits at lists.jboss.org
Tue Jul 3 13:08:00 EDT 2007
Author: timfox
Date: 2007-07-03 13:08:00 -0400 (Tue, 03 Jul 2007)
New Revision: 2829
Modified:
trunk/src/main/org/jboss/jms/client/container/ClientConsumer.java
trunk/src/main/org/jboss/jms/client/state/SessionState.java
trunk/src/main/org/jboss/messaging/core/impl/ChannelSupport.java
trunk/src/main/org/jboss/messaging/core/impl/postoffice/MessageRequest.java
trunk/src/main/org/jboss/messaging/core/impl/postoffice/MessagingPostOffice.java
trunk/src/main/org/jboss/messaging/core/impl/postoffice/RequestTarget.java
trunk/tests/src/org/jboss/test/messaging/core/plugin/postoffice/ClusteredPostOfficeTest.java
Log:
Completed clustered post office test
Modified: trunk/src/main/org/jboss/jms/client/container/ClientConsumer.java
===================================================================
--- trunk/src/main/org/jboss/jms/client/container/ClientConsumer.java 2007-07-03 12:12:42 UTC (rev 2828)
+++ trunk/src/main/org/jboss/jms/client/container/ClientConsumer.java 2007-07-03 17:08:00 UTC (rev 2829)
@@ -47,6 +47,7 @@
/**
* @author <a href="mailto:ovidiu at jboss.org">Ovidiu Feodorov</a>
* @author <a href="mailto:tim.fox at jboss.com">Tim Fox</a>
+ * @author <a href="mailto:sergey.koshcheyev at jboss.com">Sergey Koscheyev</a>
* @version <tt>$Revision: 2774 $</tt>
*
* $Id: MessageCallbackHandler.java 2774 2007-06-12 22:43:54Z timfox $
Modified: trunk/src/main/org/jboss/jms/client/state/SessionState.java
===================================================================
--- trunk/src/main/org/jboss/jms/client/state/SessionState.java 2007-07-03 12:12:42 UTC (rev 2828)
+++ trunk/src/main/org/jboss/jms/client/state/SessionState.java 2007-07-03 17:08:00 UTC (rev 2829)
@@ -344,7 +344,6 @@
catch (Exception e)
{
log.error(e.toString(),e);
- log.info("RecoverDeliveries failed, marking session as invalidated!");
this.getDelegate().invalidate();
}
}
Modified: trunk/src/main/org/jboss/messaging/core/impl/ChannelSupport.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/impl/ChannelSupport.java 2007-07-03 12:12:42 UTC (rev 2828)
+++ trunk/src/main/org/jboss/messaging/core/impl/ChannelSupport.java 2007-07-03 17:08:00 UTC (rev 2829)
@@ -911,8 +911,8 @@
}
else
{
- if (trace) { log.trace(this + ": adding " + ref + " to non-recoverable state"); }
-
+ if (trace) { log.trace(this + ": adding " + ref + " to memory"); }
+
try
{
synchronized (lock)
Modified: trunk/src/main/org/jboss/messaging/core/impl/postoffice/MessageRequest.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/impl/postoffice/MessageRequest.java 2007-07-03 12:12:42 UTC (rev 2828)
+++ trunk/src/main/org/jboss/messaging/core/impl/postoffice/MessageRequest.java 2007-07-03 17:08:00 UTC (rev 2829)
@@ -23,11 +23,12 @@
import java.io.DataInputStream;
import java.io.DataOutputStream;
-import java.util.Map;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.Set;
import org.jboss.messaging.core.contract.Message;
import org.jboss.messaging.core.impl.message.MessageFactory;
-import org.jboss.messaging.util.StreamUtils;
/**
* A MessageRequest
@@ -42,24 +43,32 @@
*/
class MessageRequest extends ClusterRequest
{
+ private static final byte NULL = 0;
+
+ private static final byte NOT_NULL = 1;
+
private String routingConditionText;
private Message message;
+ private Set queueNames;
+
MessageRequest()
{
}
- MessageRequest(String routingConditionText, Message message)
+ MessageRequest(String routingConditionText, Message message, Set queueNames)
{
this.routingConditionText = routingConditionText;
this.message = message;
+
+ this.queueNames = queueNames;
}
Object execute(RequestTarget office) throws Exception
{
- office.routeFromCluster(message, routingConditionText);
+ office.routeFromCluster(message, routingConditionText, queueNames);
return null;
}
@@ -77,9 +86,25 @@
message = MessageFactory.createMessage(type);
- message.read(in);
+ message.read(in);
+
+ byte b = in.readByte();
+
+ if (b != NULL)
+ {
+ int size = in.readInt();
+
+ queueNames = new HashSet(size);
+
+ for (int i = 0; i < size; i++)
+ {
+ String queueName = in.readUTF();
+
+ queueNames.add(queueName);
+ }
+ }
}
-
+
public void write(DataOutputStream out) throws Exception
{
out.writeUTF(routingConditionText);
@@ -87,5 +112,25 @@
out.writeByte(message.getType());
message.write(out);
+
+ if (queueNames == null)
+ {
+ out.writeByte(NULL);
+ }
+ else
+ {
+ out.writeByte(NOT_NULL);
+
+ out.writeInt(queueNames.size());
+
+ Iterator iter = queueNames.iterator();
+
+ while (iter.hasNext())
+ {
+ String queueName = (String)iter.next();
+
+ out.writeUTF(queueName);
+ }
+ }
}
}
Modified: trunk/src/main/org/jboss/messaging/core/impl/postoffice/MessagingPostOffice.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/impl/postoffice/MessagingPostOffice.java 2007-07-03 12:12:42 UTC (rev 2828)
+++ trunk/src/main/org/jboss/messaging/core/impl/postoffice/MessagingPostOffice.java 2007-07-03 17:08:00 UTC (rev 2829)
@@ -47,7 +47,6 @@
import javax.sql.DataSource;
import javax.transaction.TransactionManager;
-import org.jboss.jms.server.JMSCondition;
import org.jboss.logging.Logger;
import org.jboss.messaging.core.contract.Binding;
import org.jboss.messaging.core.contract.ClusterNotification;
@@ -71,6 +70,7 @@
import org.jboss.messaging.core.impl.MessagingQueue;
import org.jboss.messaging.core.impl.tx.Transaction;
import org.jboss.messaging.core.impl.tx.TransactionRepository;
+import org.jboss.messaging.core.impl.tx.TxCallback;
import org.jboss.messaging.util.ConcurrentHashSet;
import org.jboss.messaging.util.StreamUtils;
import org.jgroups.Address;
@@ -436,7 +436,7 @@
throw new IllegalArgumentException("Condition is null");
}
- return routeInternal(ref, condition, tx, false);
+ return routeInternal(ref, condition, tx, false, null);
}
public Collection getQueuesForCondition(Condition condition, boolean localOnly) throws Exception
@@ -967,9 +967,9 @@
return true;
}
- public void routeFromCluster(Message message, String routingKeyText) throws Exception
+ public void routeFromCluster(Message message, String routingKeyText, Set queueNames) throws Exception
{
- if (trace) { log.trace(this + " routing from cluster " + message + ", routing key " + routingKeyText); }
+ if (trace) { log.trace(this + " routing from cluster " + message + ", routing key " + routingKeyText + ", queue names " + queueNames); }
Condition routingKey = conditionFactory.createCondition(routingKeyText);
@@ -979,7 +979,7 @@
{
ref = ms.reference(message);
- routeInternal(ref, routingKey, null, true);
+ routeInternal(ref, routingKey, null, true, queueNames);
}
finally
{
@@ -1463,7 +1463,7 @@
}
}
- private boolean routeInternal(MessageReference ref, Condition condition, Transaction tx, boolean fromCluster) throws Exception
+ private boolean routeInternal(MessageReference ref, Condition condition, Transaction tx, boolean fromCluster, Set names) throws Exception
{
if (trace) { log.trace(this + " routing " + ref + " with condition '" +
condition + "'" + (tx == null ? "" : " transactionally in " + tx) +
@@ -1503,8 +1503,30 @@
//When routing to a clustered temp queue, the queue is unreliable - but we always want to route to the local
//one so we need to add the check that we only route remotely if it's a topic
//We could do this better by making sure that only one queue with the same name is routed to on the cluster
- if (!fromCluster || (!queue.isRecoverable() && !((JMSCondition)condition).isQueue()))
+
+ boolean routeLocal = false;
+
+ if (!fromCluster)
{
+ //Same node
+ routeLocal = true;
+ }
+ else
+ {
+ //From the cluster
+ if (!queue.isRecoverable())
+ {
+ //When routing from the cluster we only route to non-recoverable queues
+ //that haven't already been routed to on the sending node (same name)
+ if (names == null || !names.contains(queue.getName()))
+ {
+ routeLocal = true;
+ }
+ }
+ }
+
+ if (routeLocal)
+ {
//If we're not routing from the cluster OR the queue is unreliable then we consider it
//When we route from the cluster we never route to reliable queues
@@ -1554,33 +1576,7 @@
}
}
}
-
- if (remoteSet != null)
- {
- //There are queues on other nodes that want the message too
-
- //If the message is non reliable then we can unicast or multicast the message to the group so it
- //can get picked up by other nodes
-
- ClusterRequest request = new MessageRequest(condition.toText(), ref.getMessage());
-
- if (trace) { log.trace(this + " casting message to other node(s)"); }
-
- if (remoteSet.size() == 1)
- {
- //Only one node requires the message, so we can unicast
-
- unicastRequest(request, ((Integer)remoteSet.iterator().next()).intValue());
- }
- else
- {
- //More than one node requires the message - so we multicast
- //Whether it is reliable or unreliable multicast is determined by the JGroups stack config
-
- multicastRequest(request);
- }
- }
-
+
//If the ref is reliable and there is more than one reliable local queue that accepts the message then we need
//to route in a transaction to guarantee once and only once reliability guarantee
@@ -1599,6 +1595,8 @@
iter = targets.iterator();
+ Set queueNames = null;
+
while (iter.hasNext())
{
Queue queue = (Queue)iter.next();
@@ -1612,9 +1610,56 @@
if (del != null && del.isSelectorAccepted())
{
routed = true;
+
+ if (remoteSet != null)
+ {
+ if (queueNames == null)
+ {
+ queueNames = new HashSet();
+ }
+
+ //We put the queue name in a set - this is used on other nodes after routing from the cluster so it
+ //doesn't route to queues with the same name on other nodes
+ queueNames.add(queue.getName());
+ }
}
}
-
+
+ if (remoteSet != null)
+ {
+ //There are queues on other nodes that want the message too
+
+ //If the message is non reliable then we can unicast or multicast the message to the group so it
+ //can get picked up by other nodes
+
+ ClusterRequest request = new MessageRequest(condition.toText(), ref.getMessage(), queueNames);
+
+ if (trace) { log.trace(this + " casting message to other node(s)"); }
+
+ Integer nodeID = null;
+
+ if (remoteSet.size() == 1)
+ {
+ //Only one node requires the message, so we can unicast
+
+ nodeID = (Integer)remoteSet.iterator().next();
+ }
+
+ TxCallback callback = new CastMessageCallback(nodeID, request);
+
+ if (tx != null)
+ {
+ tx.addCallback(callback, this);
+ }
+ else
+ {
+ //Execute it now
+ callback.afterCommit(true);
+ }
+
+ routed = true;
+ }
+
if (startedTx)
{
if (trace) { log.trace(this + " committing " + tx); }
@@ -1632,7 +1677,7 @@
return routed;
}
-
+
private Binding removeBindingInMemory(int nodeID, String queueName) throws Exception
{
lock.writeLock().acquire();
@@ -2333,5 +2378,57 @@
}
// Inner classes --------------------------------------------------------------------------------
+
+ private class CastMessageCallback implements TxCallback
+ {
+ private Integer nodeID;
+
+ private ClusterRequest request;
+
+ CastMessageCallback(Integer nodeID, ClusterRequest request)
+ {
+ this.nodeID = nodeID;
+
+ this.request = request;
+ }
+ public void afterCommit(boolean onePhase) throws Exception
+ {
+ if (nodeID == null)
+ {
+ multicastRequest(request);
+ }
+ else
+ {
+ unicastRequest(request, nodeID.intValue());
+ }
+ }
+
+ public void afterPrepare() throws Exception
+ {
+ //NOOP
+ }
+
+ public void afterRollback(boolean onePhase) throws Exception
+ {
+ //NOOP
+ }
+
+ public void beforeCommit(boolean onePhase) throws Exception
+ {
+ //NOOP
+ }
+
+ public void beforePrepare() throws Exception
+ {
+ //NOOP
+ }
+
+ public void beforeRollback(boolean onePhase) throws Exception
+ {
+ //NOOP
+ }
+
+ }
+
}
\ No newline at end of file
Modified: trunk/src/main/org/jboss/messaging/core/impl/postoffice/RequestTarget.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/impl/postoffice/RequestTarget.java 2007-07-03 12:12:42 UTC (rev 2828)
+++ trunk/src/main/org/jboss/messaging/core/impl/postoffice/RequestTarget.java 2007-07-03 17:08:00 UTC (rev 2829)
@@ -22,6 +22,7 @@
package org.jboss.messaging.core.impl.postoffice;
import java.io.Serializable;
+import java.util.Set;
import org.jboss.messaging.core.contract.Message;
@@ -49,5 +50,5 @@
boolean removeReplicantLocally(int nodeId, Serializable key) throws Exception;
- void routeFromCluster(Message message, String routingKeyText) throws Exception;
+ void routeFromCluster(Message message, String routingKeyText, Set queueNames) throws Exception;
}
Modified: trunk/tests/src/org/jboss/test/messaging/core/plugin/postoffice/ClusteredPostOfficeTest.java
===================================================================
--- trunk/tests/src/org/jboss/test/messaging/core/plugin/postoffice/ClusteredPostOfficeTest.java 2007-07-03 12:12:42 UTC (rev 2828)
+++ trunk/tests/src/org/jboss/test/messaging/core/plugin/postoffice/ClusteredPostOfficeTest.java 2007-07-03 17:08:00 UTC (rev 2829)
@@ -34,6 +34,7 @@
import org.jboss.messaging.core.contract.PostOffice;
import org.jboss.messaging.core.contract.Queue;
import org.jboss.messaging.core.impl.MessagingQueue;
+import org.jboss.messaging.core.impl.tx.Transaction;
import org.jboss.test.messaging.core.PostOfficeTestBase;
import org.jboss.test.messaging.core.SimpleCondition;
import org.jboss.test.messaging.core.SimpleFilter;
@@ -81,7 +82,7 @@
office3 = createClusteredPostOffice(3, "testgroup");
- Thread.sleep(2000);
+ Thread.sleep(1000);
Set nodes = office1.nodeIDView();
assertTrue(nodes.contains(new Integer(1)));
@@ -141,7 +142,7 @@
office3 = createClusteredPostOffice(3, "testgroup");
- Thread.sleep(2000);
+ Thread.sleep(1000);
Map failoverMap1 = office1.getFailoverMap();
@@ -199,7 +200,7 @@
Queue queue1 = new MessagingQueue(1, "sub1", channelIDManager.getID(), ms, pm, false, -1, null, true);
queue1.activate();
- Condition condition1 = new SimpleCondition("topic1");
+ Condition condition1 = new SimpleCondition("condition1");
boolean added = office1.addBinding(new Binding(condition1, queue1, false), false);
assertTrue(added);
@@ -502,7 +503,7 @@
added = office2.addBinding(new Binding(condition1, queue9, false), false);
assertTrue(added);
- Condition condition2 = new SimpleCondition("topic2");
+ Condition condition2 = new SimpleCondition("condition2");
added = office2.addBinding(new Binding(condition2, queue10, false), false);
assertTrue(added);
@@ -583,61 +584,6 @@
}
}
- /*
- * Bind / Unbind all tests
- *
- * 1.
- * a) queue is not known by cluster
- * b) bind all
- * c) verify all nodes get queue
- * d) unbind - verify unbound from all nodes
- * e) close down all nodes
- * f) start all nodes
- * g) verify queue is not known
- *
- * 2.
- * a) queue is known by cluster
- * b) bind all
- * c) verify nothing changes on cluster
- *
- * 3
- * a) start one node
- * b) queue is not known to cluster
- * c) bind all
- * d) start other nodes
- * d) verify other nodes pick it up
- *
- * 4
- * a) start one node
- * b) queue is not known to cluster
- * c) bind all
- * d) shutdown all nodes
- * e) startup all nodes
- * f) verify queue is on all nodes
- *
- * 5
- * a) start one node
- * b) queue is not known
- * c) bind all
- * d) shutdown node
- * e) start other nodes
- * f) verify queue is not known
- * g) restart first node, verify queue is now known
- *
- * 6
- *
- * non durable bind all
- * a) bind all non durable
- * b) make sure is picked up by all nodes
- * c) close down all nodes
- * d) restart them all - make sure is not there
- * e) bind again
- * f) make sure is picked up
- * g) take down one node
- * h) bring it back up
- * i) make sure it has quuee again
- */
-
public void testBindUnbindAll1() throws Throwable
{
/*
@@ -665,7 +611,7 @@
Queue queue1 = new MessagingQueue(1, "sub1", channelIDManager.getID(), ms, pm, true, -1, null, true);
queue1.activate();
- Condition condition1 = new SimpleCondition("topic1");
+ Condition condition1 = new SimpleCondition("condition1");
//Add all binding
boolean added = office1.addBinding(new Binding(condition1, queue1, true), true);
@@ -827,7 +773,7 @@
Queue queue1 = new MessagingQueue(1, "sub1", channelIDManager.getID(), ms, pm, true, -1, null, true);
queue1.activate();
- Condition condition1 = new SimpleCondition("topic1");
+ Condition condition1 = new SimpleCondition("condition1");
//Add all binding
boolean added = office1.addBinding(new Binding(condition1, queue1, true), true);
@@ -941,7 +887,7 @@
Queue queue1 = new MessagingQueue(1, "sub1", channelIDManager.getID(), ms, pm, true, -1, null, true);
queue1.activate();
- Condition condition1 = new SimpleCondition("topic1");
+ Condition condition1 = new SimpleCondition("condition1");
//Add all binding
boolean added = office1.addBinding(new Binding(condition1, queue1, true), true);
@@ -1043,7 +989,7 @@
Queue queue1 = new MessagingQueue(1, "sub1", channelIDManager.getID(), ms, pm, true, -1, null, true);
queue1.activate();
- Condition condition1 = new SimpleCondition("topic1");
+ Condition condition1 = new SimpleCondition("condition1");
//Add all binding
boolean added = office1.addBinding(new Binding(condition1, queue1, true), true);
@@ -1152,7 +1098,7 @@
Queue queue1 = new MessagingQueue(1, "sub1", channelIDManager.getID(), ms, pm, true, -1, null, true);
queue1.activate();
- Condition condition1 = new SimpleCondition("topic1");
+ Condition condition1 = new SimpleCondition("condition1");
//Add all binding
boolean added = office1.addBinding(new Binding(condition1, queue1, true), true);
@@ -1271,7 +1217,7 @@
Queue queue1 = new MessagingQueue(1, "sub1", channelIDManager.getID(), ms, pm, false, -1, null, true);
queue1.activate();
- Condition condition1 = new SimpleCondition("topic1");
+ Condition condition1 = new SimpleCondition("condition1");
//Add all binding
boolean added = office1.addBinding(new Binding(condition1, queue1, true), true);
@@ -1384,171 +1330,196 @@
}
}
- private void dumpNodeIDView(PostOffice postOffice)
+
+ public final void testClusteredRoutePersistent() throws Throwable
{
- Set view = postOffice.nodeIDView();
-
- log.info("=== node id view ==");
-
- Iterator iter = view.iterator();
-
- while (iter.hasNext())
- {
- log.info("Node:" + iter.next());
- }
-
- log.info("==================");
+ clusteredRoute(true);
}
- private void assertGotAll(int nodeId, Collection bindings, String queueName)
+ public final void testClusteredRouteNonPersistent() throws Throwable
{
-
- log.info("============= dumping bindings ========");
-
- Iterator iter = bindings.iterator();
-
- while (iter.hasNext())
- {
- Binding binding = (Binding)iter.next();
-
- log.info("Binding: " + binding);
- }
-
- log.info("========= end dump==========");
-
- assertEquals(3, bindings.size());
-
- iter = bindings.iterator();
-
- boolean got1 = false;
- boolean got2 = false;
- boolean got3 = false;
- while (iter.hasNext())
- {
- Binding binding = (Binding)iter.next();
-
- log.info("binding node id " + binding.queue.getNodeID());
-
- assertEquals(queueName, binding.queue.getName());
- if (binding.queue.getNodeID() == nodeId)
- {
- assertTrue(binding.allNodes);
- }
- else
- {
- assertFalse(binding.allNodes);
- }
-
- if (binding.queue.getNodeID() == 1)
- {
- got1 = true;
- }
- if (binding.queue.getNodeID() == 2)
- {
- got2 = true;
- }
- if (binding.queue.getNodeID() == 3)
- {
- got3 = true;
- }
- }
- assertTrue(got1 && got2 && got3);
+ clusteredRoute(false);
}
+
+ public void testClusteredRouteWithFilterNonPersistent() throws Throwable
+ {
+ this.clusteredRouteWithFilter(false);
+ }
+ public void testClusteredRouteWithFilterPersistent() throws Throwable
+ {
+ this.clusteredRouteWithFilter(true);
+ }
-
-//
-// public final void testClusteredRoutePersistent() throws Throwable
-// {
-// clusteredRoute(true);
-// }
-//
-// public final void testClusteredRouteNonPersistent() throws Throwable
-// {
-// clusteredRoute(false);
-// }
-//
-// public final void testClusteredTransactionalRoutePersistent() throws Throwable
-// {
-// clusteredTransactionalRoute(true);
-// }
-//
-// public final void testClusteredTransactionalRouteNonPersistent() throws Throwable
-// {
-// clusteredTransactionalRoute(false);
-// }
-//
-// public void testClusteredNonPersistentRouteWithFilterNonRecoverable() throws Throwable
-// {
-// this.clusteredRouteWithFilter(false, false);
-// }
-//
-// public void testClusteredPersistentRouteWithFilterNonRecoverable() throws Throwable
-// {
-// this.clusteredRouteWithFilter(true);
-// }
-//
-// public void testClusteredNonPersistentRouteWithFilterRecoverable() throws Throwable
-// {
-// this.clusteredRouteWithFilter(false, true);
-// }
-//
-// public void testClusteredPersistentRouteWithFilterRecoverable() throws Throwable
-// {
-// this.clusteredRouteWithFilter(true, true);
-// }
-//
-// public void testRouteSharedPointToPointQueuePersistentNonRecoverable() throws Throwable
-// {
-// this.routeSharedQueue(true);
-// }
-//
-// public void testRouteSharedPointToPointQueueNonPersistentNonRecoverable() throws Throwable
-// {
-// this.routeSharedQueue(false, false);
-// }
-//
-// public void testRouteSharedPointToPointQueuePersistentRecoverable() throws Throwable
-// {
-// this.routeSharedQueue(true, true);
-// }
-//
-// public void testRouteSharedPointToPointQueueNonPersistentRecoverable() throws Throwable
-// {
-// this.routeSharedQueue(false, true);
-// }
-//
-// public void testRouteComplexTopicPersistent() throws Throwable
-// {
-// this.routeComplexTopic(true);
-// }
-//
-// public void testRouteComplexTopicNonPersistent() throws Throwable
-// {
-// this.routeComplexTopic(false);
-// }
-//
-// public void testRouteLocalQueuesPersistentNonRecoverable() throws Throwable
-// {
-// this.routeLocalQueues(true);
-// }
-//
-// public void testRouteLocalQueuesNonPersistentNonRecoverable() throws Throwable
-// {
-// this.routeLocalQueues(false, false);
-// }
-//
-// public void testRouteLocalQueuesPersistentRecoverable() throws Throwable
-// {
-// this.routeLocalQueues(true, true);
-// }
-//
-// public void testRouteLocalQueuesNonPersistentRecoverable() throws Throwable
-// {
-// this.routeLocalQueues(false, true);
-// }
+ public void testRouteSharedQueuePersistent() throws Throwable
+ {
+ this.routeSharedQueue(true);
+ }
+ public void testRouteSharedQueueNonPersistent() throws Throwable
+ {
+ this.routeSharedQueue(false);
+ }
+ public void testClusteredTransactionalRoutePersistent() throws Throwable
+ {
+ this.clusteredTransactionalRoute(true);
+ }
+
+ public void testClusteredTransactionalRouteNonPersistent() throws Throwable
+ {
+ this.clusteredTransactionalRoute(false);
+ }
+
+ public void testClusteredRouteFourNodesPersistent() throws Throwable
+ {
+ this.clusteredRouteFourNodes(true);
+ }
+
+ public void testClusteredRouteFourNodesNonPersistent() throws Throwable
+ {
+ this.clusteredRouteFourNodes(false);
+ }
+
+ public void testStartTxInternally() throws Throwable
+ {
+ PostOffice office1 = null;
+
+ try
+ {
+ office1 = createClusteredPostOffice(1, "testgroup");
+
+ Queue queue1 = new MessagingQueue(1, "queue1", channelIDManager.getID(), ms, pm, true, -1, null, true);
+ queue1.activate();
+ boolean added = office1.addBinding(new Binding(new SimpleCondition("condition1"), queue1, false), false);
+ assertTrue(added);
+
+ Queue queue2 = new MessagingQueue(1, "queue2", channelIDManager.getID(), ms, pm, true, -1, null, true);
+ queue2.activate();
+ added = office1.addBinding(new Binding(new SimpleCondition("condition1"), queue2, false), false);
+ assertTrue(added);
+
+ Queue queue3 = new MessagingQueue(1, "queue3", channelIDManager.getID(), ms, pm, true, -1, null, true);
+ queue3.activate();
+ added = office1.addBinding(new Binding(new SimpleCondition("condition1"), queue3, false), false);
+ assertTrue(added);
+
+ Queue queue4 = new MessagingQueue(1, "queue4", channelIDManager.getID(), ms, pm, true, -1, null, true);
+ queue4.activate();
+ added = office1.addBinding(new Binding(new SimpleCondition("condition1"), queue4, false), false);
+ assertTrue(added);
+
+ SimpleReceiver receiver1 = new SimpleReceiver("blah", SimpleReceiver.ACCEPTING);
+ queue1.getLocalDistributor().add(receiver1);
+
+ SimpleReceiver receiver2 = new SimpleReceiver("blah", SimpleReceiver.ACCEPTING);
+ queue2.getLocalDistributor().add(receiver2);
+
+ SimpleReceiver receiver3 = new SimpleReceiver("blah", SimpleReceiver.ACCEPTING);
+ queue3.getLocalDistributor().add(receiver3);
+
+ SimpleReceiver receiver4 = new SimpleReceiver("blah", SimpleReceiver.ACCEPTING);
+ queue4.getLocalDistributor().add(receiver4);
+
+ Message msg1 = CoreMessageFactory.createCoreMessage(1, true, null);
+ MessageReference ref1 = ms.reference(msg1);
+ boolean routed = office1.route(ref1, new SimpleCondition("condition1"), null);
+ assertTrue(routed);
+
+ Message msg2 = CoreMessageFactory.createCoreMessage(2, true, null);
+ MessageReference ref2 = ms.reference(msg2);
+ routed = office1.route(ref2, new SimpleCondition("condition1"), null);
+ assertTrue(routed);
+
+ Message msg3 = CoreMessageFactory.createCoreMessage(3, true, null);
+ MessageReference ref3 = ms.reference(msg3);
+ routed = office1.route(ref3, new SimpleCondition("condition1"), null);
+ assertTrue(routed);
+
+ Thread.sleep(1000);
+
+ List msgs = receiver1.getMessages();
+ assertNotNull(msgs);
+ assertEquals(3, msgs.size());
+ Message msgRec1 = (Message)msgs.get(0);
+ assertTrue(msg1 == msgRec1);
+ Message msgRec2 = (Message)msgs.get(1);
+ assertTrue(msg2 == msgRec2);
+ Message msgRec3 = (Message)msgs.get(2);
+ assertTrue(msg3 == msgRec3);
+
+ receiver1.acknowledge(msgRec1, null);
+ receiver1.acknowledge(msgRec2, null);
+ receiver1.acknowledge(msgRec3, null);
+ msgs = queue1.browse(null);
+ assertNotNull(msgs);
+ assertTrue(msgs.isEmpty());
+
+ msgs = receiver2.getMessages();
+ assertNotNull(msgs);
+ assertEquals(3, msgs.size());
+ msgRec1 = (Message)msgs.get(0);
+ assertTrue(msg1 == msgRec1);
+ msgRec2 = (Message)msgs.get(1);
+ assertTrue(msg2 == msgRec2);
+ msgRec3 = (Message)msgs.get(2);
+ assertTrue(msg3 == msgRec3);
+
+ receiver2.acknowledge(msgRec1, null);
+ receiver2.acknowledge(msgRec2, null);
+ receiver2.acknowledge(msgRec3, null);
+ msgs = queue2.browse(null);
+ assertNotNull(msgs);
+ assertTrue(msgs.isEmpty());
+
+ msgs = receiver3.getMessages();
+ assertNotNull(msgs);
+ assertEquals(3, msgs.size());
+ msgRec1 = (Message)msgs.get(0);
+ assertTrue(msg1 == msgRec1);
+ msgRec2 = (Message)msgs.get(1);
+ assertTrue(msg2 == msgRec2);
+ msgRec3 = (Message)msgs.get(2);
+ assertTrue(msg3 == msgRec3);
+
+ receiver3.acknowledge(msgRec1, null);
+ receiver3.acknowledge(msgRec2, null);
+ receiver3.acknowledge(msgRec3, null);
+ msgs = queue3.browse(null);
+ assertNotNull(msgs);
+ assertTrue(msgs.isEmpty());
+
+ msgs = receiver4.getMessages();
+ assertNotNull(msgs);
+ assertEquals(3, msgs.size());
+ msgRec1 = (Message)msgs.get(0);
+ assertTrue(msg1 == msgRec1);
+ msgRec2 = (Message)msgs.get(1);
+ assertTrue(msg2 == msgRec2);
+ msgRec3 = (Message)msgs.get(2);
+ assertTrue(msg3 == msgRec3);
+
+ receiver4.acknowledge(msgRec1, null);
+ receiver4.acknowledge(msgRec2, null);
+ receiver4.acknowledge(msgRec3, null);
+ msgs = queue4.browse(null);
+ assertNotNull(msgs);
+ assertTrue(msgs.isEmpty());
+
+ if (checkNoMessageData())
+ {
+ fail("Message data still in database");
+ }
+ }
+ finally
+ {
+ if (office1 != null)
+ {
+ office1.stop();
+ }
+ }
+ }
+
public void testBindSameName() throws Throwable
{
PostOffice office1 = null;
@@ -1614,1687 +1585,6 @@
// Package protected ----------------------------------------------------------------------------
// Protected ------------------------------------------------------------------------------------
-//
-// protected void clusteredRouteWithFilter(boolean persistentMessage, boolean recoverable)
-// throws Throwable
-// {
-// PostOffice office1 = null;
-//
-// PostOffice office2 = null;
-//
-// try
-// {
-// office1 = createPostOffice(1, "testgroup", sc, ms, pm, tr);
-// office2 = createPostOffice(2, "testgroup", sc, ms, pm, tr);
-//
-// SimpleFilter filter1 = new SimpleFilter(2);
-// SimpleFilter filter2 = new SimpleFilter(3);
-//
-// LocalClusteredQueue queue1 =
-// new LocalClusteredQueue(1, "queue1", channelIDManager.getID(), ms, pm,
-// true, recoverable, -1, filter1);
-//
-// office1.bindQueue(new SimpleCondition("topic1"), queue1);
-//
-// LocalClusteredQueue queue2 =
-// new LocalClusteredQueue(2, "queue2", channelIDManager.getID(), ms, pm,
-// true, recoverable, -1, filter2);
-//
-// office2.bindQueue(new SimpleCondition("topic1"), queue2);
-//
-// LocalClusteredQueue queue3 =
-// new LocalClusteredQueue(2, "queue3", channelIDManager.getID(), ms, pm, true,
-// recoverable, -1, null);
-//
-// office2.bindQueue(new SimpleCondition("topic1"), queue3);
-//
-// SimpleReceiver receiver1 = new SimpleReceiver("blah", SimpleReceiver.ACCEPTING);
-//
-// queue1.add(receiver1);
-//
-// SimpleReceiver receiver2 = new SimpleReceiver("blah", SimpleReceiver.ACCEPTING);
-//
-// queue2.add(receiver2);
-//
-// SimpleReceiver receiver3 = new SimpleReceiver("blah", SimpleReceiver.ACCEPTING);
-//
-// queue3.add(receiver3);
-//
-// Message msg1 = CoreMessageFactory.createCoreMessage(1);
-// MessageReference ref1 = ms.reference(msg1);
-// boolean routed = office1.route(ref1, new SimpleCondition("topic1"), null);
-// assertTrue(routed);
-//
-//
-// Message msg2 = CoreMessageFactory.createCoreMessage(2);
-// MessageReference ref2 = ms.reference(msg2);
-// routed = office1.route(ref2, new SimpleCondition("topic1"), null);
-// assertTrue(routed);
-//
-// Message msg3 = CoreMessageFactory.createCoreMessage(3);
-// MessageReference ref3 = ms.reference(msg3);
-// routed = office1.route(ref3, new SimpleCondition("topic1"), null);
-// assertTrue(routed);
-//
-// Thread.sleep(2000);
-//
-// List msgs = receiver1.getMessages();
-// assertNotNull(msgs);
-// assertEquals(1, msgs.size());
-// Message msgRec = (Message)msgs.get(0);
-// assertTrue(msg2 == msgRec);
-// receiver1.acknowledge(msgRec, null);
-// msgs = queue1.browse();
-// assertNotNull(msgs);
-// assertTrue(msgs.isEmpty());
-//
-// msgs = receiver2.getMessages();
-// assertNotNull(msgs);
-// assertEquals(1, msgs.size());
-// msgRec = (Message)msgs.get(0);
-// assertTrue(msg3 == msgRec);
-// receiver2.acknowledge(msgRec, null);
-// msgs = queue2.browse();
-// assertNotNull(msgs);
-// assertTrue(msgs.isEmpty());
-//
-// msgs = receiver3.getMessages();
-// assertNotNull(msgs);
-// assertEquals(3, msgs.size());
-// Message msgRec1 = (Message)msgs.get(0);
-// assertTrue(msg1 == msgRec1);
-// Message msgRec2 = (Message)msgs.get(1);
-// assertTrue(msg2 == msgRec2);
-// Message msgRec3 = (Message)msgs.get(2);
-// assertTrue(msg3 == msgRec3);
-//
-// receiver3.acknowledge(msgRec1, null);
-// receiver3.acknowledge(msgRec2, null);
-// receiver3.acknowledge(msgRec3, null);
-// msgs = queue3.browse();
-// assertNotNull(msgs);
-// assertTrue(msgs.isEmpty());
-//
-// if (checkNoMessageData())
-// {
-// fail("Message data still in database");
-// }
-// }
-// finally
-// {
-// if (office1 != null)
-// {
-// office1.stop();
-// }
-//
-// if (office2 != null)
-// {
-// office2.stop();
-// }
-//
-// }
-// }
-//
-// protected void clusteredRoute(boolean persistentMessage) throws Throwable
-// {
-// PostOffice office1 = null;
-//
-// PostOffice office2 = null;
-//
-// try
-// {
-// office1 = createPostOffice(1, "testgroup", sc, ms, pm, tr);
-// office2 = createPostOffice(2, "testgroup", sc, ms, pm, tr);
-//
-// //Two topics with a mixture of durable and non durable subscriptions
-//
-// LocalClusteredQueue[] queues = new LocalClusteredQueue[16];
-// Binding[] bindings = new Binding[16];
-//
-// queues[0] = new LocalClusteredQueue(1, "sub1", channelIDManager.getID(), ms, pm, true, false, -1, null);
-// bindings[0] = office1.bindQueue(new SimpleCondition("topic1"), queues[0]);
-//
-// queues[1] = new LocalClusteredQueue(1, "sub2", channelIDManager.getID(), ms, pm, true, false, -1, null);
-// bindings[1] = office1.bindQueue(new SimpleCondition("topic1"), queues[1]);
-//
-// queues[2] = new LocalClusteredQueue(2, "sub3", channelIDManager.getID(), ms, pm, true, false, -1, null);
-// bindings[2] = office2.bindQueue(new SimpleCondition("topic1"), queues[2]);
-//
-// queues[3] = new LocalClusteredQueue(2, "sub4", channelIDManager.getID(), ms, pm, true, false, -1, null);
-// bindings[3] = office2.bindQueue(new SimpleCondition("topic1"), queues[3]);
-//
-// queues[4] = new LocalClusteredQueue(2, "sub5", channelIDManager.getID(), ms, pm, true, true, -1, null);
-// bindings[4] = office2.bindQueue(new SimpleCondition("topic1"), queues[4]);
-//
-// queues[5] = new LocalClusteredQueue(1, "sub6", channelIDManager.getID(), ms, pm, true, false, -1, null);
-// bindings[5] = office1.bindQueue(new SimpleCondition("topic1"), queues[5]);
-//
-// queues[6] = new LocalClusteredQueue(1, "sub7", channelIDManager.getID(), ms, pm, true, true, -1, null);
-// bindings[6] = office1.bindQueue(new SimpleCondition("topic1"), queues[6]);
-//
-// queues[7] = new LocalClusteredQueue(1, "sub8", channelIDManager.getID(), ms, pm, true, true, -1, null);
-// bindings[7] = office1.bindQueue(new SimpleCondition("topic1"), queues[7]);
-//
-// queues[8] = new LocalClusteredQueue(1, "sub9", channelIDManager.getID(), ms, pm, true, false, -1, null);
-// bindings[8] = office1.bindQueue(new SimpleCondition("topic2"), queues[8]);
-//
-// queues[9] = new LocalClusteredQueue(1, "sub10", channelIDManager.getID(), ms, pm, true, false, -1, null);
-// bindings[9] = office1.bindQueue(new SimpleCondition("topic2"), queues[9]);
-//
-// queues[10] = new LocalClusteredQueue(2, "sub11", channelIDManager.getID(), ms, pm, true, false, -1, null);
-// bindings[10] = office2.bindQueue(new SimpleCondition("topic2"), queues[10]);
-//
-// queues[11] = new LocalClusteredQueue(2, "sub12", channelIDManager.getID(), ms, pm, true, false, -1, null);
-// bindings[11] = office2.bindQueue(new SimpleCondition("topic2"), queues[11]);
-//
-// queues[12] = new LocalClusteredQueue(2, "sub13", channelIDManager.getID(), ms, pm, true, true, -1, null);
-// bindings[12] = office2.bindQueue(new SimpleCondition("topic2"), queues[12]);
-//
-// queues[13] = new LocalClusteredQueue(1, "sub14", channelIDManager.getID(), ms, pm, true, false, -1, null);
-// bindings[13] = office1.bindQueue(new SimpleCondition("topic2"), queues[13]);
-//
-// queues[14] = new LocalClusteredQueue(1, "sub15", channelIDManager.getID(), ms, pm, true, true, -1, null);
-// bindings[14] = office1.bindQueue(new SimpleCondition("topic2"), queues[14]);
-//
-// queues[15] = new LocalClusteredQueue(1, "sub16", channelIDManager.getID(), ms, pm, true, true, -1, null);
-// bindings[15] = office1.bindQueue(new SimpleCondition("topic2"), queues[15]);
-//
-// SimpleReceiver[] receivers = new SimpleReceiver[16];
-//
-// for (int i = 0; i < 16; i++)
-// {
-// receivers[i] = new SimpleReceiver("blah", SimpleReceiver.ACCEPTING);
-// queues[i].add(receivers[i]);
-// }
-//
-// Message msg = CoreMessageFactory.createCoreMessage(1, persistentMessage, null);
-// MessageReference ref = ms.reference(msg);
-//
-// boolean routed = office1.route(ref, new SimpleCondition("topic1"), null);
-// assertTrue(routed);
-//
-// //Messages are sent asych so may take some finite time to arrive
-// Thread.sleep(1000);
-//
-// for (int i = 0; i < 8; i++)
-// {
-// List msgs = receivers[i].getMessages();
-// assertNotNull(msgs);
-// assertEquals(1, msgs.size());
-// Message msgRec = (Message)msgs.get(0);
-// assertEquals(msg.getMessageID(), msgRec.getMessageID());
-// receivers[i].acknowledge(msgRec, null);
-// msgs = queues[i].browse();
-// assertNotNull(msgs);
-// assertTrue(msgs.isEmpty());
-// receivers[i].clear();
-// }
-//
-// for (int i = 8; i < 16; i++)
-// {
-// List msgs = receivers[i].getMessages();
-// assertNotNull(msgs);
-// assertTrue(msgs.isEmpty());
-// msgs = queues[i].browse();
-// assertNotNull(msgs);
-// assertTrue(msgs.isEmpty());
-// }
-//
-// //Now route to topic2
-//
-// msg = CoreMessageFactory.createCoreMessage(2, persistentMessage, null);;
-// ref = ms.reference(msg);
-//
-// routed = office1.route(ref, new SimpleCondition("topic2"), null);
-// assertTrue(routed);
-// //Messages are sent asych so may take some finite time to arrive
-// Thread.sleep(1000);
-//
-// for (int i = 0; i < 8; i++)
-// {
-// List msgs = receivers[i].getMessages();
-// assertNotNull(msgs);
-// assertTrue(msgs.isEmpty());
-// msgs = queues[i].browse();
-// assertNotNull(msgs);
-// assertTrue(msgs.isEmpty());
-// }
-//
-// for (int i = 8; i < 16; i++)
-// {
-// List msgs = receivers[i].getMessages();
-// assertNotNull(msgs);
-// assertEquals(1, msgs.size());
-// Message msgRec = (Message)msgs.get(0);
-// assertEquals(msg.getMessageID(), msgRec.getMessageID());
-// receivers[i].acknowledge(msgRec, null);
-// msgs = queues[i].browse();
-// assertNotNull(msgs);
-// assertTrue(msgs.isEmpty());
-// receivers[i].clear();
-// }
-//
-// if (checkNoMessageData())
-// {
-// fail("Message data still in database");
-// }
-//
-// }
-// finally
-// {
-// if (office1 != null)
-// {
-// try
-// {
-// office1.unbindQueue("sub7");
-// office1.unbindQueue("sub8");
-// office1.unbindQueue("sub15");
-// office1.unbindQueue("sub16");
-// }
-// catch (Exception ignore)
-// {
-// ignore.printStackTrace();
-// }
-//
-// office1.stop();
-// }
-//
-// if (office2 != null)
-// {
-// try
-// {
-// office2.unbindQueue("sub5");
-// office2.unbindQueue("sub13");
-// }
-// catch (Exception ignore)
-// {
-// ignore.printStackTrace();
-// }
-// office2.stop();
-// }
-//
-// }
-// }
-//
-// protected void routeSharedQueue(boolean persistentMessage, boolean recoverable) throws Throwable
-// {
-// PostOffice office1 = null;
-//
-// PostOffice office2 = null;
-//
-// PostOffice office3 = null;
-//
-// PostOffice office4 = null;
-//
-// PostOffice office5 = null;
-//
-// PostOffice office6 = null;
-//
-// try
-// {
-// office1 = createPostOffice(1, "testgroup", sc, ms, pm, tr);
-// office2 = createPostOffice(2, "testgroup", sc, ms, pm, tr);
-// office3 = createPostOffice(3, "testgroup", sc, ms, pm, tr);
-// office4 = createPostOffice(4, "testgroup", sc, ms, pm, tr);
-// office5 = createPostOffice(5, "testgroup", sc, ms, pm, tr);
-// office6 = createPostOffice(6, "testgroup", sc, ms, pm, tr);
-//
-// // We deploy the queue on nodes 1, 2, 3, 4 and 5
-// // We don't deploy on node 6
-//
-// LocalClusteredQueue queue1 =
-// new LocalClusteredQueue(1, "queue1", channelIDManager.getID(), ms, pm, true,
-// recoverable, -1, null);
-//
-// office1.bindQueue(new SimpleCondition("queue1"), queue1);
-//
-// SimpleReceiver receiver1 = new SimpleReceiver("blah", SimpleReceiver.ACCEPTING);
-// queue1.add(receiver1);
-//
-// LocalClusteredQueue queue2 =
-// new LocalClusteredQueue(2, "queue1", channelIDManager.getID(), ms, pm, true,
-// recoverable, -1, null);
-//
-// office2.bindQueue(new SimpleCondition("queue1"), queue2);
-//
-// SimpleReceiver receiver2 = new SimpleReceiver("blah", SimpleReceiver.ACCEPTING);
-// queue2.add(receiver2);
-//
-// LocalClusteredQueue queue3 =
-// new LocalClusteredQueue(3, "queue1", channelIDManager.getID(), ms, pm, true,
-// recoverable, -1, null);
-//
-// office3.bindQueue(new SimpleCondition("queue1"), queue3);
-//
-// SimpleReceiver receiver3 = new SimpleReceiver("blah", SimpleReceiver.ACCEPTING);
-// queue3.add(receiver3);
-//
-// LocalClusteredQueue queue4 =
-// new LocalClusteredQueue(4, "queue1", channelIDManager.getID(), ms, pm,
-// true, recoverable, -1, null);
-//
-// office4.bindQueue(new SimpleCondition("queue1"), queue4);
-//
-// SimpleReceiver receiver4 = new SimpleReceiver("blah", SimpleReceiver.ACCEPTING);
-// queue4.add(receiver4);
-//
-// LocalClusteredQueue queue5 =
-// new LocalClusteredQueue(5, "queue1", channelIDManager.getID(), ms, pm, true,
-// recoverable, -1, null);
-//
-// office5.bindQueue(new SimpleCondition("queue1"), queue5);
-// SimpleReceiver receiver5 = new SimpleReceiver("blah", SimpleReceiver.ACCEPTING);
-// queue5.add(receiver5);
-//
-// // We are using a AlwaysLocalRoutingPolicy so only the local queue should ever get the
-// // message if the filter matches
-//
-// Message msg = CoreMessageFactory.createCoreMessage(1, persistentMessage, null);
-// MessageReference ref = ms.reference(msg);
-// boolean routed = office1.route(ref, new SimpleCondition("queue1"), null);
-// assertTrue(routed);
-// checkContainsAndAcknowledge(msg, receiver1, queue1);
-// this.checkEmpty(receiver2);
-// this.checkEmpty(receiver3);
-// this.checkEmpty(receiver4);
-// this.checkEmpty(receiver5);
-//
-// msg = CoreMessageFactory.createCoreMessage(1, persistentMessage, null);
-// ref = ms.reference(msg);
-// routed = office2.route(ref, new SimpleCondition("queue1"), null);
-// assertTrue(routed);
-// this.checkEmpty(receiver1);
-// checkContainsAndAcknowledge(msg, receiver2, queue2);
-// this.checkEmpty(receiver3);
-// this.checkEmpty(receiver4);
-// this.checkEmpty(receiver5);
-//
-// msg = CoreMessageFactory.createCoreMessage(1, persistentMessage, null);
-// ref = ms.reference(msg);
-// routed = office3.route(ref, new SimpleCondition("queue1"), null);
-// assertTrue(routed);
-// this.checkEmpty(receiver1);
-// this.checkEmpty(receiver2);
-// checkContainsAndAcknowledge(msg, receiver3, queue3);
-// this.checkEmpty(receiver4);
-// this.checkEmpty(receiver5);
-//
-// msg = CoreMessageFactory.createCoreMessage(1, persistentMessage, null);
-// ref = ms.reference(msg);
-// routed = office4.route(ref, new SimpleCondition("queue1"), null);
-// assertTrue(routed);
-// this.checkEmpty(receiver1);
-// this.checkEmpty(receiver2);
-// this.checkEmpty(receiver3);
-// checkContainsAndAcknowledge(msg, receiver4, queue3);
-// this.checkEmpty(receiver5);
-//
-// msg = CoreMessageFactory.createCoreMessage(1, persistentMessage, null);
-// ref = ms.reference(msg);
-// routed = office5.route(ref, new SimpleCondition("queue1"), null);
-// assertTrue(routed);
-// this.checkEmpty(receiver1);
-// this.checkEmpty(receiver2);
-// this.checkEmpty(receiver3);
-// this.checkEmpty(receiver4);
-// checkContainsAndAcknowledge(msg, receiver5, queue5);
-//
-// msg = CoreMessageFactory.createCoreMessage(1, persistentMessage, null);
-// ref = ms.reference(msg);
-// routed = office6.route(ref, new SimpleCondition("queue1"), null);
-// assertTrue(routed);
-//
-// //The actual queue that receives the mesage is determined by the routing policy
-// //The default uses round robin for the nodes (this is tested more thoroughly in
-// //its own test)
-//
-// Thread.sleep(1000);
-//
-// checkContainsAndAcknowledge(msg, receiver1, queue1);
-// this.checkEmpty(receiver1);
-// this.checkEmpty(receiver2);
-// this.checkEmpty(receiver3);
-// this.checkEmpty(receiver4);
-// this.checkEmpty(receiver5);
-//
-// }
-// finally
-// {
-// if (office1 != null)
-// {
-// office1.stop();
-// }
-//
-// if (office2 != null)
-// {
-// office2.stop();
-// }
-//
-// if (office3 != null)
-// {
-// office3.stop();
-// }
-//
-// if (office4 != null)
-// {
-// office4.stop();
-// }
-//
-// if (office5 != null)
-// {
-// office5.stop();
-// }
-//
-// if (office6 != null)
-// {
-// office6.stop();
-// }
-//
-// if (checkNoMessageData())
-// {
-// fail("Message data still in database");
-// }
-// }
-// }
-//
-//
-// /**
-// * Clustered post offices should be able to have local queues bound to them too.
-// */
-// protected void routeLocalQueues(boolean persistentMessage, boolean recoverable) throws Throwable
-// {
-// PostOffice office1 = null;
-// PostOffice office2 = null;
-// PostOffice office3 = null;
-//
-// try
-// {
-// office1 = createPostOffice(1, "testgroup", sc, ms, pm, tr);
-// office2 = createPostOffice(2, "testgroup", sc, ms, pm, tr);
-// office3 = createPostOffice(3, "testgroup", sc, ms, pm, tr);
-//
-// LocalClusteredQueue sub1 =
-// new LocalClusteredQueue(1, "sub1", channelIDManager.getID(), ms, pm, true,
-// recoverable, -1, null);
-//
-// office1.bindQueue(new SimpleCondition("topic"), sub1);
-//
-// SimpleReceiver receiver1 = new SimpleReceiver("blah", SimpleReceiver.ACCEPTING);
-// sub1.add(receiver1);
-//
-// LocalClusteredQueue sub2 =
-// new LocalClusteredQueue(2, "sub2", channelIDManager.getID(), ms, pm, true,
-// recoverable, -1, null);
-//
-// office2.bindQueue(new SimpleCondition("topic"), sub2);
-// SimpleReceiver receiver2 = new SimpleReceiver("blah", SimpleReceiver.ACCEPTING);
-// sub2.add(receiver2);
-//
-// LocalClusteredQueue sub3 =
-// new LocalClusteredQueue(3, "sub3", channelIDManager.getID(), ms, pm, true,
-// recoverable, -1, null);
-//
-// office3.bindQueue(new SimpleCondition("topic"), sub3);
-// SimpleReceiver receiver3 = new SimpleReceiver("blah", SimpleReceiver.ACCEPTING);
-// sub3.add(receiver3);
-//
-// //Only the local sub should get it since we have bound locally
-//
-// Message msg = CoreMessageFactory.createCoreMessage(1, persistentMessage, null);
-// MessageReference ref = ms.reference(msg);
-// boolean routed = office1.route(ref, new SimpleCondition("topic"), null);
-// assertTrue(routed);
-// Thread.sleep(500);
-// checkContainsAndAcknowledge(msg, receiver1, sub1);
-// this.checkEmpty(receiver2);
-// this.checkEmpty(receiver3);
-//
-// msg = CoreMessageFactory.createCoreMessage(2, persistentMessage, null);
-// ref = ms.reference(msg);
-// routed = office2.route(ref, new SimpleCondition("topic"), null);
-// assertTrue(routed);
-// Thread.sleep(500);
-// this.checkEmpty(receiver1);
-// checkContainsAndAcknowledge(msg, receiver2, sub2);
-// this.checkEmpty(receiver3);
-//
-// msg = CoreMessageFactory.createCoreMessage(3, persistentMessage, null);
-// ref = ms.reference(msg);
-// routed = office3.route(ref, new SimpleCondition("topic"), null);
-// assertTrue(routed);
-// Thread.sleep(500);
-// this.checkEmpty(receiver1);
-// this.checkEmpty(receiver2);
-// checkContainsAndAcknowledge(msg, receiver3, sub2);
-//
-// if (checkNoMessageData())
-// {
-// fail("Message data still in database");
-// }
-//
-// }
-// finally
-// {
-// if (office1 != null)
-// {
-// office1.stop();
-// }
-//
-// if (office2 != null)
-// {
-// office2.stop();
-// }
-//
-// if (office3 != null)
-// {
-// office3.stop();
-// }
-//
-// }
-// }
-//
-//
-//
-// /**
-// * We set up a complex scenario with multiple subscriptions, shared and unshared on different
-// * nodes.
-// *
-// * node1: no subscriptions
-// * node2: 2 non durable
-// * node3: 1 non shared durable, 1 non durable
-// * node4: 1 shared durable (shared1), 1 non shared durable, 3 non durable
-// * node5: 2 shared durable (shared1 and shared2)
-// * node6: 1 shared durable (shared2), 1 non durable
-// * node7: 1 shared durable (shared2)
-// *
-// * Then we send mess
-// */
-// protected void routeComplexTopic(boolean persistent) throws Throwable
-// {
-// PostOffice office1 = null;
-// PostOffice office2 = null;
-// PostOffice office3 = null;
-// PostOffice office4 = null;
-// PostOffice office5 = null;
-// PostOffice office6 = null;
-// PostOffice office7 = null;
-//
-// try
-// {
-// office1 = createPostOffice(1, "testgroup", sc, ms, pm, tr);
-// office2 = createPostOffice(2, "testgroup", sc, ms, pm, tr);
-// office3 = createPostOffice(3, "testgroup", sc, ms, pm, tr);
-// office4 = createPostOffice(4, "testgroup", sc, ms, pm, tr);
-// office5 = createPostOffice(5, "testgroup", sc, ms, pm, tr);
-// office6 = createPostOffice(6, "testgroup", sc, ms, pm, tr);
-// office7 = createPostOffice(7, "testgroup", sc, ms, pm, tr);
-//
-// //Node 2
-// //======
-//
-// // Non durable 1 on node 2
-// LocalClusteredQueue nonDurable1 =
-// new LocalClusteredQueue(2, "nondurable1", channelIDManager.getID(), ms, pm,
-// true, false, -1, null);
-//
-// office2.bindQueue(new SimpleCondition("topic"), nonDurable1);
-// SimpleReceiver receiver1 = new SimpleReceiver("blah", SimpleReceiver.ACCEPTING);
-// nonDurable1.add(receiver1);
-//
-// // Non durable 2 on node 2
-// LocalClusteredQueue nonDurable2 =
-// new LocalClusteredQueue(2, "nondurable2", channelIDManager.getID(), ms, pm,
-// true, false, -1, null);
-//
-// office2.bindQueue(new SimpleCondition("topic"), nonDurable2);
-// SimpleReceiver receiver2 = new SimpleReceiver("blah", SimpleReceiver.ACCEPTING);
-// nonDurable2.add(receiver2);
-//
-// //Node 3
-// //======
-//
-// // Non shared durable
-// LocalClusteredQueue nonSharedDurable1 =
-// new LocalClusteredQueue(3, "nonshareddurable1", channelIDManager.getID(), ms,
-// pm, true, true, -1, null);
-//
-// office3.bindQueue(new SimpleCondition("topic"), nonSharedDurable1);
-// SimpleReceiver receiver3 = new SimpleReceiver("blah", SimpleReceiver.ACCEPTING);
-// nonSharedDurable1.add(receiver3);
-//
-// // Non durable
-// LocalClusteredQueue nonDurable3 =
-// new LocalClusteredQueue(3, "nondurable3", channelIDManager.getID(), ms, pm,
-// true, false, -1, null);
-//
-// office3.bindQueue(new SimpleCondition("topic"), nonDurable3);
-// SimpleReceiver receiver4 = new SimpleReceiver("blah", SimpleReceiver.ACCEPTING);
-// nonDurable3.add(receiver4);
-//
-// //Node 4
-// //======
-//
-// // Shared durable
-// LocalClusteredQueue sharedDurable1 =
-// new LocalClusteredQueue(4, "shareddurable1", channelIDManager.getID(), ms, pm,
-// true, true, -1, null);
-//
-// office4.bindQueue(new SimpleCondition("topic"), sharedDurable1);
-// SimpleReceiver receiver5 = new SimpleReceiver("blah", SimpleReceiver.ACCEPTING);
-// sharedDurable1.add(receiver5);
-//
-// // Non shared durable
-// LocalClusteredQueue nonSharedDurable2 =
-// new LocalClusteredQueue(4, "nonshareddurable2", channelIDManager.getID(), ms,
-// pm, true, true, -1, null);
-//
-// office4.bindQueue(new SimpleCondition("topic"), nonSharedDurable2);
-// SimpleReceiver receiver6 = new SimpleReceiver("blah", SimpleReceiver.ACCEPTING);
-// nonSharedDurable2.add(receiver6);
-//
-// // Non durable
-// LocalClusteredQueue nonDurable4 =
-// new LocalClusteredQueue(4, "nondurable4", channelIDManager.getID(), ms, pm,
-// true, false, -1, null);
-//
-// office4.bindQueue(new SimpleCondition("topic"), nonDurable4);
-// SimpleReceiver receiver7 = new SimpleReceiver("blah", SimpleReceiver.ACCEPTING);
-// nonDurable4.add(receiver7);
-//
-// // Non durable
-// LocalClusteredQueue nonDurable5 =
-// new LocalClusteredQueue(4, "nondurable5", channelIDManager.getID(), ms, pm,
-// true, false, -1, null);
-// office4.bindQueue(new SimpleCondition("topic"), nonDurable5);
-// SimpleReceiver receiver8 = new SimpleReceiver("blah", SimpleReceiver.ACCEPTING);
-// nonDurable5.add(receiver8);
-//
-// // Non durable
-// LocalClusteredQueue nonDurable6 =
-// new LocalClusteredQueue(4, "nondurable6", channelIDManager.getID(), ms, pm,
-// true, false, -1, null);
-// office4.bindQueue(new SimpleCondition("topic"), nonDurable6);
-// SimpleReceiver receiver9 = new SimpleReceiver("blah", SimpleReceiver.ACCEPTING);
-// nonDurable6.add(receiver9);
-//
-// // Node 5
-// //=======
-// // Shared durable
-// LocalClusteredQueue sharedDurable2 =
-// new LocalClusteredQueue(5, "shareddurable1", channelIDManager.getID(), ms, pm,
-// true, true, -1, null);
-//
-// office5.bindQueue(new SimpleCondition("topic"), sharedDurable2);
-// SimpleReceiver receiver10 = new SimpleReceiver("blah", SimpleReceiver.ACCEPTING);
-// sharedDurable2.add(receiver10);
-//
-// // Shared durable
-// LocalClusteredQueue sharedDurable3 =
-// new LocalClusteredQueue(5, "shareddurable2", channelIDManager.getID(), ms, pm,
-// true, true, -1, null);
-//
-// office5.bindQueue(new SimpleCondition("topic"), sharedDurable3);
-// SimpleReceiver receiver11 = new SimpleReceiver("blah", SimpleReceiver.ACCEPTING);
-// sharedDurable3.add(receiver11);
-//
-// // Node 6
-// //=========
-// LocalClusteredQueue sharedDurable4 =
-// new LocalClusteredQueue(6, "shareddurable2", channelIDManager.getID(), ms, pm,
-// true, true, -1, null);
-//
-// office6.bindQueue(new SimpleCondition("topic"), sharedDurable4);
-// SimpleReceiver receiver12 = new SimpleReceiver("blah", SimpleReceiver.ACCEPTING);
-// sharedDurable4.add(receiver12);
-//
-// LocalClusteredQueue nonDurable7 =
-// new LocalClusteredQueue(6, "nondurable7", channelIDManager.getID(), ms, pm,
-// true, false, -1, null);
-// office6.bindQueue(new SimpleCondition("topic"), nonDurable7);
-// SimpleReceiver receiver13 = new SimpleReceiver("blah", SimpleReceiver.ACCEPTING);
-// nonDurable7.add(receiver13);
-//
-// //Node 7
-// //=======
-// LocalClusteredQueue sharedDurable5 =
-// new LocalClusteredQueue(7, "shareddurable2", channelIDManager.getID(), ms, pm,
-// true, true, -1, null);
-//
-// office7.bindQueue(new SimpleCondition("topic"), sharedDurable5);
-// SimpleReceiver receiver14 = new SimpleReceiver("blah", SimpleReceiver.ACCEPTING);
-// sharedDurable5.add(receiver14);
-//
-//
-// //Send 1 message at node1
-// //========================
-//
-// List msgs = sendMessages("topic", persistent, null, 1, null);
-//
-// //n2
-// checkContainsAndAcknowledge(msgs, receiver1, nonDurable1);
-// checkContainsAndAcknowledge(msgs, receiver2, nonDurable2);
-//
-// //n3
-// checkContainsAndAcknowledge(msgs, receiver3, nonSharedDurable1);
-// checkContainsAndAcknowledge(msgs, receiver4, nonDurable3);
-//
-// //n4
-// checkContainsAndAcknowledge(msgs, receiver5, sharedDurable1);
-// checkContainsAndAcknowledge(msgs, receiver6, nonSharedDurable2);
-// checkContainsAndAcknowledge(msgs, receiver7, nonDurable4);
-// checkContainsAndAcknowledge(msgs, receiver8, nonDurable5);
-// checkContainsAndAcknowledge(msgs, receiver9, nonDurable6);
-//
-// //n5
-// checkEmpty(receiver10);
-// checkContainsAndAcknowledge(msgs, receiver11, sharedDurable3);
-//
-// //n6
-// checkEmpty(receiver12);
-// checkContainsAndAcknowledge(msgs, receiver13, nonDurable7);
-//
-// //n7
-// checkEmpty(receiver14);
-//
-//
-// //Send 1 message at node2
-// //========================
-//
-// msgs = sendMessages("topic", persistent, office2, 1, null);
-//
-// //n2
-// checkContainsAndAcknowledge(msgs, receiver1, nonDurable1);
-// checkContainsAndAcknowledge(msgs, receiver2, nonDurable2);
-//
-// //n3
-// checkContainsAndAcknowledge(msgs, receiver3, nonSharedDurable1);
-// checkContainsAndAcknowledge(msgs, receiver4, nonDurable3);
-//
-// //n4
-// checkContainsAndAcknowledge(msgs, receiver5, sharedDurable1);
-// checkContainsAndAcknowledge(msgs, receiver6, nonSharedDurable2);
-// checkContainsAndAcknowledge(msgs, receiver7, nonDurable4);
-// checkContainsAndAcknowledge(msgs, receiver8, nonDurable5);
-// checkContainsAndAcknowledge(msgs, receiver9, nonDurable6);
-//
-// //n5
-// checkEmpty(receiver10);
-// checkContainsAndAcknowledge(msgs, receiver11, sharedDurable3);
-//
-// //n6
-// checkEmpty(receiver12);
-// checkContainsAndAcknowledge(msgs, receiver13, nonDurable7);
-//
-// //n7
-// checkEmpty(receiver14);
-//
-// //Send 1 message at node3
-// //========================
-//
-// msgs = sendMessages("topic", persistent, office3, 1, null);
-//
-// //n2
-// checkContainsAndAcknowledge(msgs, receiver1, nonDurable1);
-// checkContainsAndAcknowledge(msgs, receiver2, nonDurable2);
-//
-// //n3
-// checkContainsAndAcknowledge(msgs, receiver3, nonSharedDurable1);
-// checkContainsAndAcknowledge(msgs, receiver4, nonDurable3);
-//
-// //n4
-// checkContainsAndAcknowledge(msgs, receiver5, sharedDurable1);
-// checkContainsAndAcknowledge(msgs, receiver6, nonSharedDurable2);
-// checkContainsAndAcknowledge(msgs, receiver7, nonDurable4);
-// checkContainsAndAcknowledge(msgs, receiver8, nonDurable5);
-// checkContainsAndAcknowledge(msgs, receiver9, nonDurable6);
-//
-// //n5
-// checkEmpty(receiver10);
-// checkContainsAndAcknowledge(msgs, receiver11, sharedDurable3);
-//
-// //n6
-// checkEmpty(receiver12);
-// checkContainsAndAcknowledge(msgs, receiver13, nonDurable7);
-//
-// //n7
-// checkEmpty(receiver14);
-//
-// //Send 1 message at node4
-// //========================
-//
-// msgs = sendMessages("topic", persistent, office4, 1, null);
-//
-// //n2
-// checkContainsAndAcknowledge(msgs, receiver1, nonDurable1);
-// checkContainsAndAcknowledge(msgs, receiver2, nonDurable2);
-//
-// //n3
-// checkContainsAndAcknowledge(msgs, receiver3, nonSharedDurable1);
-// checkContainsAndAcknowledge(msgs, receiver4, nonDurable3);
-//
-// //n4
-// checkContainsAndAcknowledge(msgs, receiver5, sharedDurable1); // shared durable 1
-// checkContainsAndAcknowledge(msgs, receiver6, nonSharedDurable2);
-// checkContainsAndAcknowledge(msgs, receiver7, nonDurable4);
-// checkContainsAndAcknowledge(msgs, receiver8, nonDurable5);
-// checkContainsAndAcknowledge(msgs, receiver9, nonDurable6);
-//
-// //n5
-// checkEmpty(receiver10); //shared durable 1
-// checkContainsAndAcknowledge(msgs, receiver11, sharedDurable3); //shared durable 2
-//
-// //n6
-// checkEmpty(receiver12); // shared durable 2
-// checkContainsAndAcknowledge(msgs, receiver13, nonDurable7);
-//
-// //n7
-// checkEmpty(receiver14);
-//
-// //Send 1 message at node5
-// //========================
-//
-// msgs = sendMessages("topic", persistent, office5, 1, null);
-//
-// //n2
-// checkContainsAndAcknowledge(msgs, receiver1, nonDurable1);
-// checkContainsAndAcknowledge(msgs, receiver2, nonDurable2);
-//
-// //n3
-// checkContainsAndAcknowledge(msgs, receiver3, nonSharedDurable1);
-// checkContainsAndAcknowledge(msgs, receiver4, nonDurable3);
-//
-// //n4
-// checkEmpty(receiver5);
-// checkContainsAndAcknowledge(msgs, receiver6, nonSharedDurable2);
-// checkContainsAndAcknowledge(msgs, receiver7, nonDurable4);
-// checkContainsAndAcknowledge(msgs, receiver8, nonDurable5);
-// checkContainsAndAcknowledge(msgs, receiver9, nonDurable6);
-//
-// //n5
-// checkContainsAndAcknowledge(msgs, receiver10, sharedDurable2);
-// checkContainsAndAcknowledge(msgs, receiver11, sharedDurable3);
-//
-// //n6
-// checkEmpty(receiver12);
-// checkContainsAndAcknowledge(msgs, receiver13, nonDurable7);
-//
-// //n7
-// checkEmpty(receiver12);
-//
-// //Send 1 message at node6
-// //========================
-//
-// msgs = sendMessages("topic", persistent, office6, 1, null);
-//
-// //n2
-// checkContainsAndAcknowledge(msgs, receiver1, nonDurable1);
-// checkContainsAndAcknowledge(msgs, receiver2, nonDurable2);
-//
-// //n3
-// checkContainsAndAcknowledge(msgs, receiver3, nonSharedDurable1);
-// checkContainsAndAcknowledge(msgs, receiver4, nonDurable3);
-//
-// //n4
-// checkContainsAndAcknowledge(msgs, receiver5, sharedDurable1);
-// checkContainsAndAcknowledge(msgs, receiver6, nonSharedDurable2);
-// checkContainsAndAcknowledge(msgs, receiver7, nonDurable4);
-// checkContainsAndAcknowledge(msgs, receiver8, nonDurable5);
-// checkContainsAndAcknowledge(msgs, receiver9, nonDurable6);
-//
-// //n5
-// checkEmpty(receiver10);
-//
-// checkEmpty(receiver11);
-//
-// //n6
-// checkContainsAndAcknowledge(msgs, receiver12, sharedDurable4);
-// checkContainsAndAcknowledge(msgs, receiver13, nonDurable7);
-//
-// //n7
-// checkEmpty(receiver12);
-//
-// //Send 1 message at node7
-// //========================
-//
-// msgs = sendMessages("topic", persistent, office7, 1, null);
-//
-// //n2
-// checkContainsAndAcknowledge(msgs, receiver1, nonDurable1);
-// checkContainsAndAcknowledge(msgs, receiver2, nonDurable2);
-//
-// //n3
-// checkContainsAndAcknowledge(msgs, receiver3, nonSharedDurable1);
-// checkContainsAndAcknowledge(msgs, receiver4, nonDurable3);
-//
-// //n4
-// checkContainsAndAcknowledge(msgs, receiver5, sharedDurable1);
-// checkContainsAndAcknowledge(msgs, receiver6, nonSharedDurable2);
-// checkContainsAndAcknowledge(msgs, receiver7, nonDurable4);
-// checkContainsAndAcknowledge(msgs, receiver8, nonDurable5);
-// checkContainsAndAcknowledge(msgs, receiver9, nonDurable6);
-//
-// //n5
-// checkEmpty(receiver10);
-// checkEmpty(receiver11);
-//
-// //n6
-// checkEmpty(receiver12);
-// checkContainsAndAcknowledge(msgs, receiver13, nonDurable7);
-//
-// //n7
-// checkContainsAndAcknowledge(msgs, receiver14, sharedDurable5);
-//
-// if (checkNoMessageData())
-// {
-// fail("Message data still in database");
-// }
-// }
-// finally
-// {
-// if (office1 != null)
-// {
-// office1.stop();
-// }
-//
-// if (office2 != null)
-// {
-// office2.stop();
-// }
-//
-// if (office3 != null)
-// {
-// try
-// {
-// office3.unbindQueue("nonshareddurable1");
-// }
-// catch (Exception ignore)
-// {
-// ignore.printStackTrace();
-// }
-// office3.stop();
-// }
-//
-// if (office4 != null)
-// {
-// try
-// {
-// office4.unbindQueue("shareddurable1");
-// office4.unbindQueue("nonshareddurable2");
-// }
-// catch (Exception ignore)
-// {
-// ignore.printStackTrace();
-// }
-// office4.stop();
-// }
-//
-// if (office5 != null)
-// {
-// try
-// {
-// office5.unbindQueue("shareddurable1");
-// office5.unbindQueue("shareddurable2");
-// }
-// catch (Exception ignore)
-// {
-// ignore.printStackTrace();
-// }
-// office5.stop();
-// }
-//
-// if (office6 != null)
-// {
-// try
-// {
-// office6.unbindQueue("shareddurable2");
-// }
-// catch (Exception ignore)
-// {
-// ignore.printStackTrace();
-// }
-// office6.stop();
-// }
-//
-// if (office7 != null)
-// {
-// try
-// {
-// office7.unbindQueue("shareddurable2");
-// }
-// catch (Exception ignore)
-// {
-// ignore.printStackTrace();
-// }
-// office7.stop();
-// }
-//
-// }
-// }
-//
-//
-//
-//
-//
-// protected void clusteredTransactionalRoute(boolean persistent) throws Throwable
-// {
-// PostOffice office1 = null;
-//
-// PostOffice office2 = null;
-//
-// try
-// {
-// //Start two offices
-//
-// office1 = createPostOffice(1, "testgroup", sc, ms, pm, tr);
-// office2 = createPostOffice(2, "testgroup", sc, ms, pm, tr);
-//
-// LocalClusteredQueue[] queues = new LocalClusteredQueue[16];
-// Binding[] bindings = new Binding[16];
-//
-// queues[0] =
-// new LocalClusteredQueue(1, "sub1", channelIDManager.getID(), ms, pm,
-// true, false, -1, null);
-// bindings[0] = office1.bindQueue(new SimpleCondition("topic1"), queues[0]);
-//
-// queues[1] =
-// new LocalClusteredQueue(1, "sub2", channelIDManager.getID(), ms, pm,
-// true, false, -1, null);
-// bindings[1] = office1.bindQueue(new SimpleCondition("topic1"), queues[1]);
-//
-// queues[2] =
-// new LocalClusteredQueue(2, "sub3", channelIDManager.getID(), ms, pm,
-// true, false, -1, null);
-// bindings[2] = office2.bindQueue(new SimpleCondition("topic1"), queues[2]);
-//
-// queues[3] =
-// new LocalClusteredQueue(2, "sub4", channelIDManager.getID(), ms, pm,
-// true, false, -1, null);
-// bindings[3] = office2.bindQueue(new SimpleCondition("topic1"), queues[3]);
-//
-// queues[4] =
-// new LocalClusteredQueue(2, "sub5", channelIDManager.getID(), ms, pm,
-// true, true, -1, null);
-// bindings[4] = office2.bindQueue(new SimpleCondition("topic1"), queues[4]);
-//
-// queues[5] =
-// new LocalClusteredQueue(1, "sub6", channelIDManager.getID(), ms, pm,
-// true, false, -1, null);
-// bindings[5] = office1.bindQueue(new SimpleCondition("topic1"), queues[5]);
-//
-// queues[6] =
-// new LocalClusteredQueue(1, "sub7", channelIDManager.getID(), ms, pm,
-// true, true, -1, null);
-// bindings[6] = office1.bindQueue(new SimpleCondition("topic1"), queues[6]);
-//
-// queues[7] =
-// new LocalClusteredQueue(1, "sub8", channelIDManager.getID(), ms, pm,
-// true, true, -1, null);
-// bindings[7] = office1.bindQueue(new SimpleCondition("topic1"), queues[7]);
-//
-// queues[8] =
-// new LocalClusteredQueue(1, "sub9", channelIDManager.getID(), ms, pm,
-// true, false, -1, null);
-// bindings[8] = office1.bindQueue(new SimpleCondition("topic2"), queues[8]);
-//
-// queues[9] =
-// new LocalClusteredQueue(1, "sub10", channelIDManager.getID(), ms, pm,
-// true, false, -1, null);
-// bindings[9] = office1.bindQueue(new SimpleCondition("topic2"), queues[9]);
-//
-// queues[10] =
-// new LocalClusteredQueue(2, "sub11", channelIDManager.getID(), ms, pm,
-// true, false, -1, null);
-// bindings[10] = office2.bindQueue(new SimpleCondition("topic2"), queues[10]);
-//
-// queues[11] =
-// new LocalClusteredQueue(2, "sub12", channelIDManager.getID(), ms, pm,
-// true, false, -1, null);
-// bindings[11] = office2.bindQueue(new SimpleCondition("topic2"), queues[11]);
-//
-// queues[12] =
-// new LocalClusteredQueue(2, "sub13", channelIDManager.getID(), ms, pm,
-// true, true, -1, null);
-// bindings[12] = office2.bindQueue(new SimpleCondition("topic2"), queues[12]);
-//
-// queues[13] =
-// new LocalClusteredQueue(1, "sub14", channelIDManager.getID(), ms, pm,
-// true, false, -1, null);
-// bindings[13] = office1.bindQueue(new SimpleCondition("topic2"), queues[13]);
-//
-// queues[14] =
-// new LocalClusteredQueue(1, "sub15", channelIDManager.getID(), ms, pm,
-// true, true, -1, null);
-// bindings[14] = office1.bindQueue(new SimpleCondition("topic2"), queues[14]);
-//
-// queues[15] =
-// new LocalClusteredQueue(1, "sub16", channelIDManager.getID(), ms, pm,
-// true, true, -1, null);
-// bindings[15] = office1.bindQueue(new SimpleCondition("topic2"), queues[15]);
-//
-// SimpleReceiver[] receivers = new SimpleReceiver[16];
-//
-// for (int i = 0; i < 16; i++)
-// {
-// receivers[i] = new SimpleReceiver("blah", SimpleReceiver.ACCEPTING);
-// queues[i].add(receivers[i]);
-// }
-//
-// //First for topic 1
-//
-// Message msg1 = CoreMessageFactory.createCoreMessage(1, persistent, null);;
-// MessageReference ref1 = ms.reference(msg1);
-//
-// Message msg2 = CoreMessageFactory.createCoreMessage(2, persistent, null);;
-// MessageReference ref2 = ms.reference(msg2);
-//
-// Transaction tx = tr.createTransaction();
-//
-// boolean routed = office1.route(ref1, new SimpleCondition("topic1"), tx);
-// assertTrue(routed);
-// routed = office1.route(ref2, new SimpleCondition("topic1"), tx);
-// assertTrue(routed);
-//
-//
-// for (int i = 0; i < 16; i++)
-// {
-// List msgs = receivers[i].getMessages();
-// assertNotNull(msgs);
-// assertTrue(msgs.isEmpty());
-// msgs = queues[i].browse();
-// assertNotNull(msgs);
-// assertTrue(msgs.isEmpty());
-// }
-//
-// tx.commit();
-//
-// //Messages are sent asych so may take some finite time to arrive
-// Thread.sleep(1000);
-//
-// for (int i = 0; i < 8; i++)
-// {
-// List msgs = receivers[i].getMessages();
-// assertNotNull(msgs);
-// assertEquals(2, msgs.size());
-// Message msgRec1 = (Message)msgs.get(0);
-// assertEquals(msg1.getMessageID(), msgRec1.getMessageID());
-// Message msgRec2 = (Message)msgs.get(1);
-// assertEquals(msg2.getMessageID(), msgRec2.getMessageID());
-// receivers[i].acknowledge(msgRec1, null);
-// receivers[i].acknowledge(msgRec2, null);
-// msgs = queues[i].browse();
-// assertNotNull(msgs);
-// assertTrue(msgs.isEmpty());
-// receivers[i].clear();
-// }
-//
-// for (int i = 8; i < 16; i++)
-// {
-// List msgs = receivers[i].getMessages();
-// assertNotNull(msgs);
-// assertTrue(msgs.isEmpty());
-// msgs = queues[i].browse();
-// assertNotNull(msgs);
-// assertTrue(msgs.isEmpty());
-// }
-//
-// msg1 = CoreMessageFactory.createCoreMessage(1, persistent, null);;
-// ref1 = ms.reference(msg1);
-//
-// msg2 = CoreMessageFactory.createCoreMessage(2, persistent, null);;
-// ref2 = ms.reference(msg2);
-//
-// tx = tr.createTransaction();
-//
-// routed = office1.route(ref1, new SimpleCondition("topic1"), tx);
-// assertTrue(routed);
-// routed = office1.route(ref2, new SimpleCondition("topic1"), tx);
-// assertTrue(routed);
-//
-// //Messages are sent asych so may take some finite time to arrive
-// Thread.sleep(1000);
-//
-// for (int i = 0; i < 16; i++)
-// {
-// List msgs = receivers[i].getMessages();
-// assertNotNull(msgs);
-// assertTrue(msgs.isEmpty());
-// msgs = queues[i].browse();
-// assertNotNull(msgs);
-// assertTrue(msgs.isEmpty());
-// }
-//
-// tx.rollback();
-//
-// for (int i = 0; i < 16; i++)
-// {
-// List msgs = receivers[i].getMessages();
-// assertNotNull(msgs);
-// assertTrue(msgs.isEmpty());
-// msgs = queues[i].browse();
-// assertNotNull(msgs);
-// assertTrue(msgs.isEmpty());
-// }
-//
-// //Now send some non transactionally
-//
-// msg1 = CoreMessageFactory.createCoreMessage(1, persistent, null);
-// ref1 = ms.reference(msg1);
-//
-// msg2 = CoreMessageFactory.createCoreMessage(2, persistent, null);
-// ref2 = ms.reference(msg2);
-//
-// routed = office1.route(ref1, new SimpleCondition("topic1"), null);
-// assertTrue(routed);
-// routed = office1.route(ref2, new SimpleCondition("topic1"), null);
-// assertTrue(routed);
-//
-// //Messages are sent asych so may take some finite time to arrive
-// Thread.sleep(1000);
-//
-// //And acknowledge transactionally
-//
-// tx = tr.createTransaction();
-//
-// for (int i = 0; i < 8; i++)
-// {
-// List msgs = receivers[i].getMessages();
-// assertNotNull(msgs);
-// assertEquals(2, msgs.size());
-// Message msgRec1 = (Message)msgs.get(0);
-// assertEquals(msg1.getMessageID(), msgRec1.getMessageID());
-// Message msgRec2 = (Message)msgs.get(1);
-// assertEquals(msg2.getMessageID(), msgRec2.getMessageID());
-//
-// receivers[i].acknowledge(msgRec1, tx);
-// receivers[i].acknowledge(msgRec2, tx);
-//
-// int deliveringCount = queues[i].getDeliveringCount();
-//
-// assertEquals(2, deliveringCount);
-//
-// receivers[i].clear();
-// }
-//
-// for (int i = 8; i < 16; i++)
-// {
-// List msgs = receivers[i].getMessages();
-// assertNotNull(msgs);
-// assertTrue(msgs.isEmpty());
-// msgs = queues[i].browse();
-// assertNotNull(msgs);
-// assertTrue(msgs.isEmpty());
-// }
-//
-// tx.commit();
-//
-// for (int i = 0; i < 16; i++)
-// {
-// List msgs = receivers[i].getMessages();
-// assertNotNull(msgs);
-// assertTrue(msgs.isEmpty());
-// msgs = queues[i].browse();
-// assertNotNull(msgs);
-// assertTrue(msgs.isEmpty());
-// }
-//
-//
-// // and the rollback
-//
-// msg1 = CoreMessageFactory.createCoreMessage(1, persistent, null);;
-// ref1 = ms.reference(msg1);
-//
-// msg2 = CoreMessageFactory.createCoreMessage(2, persistent, null);;
-// ref2 = ms.reference(msg2);
-//
-// routed = office1.route(ref1, new SimpleCondition("topic1"), null);
-// assertTrue(routed);
-// routed = office1.route(ref2, new SimpleCondition("topic1"), null);
-// assertTrue(routed);
-//
-// Thread.sleep(1000);
-//
-// tx = tr.createTransaction();
-//
-// for (int i = 0; i < 8; i++)
-// {
-// List msgs = receivers[i].getMessages();
-// assertNotNull(msgs);
-// assertEquals(2, msgs.size());
-// Message msgRec1 = (Message)msgs.get(0);
-// assertEquals(msg1.getMessageID(), msgRec1.getMessageID());
-// Message msgRec2 = (Message)msgs.get(1);
-// assertEquals(msg2.getMessageID(), msgRec2.getMessageID());
-//
-// receivers[i].acknowledge(msgRec1, tx);
-// receivers[i].acknowledge(msgRec2, tx);
-//
-// int deliveringCount = queues[i].getDeliveringCount();
-//
-// assertEquals(2, deliveringCount);
-//
-// }
-//
-// for (int i = 8; i < 16; i++)
-// {
-// List msgs = receivers[i].getMessages();
-// assertNotNull(msgs);
-// assertTrue(msgs.isEmpty());
-// msgs = queues[i].browse();
-// assertNotNull(msgs);
-// assertTrue(msgs.isEmpty());
-// }
-//
-// tx.rollback();
-//
-// for (int i = 0; i < 8; i++)
-// {
-// List msgs = receivers[i].getMessages();
-// assertNotNull(msgs);
-// assertEquals(2, msgs.size());
-// Message msgRec1 = (Message)msgs.get(0);
-// assertEquals(msg1.getMessageID(), msgRec1.getMessageID());
-// Message msgRec2 = (Message)msgs.get(1);
-// assertEquals(msg2.getMessageID(), msgRec2.getMessageID());
-//
-// int deliveringCount = queues[i].getDeliveringCount();
-//
-// assertEquals(2, deliveringCount);
-//
-// receivers[i].acknowledge(msgRec1, null);
-// receivers[i].acknowledge(msgRec2, null);
-//
-// receivers[i].clear();
-// }
-//
-// for (int i = 8; i < 16; i++)
-// {
-// List msgs = receivers[i].getMessages();
-// assertNotNull(msgs);
-// assertTrue(msgs.isEmpty());
-// msgs = queues[i].browse();
-// assertNotNull(msgs);
-// assertTrue(msgs.isEmpty());
-// }
-//
-//
-// // Now for topic 2
-//
-// msg1 = CoreMessageFactory.createCoreMessage(1, persistent, null);
-// ref1 = ms.reference(msg1);
-//
-// msg2 = CoreMessageFactory.createCoreMessage(2, persistent, null);
-// ref2 = ms.reference(msg2);
-//
-// tx = tr.createTransaction();
-//
-// routed = office1.route(ref1, new SimpleCondition("topic2"), tx);
-// assertTrue(routed);
-// routed = office1.route(ref2, new SimpleCondition("topic2"), tx);
-// assertTrue(routed);
-//
-//
-//
-// for (int i = 0; i < 16; i++)
-// {
-// List msgs = receivers[i].getMessages();
-// assertNotNull(msgs);
-// assertTrue(msgs.isEmpty());
-// msgs = queues[i].browse();
-// assertNotNull(msgs);
-// assertTrue(msgs.isEmpty());
-// }
-//
-// tx.commit();
-//
-// //Messages are sent asych so may take some finite time to arrive
-// Thread.sleep(1000);
-//
-// for (int i = 0; i < 8; i++)
-// {
-// List msgs = receivers[i].getMessages();
-// assertNotNull(msgs);
-// assertTrue(msgs.isEmpty());
-// msgs = queues[i].browse();
-// assertNotNull(msgs);
-// assertTrue(msgs.isEmpty());
-// }
-//
-// for (int i = 8; i < 16; i++)
-// {
-// List msgs = receivers[i].getMessages();
-// assertNotNull(msgs);
-// assertEquals(2, msgs.size());
-// Message msgRec1 = (Message)msgs.get(0);
-// assertEquals(msg1.getMessageID(), msgRec1.getMessageID());
-// Message msgRec2 = (Message)msgs.get(1);
-// assertEquals(msg2.getMessageID(), msgRec2.getMessageID());
-// receivers[i].acknowledge(msgRec1, null);
-// receivers[i].acknowledge(msgRec2, null);
-// msgs = queues[i].browse();
-// assertNotNull(msgs);
-// assertTrue(msgs.isEmpty());
-// receivers[i].clear();
-// }
-//
-// msg1 = CoreMessageFactory.createCoreMessage(1, persistent, null);;
-// ref1 = ms.reference(msg1);
-//
-// msg2 = CoreMessageFactory.createCoreMessage(2, persistent, null);;
-// ref2 = ms.reference(msg2);
-//
-// tx = tr.createTransaction();
-//
-// routed = office1.route(ref1, new SimpleCondition("topic1"), tx);
-// assertTrue(routed);
-// routed = office1.route(ref2, new SimpleCondition("topic1"), tx);
-// assertTrue(routed);
-//
-// for (int i = 0; i < 16; i++)
-// {
-// List msgs = receivers[i].getMessages();
-// assertNotNull(msgs);
-// assertTrue(msgs.isEmpty());
-// msgs = queues[i].browse();
-// assertNotNull(msgs);
-// assertTrue(msgs.isEmpty());
-// }
-//
-// tx.rollback();
-//
-// for (int i = 0; i < 16; i++)
-// {
-// List msgs = receivers[i].getMessages();
-// assertNotNull(msgs);
-// assertTrue(msgs.isEmpty());
-// msgs = queues[i].browse();
-// assertNotNull(msgs);
-// assertTrue(msgs.isEmpty());
-// }
-//
-// //Now send some non transactionally
-//
-// msg1 = CoreMessageFactory.createCoreMessage(1, persistent, null);
-// ref1 = ms.reference(msg1);
-//
-// msg2 = CoreMessageFactory.createCoreMessage(2, persistent, null);
-// ref2 = ms.reference(msg2);
-//
-// routed = office1.route(ref1, new SimpleCondition("topic2"), null);
-// assertTrue(routed);
-// routed = office1.route(ref2, new SimpleCondition("topic2"), null);
-// assertTrue(routed);
-//
-// Thread.sleep(1000);
-//
-// //And acknowledge transactionally
-//
-// tx = tr.createTransaction();
-//
-// for (int i = 0; i < 8; i++)
-// {
-// List msgs = receivers[i].getMessages();
-// assertNotNull(msgs);
-// assertTrue(msgs.isEmpty());
-// msgs = queues[i].browse();
-// assertNotNull(msgs);
-// assertTrue(msgs.isEmpty());
-// }
-//
-// for (int i = 8; i < 16; i++)
-// {
-// List msgs = receivers[i].getMessages();
-// assertNotNull(msgs);
-// assertEquals(2, msgs.size());
-// Message msgRec1 = (Message)msgs.get(0);
-// assertEquals(msg1.getMessageID(), msgRec1.getMessageID());
-// Message msgRec2 = (Message)msgs.get(1);
-// assertEquals(msg2.getMessageID(), msgRec2.getMessageID());
-//
-// receivers[i].acknowledge(msgRec1, tx);
-// receivers[i].acknowledge(msgRec2, tx);
-//
-// int deliveringCount = queues[i].getDeliveringCount();
-//
-// assertEquals(2, deliveringCount);
-//
-// receivers[i].clear();
-// }
-//
-//
-//
-// tx.commit();
-//
-// for (int i = 0; i < 16; i++)
-// {
-// List msgs = receivers[i].getMessages();
-// assertNotNull(msgs);
-// assertTrue(msgs.isEmpty());
-// msgs = queues[i].browse();
-// assertNotNull(msgs);
-// assertTrue(msgs.isEmpty());
-// }
-//
-//
-// // and the rollback
-//
-// msg1 = CoreMessageFactory.createCoreMessage(1, persistent, null);;
-// ref1 = ms.reference(msg1);
-//
-// msg2 = CoreMessageFactory.createCoreMessage(2, persistent, null);;
-// ref2 = ms.reference(msg2);
-//
-// routed = office1.route(ref1, new SimpleCondition("topic2"), null);
-// assertTrue(routed);
-// routed = office1.route(ref2, new SimpleCondition("topic2"), null);
-// assertTrue(routed);
-//
-// Thread.sleep(1000);
-//
-// tx = tr.createTransaction();
-//
-// for (int i = 0; i < 8; i++)
-// {
-// List msgs = receivers[i].getMessages();
-// assertNotNull(msgs);
-// assertTrue(msgs.isEmpty());
-// msgs = queues[i].browse();
-// assertNotNull(msgs);
-// assertTrue(msgs.isEmpty());
-// }
-//
-// for (int i = 8; i < 16; i++)
-// {
-// List msgs = receivers[i].getMessages();
-// assertNotNull(msgs);
-// assertEquals(2, msgs.size());
-// Message msgRec1 = (Message)msgs.get(0);
-// assertEquals(msg1.getMessageID(), msgRec1.getMessageID());
-// Message msgRec2 = (Message)msgs.get(1);
-// assertEquals(msg2.getMessageID(), msgRec2.getMessageID());
-//
-//
-// receivers[i].acknowledge(msgRec1, tx);
-// receivers[i].acknowledge(msgRec2, tx);
-//
-//
-// int deliveringCount = queues[i].getDeliveringCount();
-//
-// assertEquals(2, deliveringCount);
-// }
-//
-//
-//
-// tx.rollback();
-//
-// for (int i = 0; i < 8; i++)
-// {
-// List msgs = receivers[i].getMessages();
-// assertNotNull(msgs);
-// assertTrue(msgs.isEmpty());
-// msgs = queues[i].browse();
-// assertNotNull(msgs);
-// assertTrue(msgs.isEmpty());
-// }
-//
-// for (int i = 8; i < 16; i++)
-// {
-// List msgs = receivers[i].getMessages();
-// assertNotNull(msgs);
-// assertEquals(2, msgs.size());
-// Message msgRec1 = (Message)msgs.get(0);
-// assertEquals(msg1.getMessageID(), msgRec1.getMessageID());
-// Message msgRec2 = (Message)msgs.get(1);
-// assertEquals(msg2.getMessageID(), msgRec2.getMessageID());
-//
-// int deliveringCount = queues[i].getDeliveringCount();
-//
-// assertEquals(2, deliveringCount);
-//
-// receivers[i].acknowledge(msgRec1, null);
-// receivers[i].acknowledge(msgRec2, null);
-//
-// receivers[i].clear();
-// }
-// if (checkNoMessageData())
-// {
-// fail("Message data still in database");
-// }
-// }
-// finally
-// {
-// if (office1 != null)
-// {
-// try
-// {
-// office1.unbindQueue("sub7");
-// office1.unbindQueue("sub8");
-// office1.unbindQueue("sub15");
-// office1.unbindQueue("sub16");
-// }
-// catch (Exception ignore)
-// {
-// ignore.printStackTrace();
-// }
-//
-// office1.stop();
-// }
-//
-// if (office2 != null)
-// {
-// try
-// {
-// office2.unbindQueue("sub5");
-// office2.unbindQueue("sub13");
-// }
-// catch (Exception ignore)
-// {
-// ignore.printStackTrace();
-// }
-//
-// office2.stop();
-// }
-// }
-// }
protected void setUp() throws Exception
{
@@ -3308,6 +1598,1251 @@
// Private --------------------------------------------------------------------------------------
+   /**
+    * Routes pairs of messages transactionally through a two node cluster and verifies
+    * which receivers see them.
+    * <p>
+    * Sixteen queues are bound: sub1-sub8 to "condition1" and sub9-sub16 to "condition2",
+    * spread over node 1 and node 2. The test asserts that:
+    * <ul>
+    * <li>nothing is delivered while the routing transaction is still open,</li>
+    * <li>after commit both messages arrive, in order, at every queue bound to the routed
+    * condition, except queues that report isRecoverable() and live on the node that did
+    * not perform the routing,</li>
+    * <li>after rollback nothing is delivered anywhere,</li>
+    * <li>checkNoMessageData() does not report a failure at the end.</li>
+    * </ul>
+    *
+    * @param persistent flag forwarded to CoreMessageFactory.createCoreMessage for every
+    *                   message this test creates
+    * @throws Throwable if an assertion fails or any post office / queue operation throws
+    */
+   private void clusteredTransactionalRoute(boolean persistent) throws Throwable
+   {
+      PostOffice office1 = null;
+
+      PostOffice office2 = null;
+
+      try
+      {
+         //Start two offices
+
+         office1 = createClusteredPostOffice(1, "testgroup");
+         office2 = createClusteredPostOffice(2, "testgroup");
+
+         //Queues 0-7 (sub1-sub8) are bound to condition1, queues 8-15 (sub9-sub16) to condition2.
+         //Per queue: the node it is created for and the boolean handed to the MessagingQueue
+         //constructor (presumably the flag read back through isRecoverable() - TODO confirm).
+
+         int[] nodeIDs = {1, 1, 2, 2, 2, 1, 1, 1,
+                          1, 1, 2, 2, 2, 1, 1, 1};
+
+         boolean[] recoverableFlags = {false, false, false, false, true, false, true, true,
+                                       false, false, false, false, true, false, true, true};
+
+         Queue[] queues = new Queue[16];
+
+         for (int i = 0; i < 16; i++)
+         {
+            queues[i] = new MessagingQueue(nodeIDs[i], "sub" + (i + 1), channelIDManager.getID(),
+                                           ms, pm, recoverableFlags[i], -1, null, true);
+            queues[i].activate();
+
+            //Each queue is bound through the office of the node it belongs to
+            PostOffice office = nodeIDs[i] == 1 ? office1 : office2;
+            String condition = i < 8 ? "condition1" : "condition2";
+
+            boolean added = office.addBinding(new Binding(new SimpleCondition(condition), queues[i], false), false);
+            assertTrue(added);
+         }
+
+         SimpleReceiver[] receivers = new SimpleReceiver[16];
+
+         for (int i = 0; i < 16; i++)
+         {
+            receivers[i] = new SimpleReceiver("blah" + i, SimpleReceiver.ACCEPTING);
+            queues[i].getLocalDistributor().add(receivers[i]);
+         }
+
+         //First for condition1, routed through office1
+
+         Message msg1 = CoreMessageFactory.createCoreMessage(1, persistent, null);
+         MessageReference ref1 = ms.reference(msg1);
+
+         Message msg2 = CoreMessageFactory.createCoreMessage(2, persistent, null);
+         MessageReference ref2 = ms.reference(msg2);
+
+         Transaction tx = tr.createTransaction();
+
+         boolean routed = office1.route(ref1, new SimpleCondition("condition1"), tx);
+         assertTrue(routed);
+         routed = office1.route(ref2, new SimpleCondition("condition1"), tx);
+         assertTrue(routed);
+
+         //Nothing may be visible before the transaction commits
+         for (int i = 0; i < 16; i++)
+         {
+            log.info("i is " + i);
+            assertNothingDelivered(receivers[i], queues[i]);
+         }
+
+         log.info("committing");
+         tx.commit();
+         log.info("committed");
+
+         //Messages are sent asynchronously so may take some finite time to arrive
+         Thread.sleep(1000);
+
+         for (int i = 0; i < 16; i++)
+         {
+            if (i >= 8 || (queues[i].getNodeID() == 2 && queues[i].isRecoverable()))
+            {
+               //Shouldn't get message
+               assertNothingDelivered(receivers[i], queues[i]);
+            }
+            else
+            {
+               //Should get message
+               log.info("is is " + i);
+               log.info("trying with receiver " + receivers[i]);
+               assertPairDeliveredAndAcknowledge(receivers[i], queues[i], msg1, msg2);
+            }
+         }
+
+         //Same again, but roll the routing transaction back
+
+         msg1 = CoreMessageFactory.createCoreMessage(1, persistent, null);
+         ref1 = ms.reference(msg1);
+
+         msg2 = CoreMessageFactory.createCoreMessage(2, persistent, null);
+         ref2 = ms.reference(msg2);
+
+         tx = tr.createTransaction();
+
+         routed = office1.route(ref1, new SimpleCondition("condition1"), tx);
+         assertTrue(routed);
+         routed = office1.route(ref2, new SimpleCondition("condition1"), tx);
+         assertTrue(routed);
+
+         //Messages are sent asynchronously so may take some finite time to arrive
+         Thread.sleep(1000);
+
+         for (int i = 0; i < 16; i++)
+         {
+            assertNothingDelivered(receivers[i], queues[i]);
+         }
+
+         tx.rollback();
+
+         for (int i = 0; i < 16; i++)
+         {
+            assertNothingDelivered(receivers[i], queues[i]);
+         }
+
+
+         // Now for condition2, routed through office2
+
+         msg1 = CoreMessageFactory.createCoreMessage(1, persistent, null);
+         ref1 = ms.reference(msg1);
+
+         msg2 = CoreMessageFactory.createCoreMessage(2, persistent, null);
+         ref2 = ms.reference(msg2);
+
+         tx = tr.createTransaction();
+
+         routed = office2.route(ref1, new SimpleCondition("condition2"), tx);
+         assertTrue(routed);
+         routed = office2.route(ref2, new SimpleCondition("condition2"), tx);
+         assertTrue(routed);
+
+         for (int i = 0; i < 16; i++)
+         {
+            assertNothingDelivered(receivers[i], queues[i]);
+         }
+
+         tx.commit();
+
+         //Messages are sent asynchronously so may take some finite time to arrive
+         Thread.sleep(1000);
+
+         //Start at 0 (not 8) so that the condition1 queues are verified to be untouched too;
+         //with a start of 8 the "i < 8" arm below could never be taken.
+         for (int i = 0; i < 16; i++)
+         {
+            if (i < 8 || (queues[i].getNodeID() == 1 && queues[i].isRecoverable()))
+            {
+               // Shouldn't get message
+               assertNothingDelivered(receivers[i], queues[i]);
+            }
+            else
+            {
+               // Should get message
+               log.info("is is " + i);
+               log.info("trying with receiver " + receivers[i]);
+               assertPairDeliveredAndAcknowledge(receivers[i], queues[i], msg1, msg2);
+            }
+         }
+
+         //And the rollback.
+         //NOTE(review): this routes on condition1 through office1 again, although the section
+         //is about condition2 - confirm whether condition2/office2 was intended.
+
+         msg1 = CoreMessageFactory.createCoreMessage(1, persistent, null);
+         ref1 = ms.reference(msg1);
+
+         msg2 = CoreMessageFactory.createCoreMessage(2, persistent, null);
+         ref2 = ms.reference(msg2);
+
+         tx = tr.createTransaction();
+
+         routed = office1.route(ref1, new SimpleCondition("condition1"), tx);
+         assertTrue(routed);
+         routed = office1.route(ref2, new SimpleCondition("condition1"), tx);
+         assertTrue(routed);
+
+         for (int i = 0; i < 16; i++)
+         {
+            assertNothingDelivered(receivers[i], queues[i]);
+         }
+
+         tx.rollback();
+
+         for (int i = 0; i < 16; i++)
+         {
+            assertNothingDelivered(receivers[i], queues[i]);
+         }
+
+
+         if (checkNoMessageData())
+         {
+            fail("Message data still in database");
+         }
+      }
+      finally
+      {
+         //Best-effort clean up: remove the bindings of the queues created with the "true" flag
+         //on each node, then stop the office. A failing removal is only printed.
+
+         if (office1 != null)
+         {
+            try
+            {
+               office1.removeBinding("sub7", false);
+               office1.removeBinding("sub8", false);
+               office1.removeBinding("sub15", false);
+               office1.removeBinding("sub16", false);
+            }
+            catch (Exception ignore)
+            {
+               ignore.printStackTrace();
+            }
+
+            office1.stop();
+         }
+
+         if (office2 != null)
+         {
+            try
+            {
+               office2.removeBinding("sub5", false);
+               office2.removeBinding("sub13", false);
+            }
+            catch (Exception ignore)
+            {
+               ignore.printStackTrace();
+            }
+
+            office2.stop();
+         }
+      }
+   }
+
+   /**
+    * Asserts that the receiver currently holds no messages and that browsing the queue
+    * (with a null argument) returns a non-null, empty list.
+    */
+   private void assertNothingDelivered(SimpleReceiver receiver, Queue queue) throws Throwable
+   {
+      List msgs = receiver.getMessages();
+      assertNotNull(msgs);
+      assertTrue(msgs.isEmpty());
+      msgs = queue.browse(null);
+      assertNotNull(msgs);
+      assertTrue(msgs.isEmpty());
+   }
+
+   /**
+    * Asserts that the receiver holds exactly two messages whose ids match those of first and
+    * second, in that order; acknowledges both non-transactionally, asserts that browsing the
+    * queue then returns an empty list, and finally clears the receiver.
+    */
+   private void assertPairDeliveredAndAcknowledge(SimpleReceiver receiver, Queue queue,
+                                                  Message first, Message second) throws Throwable
+   {
+      List msgs = receiver.getMessages();
+      assertNotNull(msgs);
+      assertEquals(2, msgs.size());
+      Message msgRec1 = (Message)msgs.get(0);
+      assertEquals(first.getMessageID(), msgRec1.getMessageID());
+      Message msgRec2 = (Message)msgs.get(1);
+      assertEquals(second.getMessageID(), msgRec2.getMessageID());
+      receiver.acknowledge(msgRec1, null);
+      receiver.acknowledge(msgRec2, null);
+      msgs = queue.browse(null);
+      assertNotNull(msgs);
+      assertTrue(msgs.isEmpty());
+      receiver.clear();
+   }
+
+   /**
+    * Routes three messages (ids 1, 2 and 3) non-transactionally through office1 of a two
+    * node cluster and checks what each of three queues bound to "condition1" delivers.
+    * <p>
+    * Expected outcome, as asserted below:
+    * <ul>
+    * <li>queue1 (node 1, SimpleFilter(2)) delivers only the message with id 2,</li>
+    * <li>queue2 (node 2, SimpleFilter(3)) delivers only the message with id 3,</li>
+    * <li>queue3 (node 2, no filter) delivers all three messages in routing order.</li>
+    * </ul>
+    * Every delivered message is acknowledged and each queue must browse empty afterwards.
+    *
+    * @param persistentMessage flag forwarded to CoreMessageFactory.createCoreMessage
+    * @throws Throwable if an assertion fails or any post office / queue operation throws
+    */
+   private void clusteredRouteWithFilter(boolean persistentMessage) throws Throwable
+   {
+      PostOffice nodeOneOffice = null;
+      PostOffice nodeTwoOffice = null;
+
+      try
+      {
+         nodeOneOffice = createClusteredPostOffice(1, "testgroup");
+         nodeTwoOffice = createClusteredPostOffice(2, "testgroup");
+
+         SimpleFilter filterA = new SimpleFilter(2);
+         SimpleFilter filterB = new SimpleFilter(3);
+
+         // Three queues on the same condition: two filtered, one unfiltered
+
+         Queue filteredQueueA =
+            new MessagingQueue(1, "queue1", channelIDManager.getID(), ms, pm, false, -1, filterA, true);
+         filteredQueueA.activate();
+         assertTrue(nodeOneOffice.addBinding(
+            new Binding(new SimpleCondition("condition1"), filteredQueueA, false), false));
+
+         Queue filteredQueueB =
+            new MessagingQueue(2, "queue2", channelIDManager.getID(), ms, pm, false, -1, filterB, true);
+         filteredQueueB.activate();
+         assertTrue(nodeTwoOffice.addBinding(
+            new Binding(new SimpleCondition("condition1"), filteredQueueB, false), false));
+
+         Queue unfilteredQueue =
+            new MessagingQueue(2, "queue3", channelIDManager.getID(), ms, pm, false, -1, null, true);
+         unfilteredQueue.activate();
+         assertTrue(nodeTwoOffice.addBinding(
+            new Binding(new SimpleCondition("condition1"), unfilteredQueue, false), false));
+
+         // One accepting receiver per queue
+
+         SimpleReceiver receiverA = new SimpleReceiver("blah", SimpleReceiver.ACCEPTING);
+         filteredQueueA.getLocalDistributor().add(receiverA);
+
+         SimpleReceiver receiverB = new SimpleReceiver("blah", SimpleReceiver.ACCEPTING);
+         filteredQueueB.getLocalDistributor().add(receiverB);
+
+         SimpleReceiver receiverAll = new SimpleReceiver("blah", SimpleReceiver.ACCEPTING);
+         unfilteredQueue.getLocalDistributor().add(receiverAll);
+
+         // Route messages 1, 2 and 3 through node 1, outside any transaction
+
+         Message first = CoreMessageFactory.createCoreMessage(1, persistentMessage, null);
+         MessageReference firstRef = ms.reference(first);
+         assertTrue(nodeOneOffice.route(firstRef, new SimpleCondition("condition1"), null));
+
+         Message second = CoreMessageFactory.createCoreMessage(2, persistentMessage, null);
+         MessageReference secondRef = ms.reference(second);
+         assertTrue(nodeOneOffice.route(secondRef, new SimpleCondition("condition1"), null));
+
+         Message third = CoreMessageFactory.createCoreMessage(3, persistentMessage, null);
+         MessageReference thirdRef = ms.reference(third);
+         assertTrue(nodeOneOffice.route(thirdRef, new SimpleCondition("condition1"), null));
+
+         // Give the routed messages time to arrive
+         Thread.sleep(1000);
+
+         // queue1: only the second message
+         List delivered = receiverA.getMessages();
+         assertNotNull(delivered);
+         assertEquals(1, delivered.size());
+         Message single = (Message)delivered.get(0);
+         assertTrue(second == single);
+         receiverA.acknowledge(single, null);
+         delivered = filteredQueueA.browse(null);
+         assertNotNull(delivered);
+         assertTrue(delivered.isEmpty());
+
+         // queue2: only the third message
+         delivered = receiverB.getMessages();
+         assertNotNull(delivered);
+         assertEquals(1, delivered.size());
+         single = (Message)delivered.get(0);
+         assertTrue(third == single);
+         receiverB.acknowledge(single, null);
+         delivered = filteredQueueB.browse(null);
+         assertNotNull(delivered);
+         assertTrue(delivered.isEmpty());
+
+         // queue3: all three messages, in the order they were routed
+         delivered = receiverAll.getMessages();
+         assertNotNull(delivered);
+         assertEquals(3, delivered.size());
+         Message gotFirst = (Message)delivered.get(0);
+         assertTrue(first == gotFirst);
+         Message gotSecond = (Message)delivered.get(1);
+         assertTrue(second == gotSecond);
+         Message gotThird = (Message)delivered.get(2);
+         assertTrue(third == gotThird);
+
+         receiverAll.acknowledge(gotFirst, null);
+         receiverAll.acknowledge(gotSecond, null);
+         receiverAll.acknowledge(gotThird, null);
+         delivered = unfilteredQueue.browse(null);
+         assertNotNull(delivered);
+         assertTrue(delivered.isEmpty());
+
+         if (checkNoMessageData())
+         {
+            fail("Message data still in database");
+         }
+      }
+      finally
+      {
+         // Stop whichever offices were successfully started
+         if (nodeOneOffice != null)
+         {
+            nodeOneOffice.stop();
+         }
+
+         if (nodeTwoOffice != null)
+         {
+            nodeTwoOffice.stop();
+         }
+      }
+   }
+
+ private void clusteredRouteFourNodes(boolean persistentMessage) throws Throwable
+ {
+ PostOffice office1 = null;
+
+ PostOffice office2 = null;
+
+ PostOffice office3 = null;
+
+ PostOffice office4 = null;
+
+ try
+ {
+ office1 = createClusteredPostOffice(1, "testgroup");
+ office2 = createClusteredPostOffice(2, "testgroup");
+ office3 = createClusteredPostOffice(3, "testgroup");
+ office4 = createClusteredPostOffice(4, "testgroup");
+
+ Queue queue1 = new MessagingQueue(1, "queue1", channelIDManager.getID(), ms, pm, false, -1, null, true);
+ queue1.activate();
+ boolean added = office1.addBinding(new Binding(new SimpleCondition("condition1"), queue1, false), false);
+ assertTrue(added);
+
+ Queue queue2 = new MessagingQueue(2, "queue2", channelIDManager.getID(), ms, pm, false, -1, null, true);
+ queue2.activate();
+ added = office2.addBinding(new Binding(new SimpleCondition("condition1"), queue2, false), false);
+ assertTrue(added);
+
+ Queue queue3 = new MessagingQueue(3, "queue3", channelIDManager.getID(), ms, pm, false, -1, null, true);
+ queue3.activate();
+ added = office3.addBinding(new Binding(new SimpleCondition("condition1"), queue3, false), false);
+ assertTrue(added);
+
+ Queue queue4 = new MessagingQueue(4, "queue4", channelIDManager.getID(), ms, pm, false, -1, null, true);
+ queue4.activate();
+ added = office4.addBinding(new Binding(new SimpleCondition("condition1"), queue4, false), false);
+ assertTrue(added);
+
+ SimpleReceiver receiver1 = new SimpleReceiver("blah", SimpleReceiver.ACCEPTING);
+ queue1.getLocalDistributor().add(receiver1);
+
+ SimpleReceiver receiver2 = new SimpleReceiver("blah", SimpleReceiver.ACCEPTING);
+ queue2.getLocalDistributor().add(receiver2);
+
+ SimpleReceiver receiver3 = new SimpleReceiver("blah", SimpleReceiver.ACCEPTING);
+ queue3.getLocalDistributor().add(receiver3);
+
+ SimpleReceiver receiver4 = new SimpleReceiver("blah", SimpleReceiver.ACCEPTING);
+ queue4.getLocalDistributor().add(receiver4);
+
+ Message msg1 = CoreMessageFactory.createCoreMessage(1, persistentMessage, null);
+ MessageReference ref1 = ms.reference(msg1);
+ boolean routed = office1.route(ref1, new SimpleCondition("condition1"), null);
+ assertTrue(routed);
+
+ Message msg2 = CoreMessageFactory.createCoreMessage(2, persistentMessage, null);
+ MessageReference ref2 = ms.reference(msg2);
+ routed = office1.route(ref2, new SimpleCondition("condition1"), null);
+ assertTrue(routed);
+
+ Message msg3 = CoreMessageFactory.createCoreMessage(3, persistentMessage, null);
+ MessageReference ref3 = ms.reference(msg3);
+ routed = office1.route(ref3, new SimpleCondition("condition1"), null);
+ assertTrue(routed);
+
+ Thread.sleep(1000);
+
+ List msgs = receiver1.getMessages();
+ assertNotNull(msgs);
+ assertEquals(3, msgs.size());
+ Message msgRec1 = (Message)msgs.get(0);
+ assertTrue(msg1 == msgRec1);
+ Message msgRec2 = (Message)msgs.get(1);
+ assertTrue(msg2 == msgRec2);
+ Message msgRec3 = (Message)msgs.get(2);
+ assertTrue(msg3 == msgRec3);
+
+ receiver1.acknowledge(msgRec1, null);
+ receiver1.acknowledge(msgRec2, null);
+ receiver1.acknowledge(msgRec3, null);
+ msgs = queue1.browse(null);
+ assertNotNull(msgs);
+ assertTrue(msgs.isEmpty());
+
+ msgs = receiver2.getMessages();
+ assertNotNull(msgs);
+ assertEquals(3, msgs.size());
+ msgRec1 = (Message)msgs.get(0);
+ assertTrue(msg1 == msgRec1);
+ msgRec2 = (Message)msgs.get(1);
+ assertTrue(msg2 == msgRec2);
+ msgRec3 = (Message)msgs.get(2);
+ assertTrue(msg3 == msgRec3);
+
+ receiver2.acknowledge(msgRec1, null);
+ receiver2.acknowledge(msgRec2, null);
+ receiver2.acknowledge(msgRec3, null);
+ msgs = queue2.browse(null);
+ assertNotNull(msgs);
+ assertTrue(msgs.isEmpty());
+
+ msgs = receiver3.getMessages();
+ assertNotNull(msgs);
+ assertEquals(3, msgs.size());
+ msgRec1 = (Message)msgs.get(0);
+ assertTrue(msg1 == msgRec1);
+ msgRec2 = (Message)msgs.get(1);
+ assertTrue(msg2 == msgRec2);
+ msgRec3 = (Message)msgs.get(2);
+ assertTrue(msg3 == msgRec3);
+
+ receiver3.acknowledge(msgRec1, null);
+ receiver3.acknowledge(msgRec2, null);
+ receiver3.acknowledge(msgRec3, null);
+ msgs = queue3.browse(null);
+ assertNotNull(msgs);
+ assertTrue(msgs.isEmpty());
+
+ msgs = receiver4.getMessages();
+ assertNotNull(msgs);
+ assertEquals(3, msgs.size());
+ msgRec1 = (Message)msgs.get(0);
+ assertTrue(msg1 == msgRec1);
+ msgRec2 = (Message)msgs.get(1);
+ assertTrue(msg2 == msgRec2);
+ msgRec3 = (Message)msgs.get(2);
+ assertTrue(msg3 == msgRec3);
+
+ receiver4.acknowledge(msgRec1, null);
+ receiver4.acknowledge(msgRec2, null);
+ receiver4.acknowledge(msgRec3, null);
+ msgs = queue4.browse(null);
+ assertNotNull(msgs);
+ assertTrue(msgs.isEmpty());
+
+ if (checkNoMessageData())
+ {
+ fail("Message data still in database");
+ }
+ }
+ finally
+ {
+ if (office1 != null)
+ {
+ office1.stop();
+ }
+
+ if (office2 != null)
+ {
+ office2.stop();
+ }
+
+ if (office3 != null)
+ {
+ office3.stop();
+ }
+
+ if (office4 != null)
+ {
+ office4.stop();
+ }
+
+ }
+ }
+
+
+
+ private void clusteredRoute(boolean persistentMessage) throws Throwable
+ {
+ PostOffice office1 = null;
+
+ PostOffice office2 = null;
+
+ try
+ {
+ office1 = createClusteredPostOffice(1, "testgroup");
+ office2 = createClusteredPostOffice(2, "testgroup");
+
+ //A mixture of durable and non durable queues
+
+ Queue[] queues = new Queue[16];
+
+ //condition1
+
+ queues[0] = new MessagingQueue(1, "sub1", channelIDManager.getID(), ms, pm, false, -1, null, true);
+ queues[0].activate();
+ boolean added = office1.addBinding(new Binding(new SimpleCondition("condition1"), queues[0], false), false);
+ assertTrue(added);
+
+ queues[1] = new MessagingQueue(1, "sub2", channelIDManager.getID(), ms, pm, false, -1, null, true);
+ queues[1].activate();
+ added = office1.addBinding(new Binding(new SimpleCondition("condition1"), queues[1], false), false);
+ assertTrue(added);
+
+ queues[2] = new MessagingQueue(2, "sub3", channelIDManager.getID(), ms, pm, false, -1, null, true);
+ queues[2].activate();
+ added = office2.addBinding(new Binding(new SimpleCondition("condition1"), queues[2], false), false);
+ assertTrue(added);
+
+ queues[3] = new MessagingQueue(2, "sub4", channelIDManager.getID(), ms, pm, false, -1, null, true);
+ queues[3].activate();
+ added = office2.addBinding(new Binding(new SimpleCondition("condition1"), queues[3], false), false);
+ assertTrue(added);
+
+ //durable
+
+ queues[4] = new MessagingQueue(2, "sub5", channelIDManager.getID(), ms, pm, true, -1, null, true);
+ queues[4].activate();
+ added = office2.addBinding(new Binding(new SimpleCondition("condition1"), queues[4], false), false);
+ assertTrue(added);
+
+ queues[5] = new MessagingQueue(1, "sub6", channelIDManager.getID(), ms, pm, false, -1, null, true);
+ queues[5].activate();
+ added = office1.addBinding(new Binding(new SimpleCondition("condition1"), queues[5], false), false);
+ assertTrue(added);
+
+ //durable
+
+ queues[6] = new MessagingQueue(1, "sub7", channelIDManager.getID(), ms, pm, true, -1, null, true);
+ queues[6].activate();
+ added = office1.addBinding(new Binding(new SimpleCondition("condition1"), queues[6], false), false);
+ assertTrue(added);
+
+ //durable
+
+ queues[7] = new MessagingQueue(1, "sub8", channelIDManager.getID(), ms, pm, true, -1, null, true);
+ queues[7].activate();
+ added = office1.addBinding(new Binding(new SimpleCondition("condition1"), queues[7], false), false);
+ assertTrue(added);
+
+ //condition2
+
+
+ queues[8] = new MessagingQueue(1, "sub9", channelIDManager.getID(), ms, pm, false, -1, null, true);
+ queues[8].activate();
+ added = office1.addBinding(new Binding(new SimpleCondition("condition2"), queues[8], false), false);
+ assertTrue(added);
+
+ queues[9] = new MessagingQueue(1, "sub10", channelIDManager.getID(), ms, pm, false, -1, null, true);
+ queues[9].activate();
+ added = office1.addBinding(new Binding(new SimpleCondition("condition2"), queues[9], false), false);
+ assertTrue(added);
+
+ queues[10] = new MessagingQueue(2, "sub11", channelIDManager.getID(), ms, pm, false, -1, null, true);
+ queues[10].activate();
+ added = office2.addBinding(new Binding(new SimpleCondition("condition2"), queues[10], false), false);
+ assertTrue(added);
+
+ queues[11] = new MessagingQueue(2, "sub12", channelIDManager.getID(), ms, pm, false, -1, null, true);
+ queues[11].activate();
+ added = office2.addBinding(new Binding(new SimpleCondition("condition2"), queues[11], false), false);
+ assertTrue(added);
+
+ //durable
+
+ queues[12] = new MessagingQueue(2, "sub13", channelIDManager.getID(), ms, pm, true, -1, null, true);
+ queues[12].activate();
+ added = office2.addBinding(new Binding(new SimpleCondition("condition2"), queues[12], false), false);
+ assertTrue(added);
+
+ queues[13] = new MessagingQueue(1, "sub14", channelIDManager.getID(), ms, pm, false, -1, null, true);
+ queues[13].activate();
+ added = office1.addBinding(new Binding(new SimpleCondition("condition2"), queues[13], false), false);
+ assertTrue(added);
+
+ //durable
+
+ queues[14] = new MessagingQueue(1, "sub15", channelIDManager.getID(), ms, pm, true, -1, null, true);
+ queues[14].activate();
+ added = office1.addBinding(new Binding(new SimpleCondition("condition2"), queues[14], false), false);
+ assertTrue(added);
+
+ queues[15] = new MessagingQueue(1, "sub16", channelIDManager.getID(), ms, pm, true, -1, null, true);
+ queues[15].activate();
+ added = office1.addBinding(new Binding(new SimpleCondition("condition2"), queues[15], false), false);
+ assertTrue(added);
+
+ SimpleReceiver[] receivers = new SimpleReceiver[16];
+
+ for (int i = 0; i < 16; i++)
+ {
+ receivers[i] = new SimpleReceiver("blah", SimpleReceiver.ACCEPTING);
+ queues[i].getLocalDistributor().add(receivers[i]);
+ }
+
+ Message msg = CoreMessageFactory.createCoreMessage(1, persistentMessage, null);
+ MessageReference ref = ms.reference(msg);
+
+ boolean routed = office1.route(ref, new SimpleCondition("condition1"), null);
+ assertTrue(routed);
+
+ //Messages are sent asych so may take some finite time to arrive
+ Thread.sleep(1000);
+
+ //Durable queues on remote node should never get the message
+
+ for (int i = 0; i < 16; i++)
+ {
+ if (i >= 8 || (queues[i].getNodeID() == 2 && queues[i].isRecoverable()))
+ {
+ this.checkNotGetsMessage(queues[i], receivers[i]);
+ }
+ else
+ {
+ //Should get the message
+ this.checkGetsMessage(queues[i], receivers[i], msg);
+ }
+
+ }
+
+ //Now route to condition2
+
+ msg = CoreMessageFactory.createCoreMessage(2, persistentMessage, null);;
+ ref = ms.reference(msg);
+
+ routed = office2.route(ref, new SimpleCondition("condition2"), null);
+ assertTrue(routed);
+ //Messages are sent asych so may take some finite time to arrive
+ Thread.sleep(1000);
+
+ for (int i = 0; i < 16; i++)
+ {
+ if (i < 8 || (queues[i].getNodeID() == 1 && queues[i].isRecoverable()))
+ {
+ //Shouldn't get the message
+ this.checkNotGetsMessage(queues[i], receivers[i]);
+ }
+ else
+ {
+ //Should get the message
+ this.checkGetsMessage(queues[i], receivers[i], msg);
+ }
+
+ }
+
+ if (checkNoMessageData())
+ {
+ fail("Message data still in database");
+ }
+ }
+ finally
+ {
+ if (office1 != null)
+ {
+ try
+ {
+ office1.removeBinding("sub7", false);
+ office1.removeBinding("sub8", false);
+ office1.removeBinding("sub15", false);
+ office1.removeBinding("sub16", false);
+ }
+ catch (Exception ignore)
+ {
+ ignore.printStackTrace();
+ }
+
+ office1.stop();
+ }
+
+ if (office2 != null)
+ {
+ try
+ {
+ office2.removeBinding("sub5", false);
+ office2.removeBinding("sub13", false);
+ }
+ catch (Exception ignore)
+ {
+ ignore.printStackTrace();
+ }
+ office2.stop();
+ }
+
+ }
+ }
+
+
+
+ /**
+  * Queues with same name on different nodes of the cluster.
+  * If queue is routed to locally it shouldn't be routed to on other nodes.
+  *
+  * Two queue names ("myqueue1" and "myqueue2") are each bound on both nodes under a condition of
+  * the same name. A message is routed to each condition from each node in turn, and after every
+  * route only the queue local to the routing node is expected to have received the message.
+  *
+  * @param persistentMessage flag handed to CoreMessageFactory.createCoreMessage for each message
+  * @throws Throwable if an assertion fails or any post office / queue operation throws
+  */
+ private void routeSharedQueue(boolean persistentMessage) throws Throwable
+ {
+    PostOffice office1 = null;
+    PostOffice office2 = null;
+
+    try
+    {
+       office1 = createClusteredPostOffice(1, "testgroup");
+       office2 = createClusteredPostOffice(2, "testgroup");
+
+       //myqueue1 on both nodes
+
+       Queue queue0 = new MessagingQueue(1, "myqueue1", channelIDManager.getID(), ms, pm, false, -1, null, true);
+       queue0.activate();
+       boolean added = office1.addBinding(new Binding(new SimpleCondition("myqueue1"), queue0, false), false);
+       assertTrue(added);
+
+       Queue queue1 = new MessagingQueue(2, "myqueue1", channelIDManager.getID(), ms, pm, false, -1, null, true);
+       queue1.activate();
+       added = office2.addBinding(new Binding(new SimpleCondition("myqueue1"), queue1, false), false);
+       assertTrue(added);
+
+       //myqueue2 on both nodes
+
+       Queue queue2 = new MessagingQueue(1, "myqueue2", channelIDManager.getID(), ms, pm, false, -1, null, true);
+       queue2.activate();
+       added = office1.addBinding(new Binding(new SimpleCondition("myqueue2"), queue2, false), false);
+       assertTrue(added);
+
+       Queue queue3 = new MessagingQueue(2, "myqueue2", channelIDManager.getID(), ms, pm, false, -1, null, true);
+       queue3.activate();
+       added = office2.addBinding(new Binding(new SimpleCondition("myqueue2"), queue3, false), false);
+       assertTrue(added);
+
+       SimpleReceiver receiver0 = new SimpleReceiver("blah", SimpleReceiver.ACCEPTING);
+       queue0.getLocalDistributor().add(receiver0);
+
+       SimpleReceiver receiver1 = new SimpleReceiver("blah", SimpleReceiver.ACCEPTING);
+       queue1.getLocalDistributor().add(receiver1);
+
+       SimpleReceiver receiver2 = new SimpleReceiver("blah", SimpleReceiver.ACCEPTING);
+       queue2.getLocalDistributor().add(receiver2);
+
+       SimpleReceiver receiver3 = new SimpleReceiver("blah", SimpleReceiver.ACCEPTING);
+       queue3.getLocalDistributor().add(receiver3);
+
+       //Route to myqueue1 from office 1
+
+       Message msg = CoreMessageFactory.createCoreMessage(1, persistentMessage, null);
+       MessageReference ref = ms.reference(msg);
+
+       boolean routed = office1.route(ref, new SimpleCondition("myqueue1"), null);
+       assertTrue(routed);
+
+       //Messages are sent asynchronously so may take some finite time to arrive
+       Thread.sleep(1000);
+
+       //Only queue0 should get the message
+       checkGetsMessage(queue0, receiver0, msg);
+
+       checkNotGetsMessage(queue1, receiver1);
+
+       checkNotGetsMessage(queue2, receiver2);
+
+       checkNotGetsMessage(queue3, receiver3);
+
+       //Route to myqueue1 from office 2
+
+       msg = CoreMessageFactory.createCoreMessage(2, persistentMessage, null);
+       ref = ms.reference(msg);
+
+       routed = office2.route(ref, new SimpleCondition("myqueue1"), null);
+       assertTrue(routed);
+
+       //Messages are sent asynchronously so may take some finite time to arrive
+       Thread.sleep(1000);
+
+       //Only queue1 should get the message
+       checkGetsMessage(queue1, receiver1, msg);
+
+       checkNotGetsMessage(queue0, receiver0);
+
+       checkNotGetsMessage(queue2, receiver2);
+
+       checkNotGetsMessage(queue3, receiver3);
+
+       //Now route to myqueue2 from office 1
+
+       msg = CoreMessageFactory.createCoreMessage(3, persistentMessage, null);
+       ref = ms.reference(msg);
+
+       routed = office1.route(ref, new SimpleCondition("myqueue2"), null);
+       assertTrue(routed);
+
+       //Messages are sent asynchronously so may take some finite time to arrive
+       Thread.sleep(1000);
+
+       //Only queue2 should get the message
+       checkGetsMessage(queue2, receiver2, msg);
+
+       checkNotGetsMessage(queue1, receiver1);
+
+       checkNotGetsMessage(queue0, receiver0);
+
+       checkNotGetsMessage(queue3, receiver3);
+
+       //Now route to myqueue2 from office 2
+
+       msg = CoreMessageFactory.createCoreMessage(4, persistentMessage, null);
+       ref = ms.reference(msg);
+
+       routed = office2.route(ref, new SimpleCondition("myqueue2"), null);
+       assertTrue(routed);
+
+       //Messages are sent asynchronously so may take some finite time to arrive
+       Thread.sleep(1000);
+
+       //Only queue3 should get the message
+       checkGetsMessage(queue3, receiver3, msg);
+
+       checkNotGetsMessage(queue1, receiver1);
+
+       checkNotGetsMessage(queue0, receiver0);
+
+       checkNotGetsMessage(queue2, receiver2);
+
+       //NOTE(review): a true result is treated as "data still present" despite the helper's name - confirm
+       if (checkNoMessageData())
+       {
+          fail("Message data still in database");
+       }
+    }
+    finally
+    {
+       //Nested so that office2 is still stopped when stopping office1 throws
+       try
+       {
+          if (office1 != null)
+          {
+             office1.stop();
+          }
+       }
+       finally
+       {
+          if (office2 != null)
+          {
+             office2.stop();
+          }
+       }
+    }
+ }
+
+
+ /**
+  * Binds four unfiltered queues ("sub1" .. "sub4") across a two-node cluster - sub1/sub2 to
+  * "condition1" and sub3/sub4 to "condition2" - then routes four messages to the conditions
+  * "myqueue1" and "myqueue2" from each node in turn, asserting after every route that exactly
+  * one queue received the message.
+  *
+  * NOTE(review): the bindings use "condition1"/"condition2" while routing uses
+  * "myqueue1"/"myqueue2", and no filter is installed despite the method name. This looks like a
+  * copy of routeSharedQueue that was never adapted - confirm the intended scenario before
+  * relying on this method. The routing conditions are deliberately left unchanged here.
+  *
+  * @param persistentMessage flag handed to CoreMessageFactory.createCoreMessage for each message
+  * @throws Throwable if an assertion fails or any post office / queue operation throws
+  */
+ private void routeWithFilter(boolean persistentMessage) throws Throwable
+ {
+    PostOffice office1 = null;
+    PostOffice office2 = null;
+
+    try
+    {
+       office1 = createClusteredPostOffice(1, "testgroup");
+       office2 = createClusteredPostOffice(2, "testgroup");
+
+       //condition1, one queue per node
+
+       Queue queue0 = new MessagingQueue(1, "sub1", channelIDManager.getID(), ms, pm, false, -1, null, true);
+       queue0.activate();
+       boolean added = office1.addBinding(new Binding(new SimpleCondition("condition1"), queue0, false), false);
+       assertTrue(added);
+
+       Queue queue1 = new MessagingQueue(2, "sub2", channelIDManager.getID(), ms, pm, false, -1, null, true);
+       queue1.activate();
+       added = office2.addBinding(new Binding(new SimpleCondition("condition1"), queue1, false), false);
+       assertTrue(added);
+
+       //condition2, one queue per node
+
+       Queue queue2 = new MessagingQueue(1, "sub3", channelIDManager.getID(), ms, pm, false, -1, null, true);
+       queue2.activate();
+       added = office1.addBinding(new Binding(new SimpleCondition("condition2"), queue2, false), false);
+       assertTrue(added);
+
+       Queue queue3 = new MessagingQueue(2, "sub4", channelIDManager.getID(), ms, pm, false, -1, null, true);
+       queue3.activate();
+       added = office2.addBinding(new Binding(new SimpleCondition("condition2"), queue3, false), false);
+       assertTrue(added);
+
+       SimpleReceiver receiver0 = new SimpleReceiver("blah", SimpleReceiver.ACCEPTING);
+       queue0.getLocalDistributor().add(receiver0);
+
+       SimpleReceiver receiver1 = new SimpleReceiver("blah", SimpleReceiver.ACCEPTING);
+       queue1.getLocalDistributor().add(receiver1);
+
+       SimpleReceiver receiver2 = new SimpleReceiver("blah", SimpleReceiver.ACCEPTING);
+       queue2.getLocalDistributor().add(receiver2);
+
+       SimpleReceiver receiver3 = new SimpleReceiver("blah", SimpleReceiver.ACCEPTING);
+       queue3.getLocalDistributor().add(receiver3);
+
+       //Route to "myqueue1" from office 1
+       //NOTE(review): nothing above is bound to "myqueue1" - was "condition1" intended?
+
+       Message msg = CoreMessageFactory.createCoreMessage(1, persistentMessage, null);
+       MessageReference ref = ms.reference(msg);
+
+       boolean routed = office1.route(ref, new SimpleCondition("myqueue1"), null);
+       assertTrue(routed);
+
+       //Messages are sent asynchronously so may take some finite time to arrive
+       Thread.sleep(1000);
+
+       //Only queue0 should get the message
+       checkGetsMessage(queue0, receiver0, msg);
+
+       checkNotGetsMessage(queue1, receiver1);
+
+       checkNotGetsMessage(queue2, receiver2);
+
+       checkNotGetsMessage(queue3, receiver3);
+
+       //Route to "myqueue1" from office 2
+
+       msg = CoreMessageFactory.createCoreMessage(2, persistentMessage, null);
+       ref = ms.reference(msg);
+
+       routed = office2.route(ref, new SimpleCondition("myqueue1"), null);
+       assertTrue(routed);
+
+       //Messages are sent asynchronously so may take some finite time to arrive
+       Thread.sleep(1000);
+
+       //Only queue1 should get the message
+       checkGetsMessage(queue1, receiver1, msg);
+
+       checkNotGetsMessage(queue0, receiver0);
+
+       checkNotGetsMessage(queue2, receiver2);
+
+       checkNotGetsMessage(queue3, receiver3);
+
+       //Now route to "myqueue2" from office 1
+       //NOTE(review): nothing above is bound to "myqueue2" - was "condition2" intended?
+
+       msg = CoreMessageFactory.createCoreMessage(3, persistentMessage, null);
+       ref = ms.reference(msg);
+
+       routed = office1.route(ref, new SimpleCondition("myqueue2"), null);
+       assertTrue(routed);
+
+       //Messages are sent asynchronously so may take some finite time to arrive
+       Thread.sleep(1000);
+
+       //Only queue2 should get the message
+       checkGetsMessage(queue2, receiver2, msg);
+
+       checkNotGetsMessage(queue1, receiver1);
+
+       checkNotGetsMessage(queue0, receiver0);
+
+       checkNotGetsMessage(queue3, receiver3);
+
+       //Now route to "myqueue2" from office 2
+
+       msg = CoreMessageFactory.createCoreMessage(4, persistentMessage, null);
+       ref = ms.reference(msg);
+
+       routed = office2.route(ref, new SimpleCondition("myqueue2"), null);
+       assertTrue(routed);
+
+       //Messages are sent asynchronously so may take some finite time to arrive
+       Thread.sleep(1000);
+
+       //Only queue3 should get the message
+       checkGetsMessage(queue3, receiver3, msg);
+
+       checkNotGetsMessage(queue1, receiver1);
+
+       checkNotGetsMessage(queue0, receiver0);
+
+       checkNotGetsMessage(queue2, receiver2);
+
+       //NOTE(review): a true result is treated as "data still present" despite the helper's name - confirm
+       if (checkNoMessageData())
+       {
+          fail("Message data still in database");
+       }
+    }
+    finally
+    {
+       //Nested so that office2 is still stopped when stopping office1 throws
+       try
+       {
+          if (office1 != null)
+          {
+             office1.stop();
+          }
+       }
+       finally
+       {
+          if (office2 != null)
+          {
+             office2.stop();
+          }
+       }
+    }
+ }
+
+ /**
+  * Logs each element of the given post office's node id view, one line per element, framed by
+  * a header and a footer line.
+  *
+  * @param postOffice the post office whose nodeIDView() is written to the log
+  */
+ private void dumpNodeIDView(PostOffice postOffice)
+ {
+    Set nodeIds = postOffice.nodeIDView();
+
+    log.info("=== node id view ==");
+
+    for (Iterator nodes = nodeIds.iterator(); nodes.hasNext(); )
+    {
+       log.info("Node:" + nodes.next());
+    }
+
+    log.info("==================");
+ }
+
+ /**
+  * Logs the given bindings and then asserts that there are exactly three of them, that every
+  * binding's queue has the expected name, that the allNodes flag is set only on the binding
+  * whose queue belongs to nodeId, and that nodes 1, 2 and 3 are each represented.
+  *
+  * @param nodeId the node whose binding is expected to have allNodes set
+  * @param bindings the bindings to inspect; every element is cast to Binding
+  * @param queueName the queue name every binding is expected to carry
+  */
+ private void assertGotAll(int nodeId, Collection bindings, String queueName)
+ {
+    log.info("============= dumping bindings ========");
+
+    for (Iterator dump = bindings.iterator(); dump.hasNext(); )
+    {
+       Binding dumped = (Binding)dump.next();
+
+       log.info("Binding: " + dumped);
+    }
+
+    log.info("========= end dump==========");
+
+    assertEquals(3, bindings.size());
+
+    //seenNode[k] becomes true once a binding whose queue is on node k + 1 has been visited
+    boolean[] seenNode = new boolean[3];
+
+    for (Iterator check = bindings.iterator(); check.hasNext(); )
+    {
+       Binding current = (Binding)check.next();
+
+       log.info("binding node id " + current.queue.getNodeID());
+
+       assertEquals(queueName, current.queue.getName());
+
+       if (current.queue.getNodeID() != nodeId)
+       {
+          assertFalse(current.allNodes);
+       }
+       else
+       {
+          assertTrue(current.allNodes);
+       }
+
+       for (int k = 0; k < seenNode.length; k++)
+       {
+          if (current.queue.getNodeID() == k + 1)
+          {
+             seenNode[k] = true;
+          }
+       }
+    }
+
+    assertTrue(seenNode[0] && seenNode[1] && seenNode[2]);
+ }
+
+
+ /**
+  * Asserts that the receiver holds exactly one message whose message id equals that of msg,
+  * acknowledges it, asserts that browsing the queue then yields an empty list, and finally
+  * clears the receiver.
+  *
+  * @param queue the queue expected to be empty after the acknowledgement
+  * @param receiver the receiver expected to hold the single message
+  * @param msg the message whose id the received message must match
+  * @throws Throwable if an assertion fails or the receiver / queue throws
+  */
+ private void checkGetsMessage(Queue queue, SimpleReceiver receiver, Message msg) throws Throwable
+ {
+    List delivered = receiver.getMessages();
+    assertNotNull(delivered);
+    assertEquals(1, delivered.size());
+
+    Message received = (Message)delivered.get(0);
+    assertEquals(msg.getMessageID(), received.getMessageID());
+    receiver.acknowledge(received, null);
+
+    List remaining = queue.browse(null);
+    assertNotNull(remaining);
+    assertTrue(remaining.isEmpty());
+
+    receiver.clear();
+ }
+
+ /**
+  * Asserts that the receiver holds no messages and that browsing the queue yields an empty,
+  * non-null list.
+  *
+  * @param queue the queue expected to be empty
+  * @param receiver the receiver expected to have received nothing
+  * @throws Throwable if an assertion fails or the receiver / queue throws
+  */
+ private void checkNotGetsMessage(Queue queue, SimpleReceiver receiver) throws Throwable
+ {
+    List delivered = receiver.getMessages();
+    assertNotNull(delivered);
+    assertTrue(delivered.isEmpty());
+
+    List queued = queue.browse(null);
+    assertNotNull(queued);
+    assertTrue(queued.isEmpty());
+ }
+
// Inner classes --------------------------------------------------------------------------------
}
More information about the jboss-cvs-commits
mailing list