[jboss-cvs] JBoss Messaging SVN: r2829 - in trunk: src/main/org/jboss/jms/client/state and 3 other directories.

jboss-cvs-commits at lists.jboss.org jboss-cvs-commits at lists.jboss.org
Tue Jul 3 13:08:00 EDT 2007


Author: timfox
Date: 2007-07-03 13:08:00 -0400 (Tue, 03 Jul 2007)
New Revision: 2829

Modified:
   trunk/src/main/org/jboss/jms/client/container/ClientConsumer.java
   trunk/src/main/org/jboss/jms/client/state/SessionState.java
   trunk/src/main/org/jboss/messaging/core/impl/ChannelSupport.java
   trunk/src/main/org/jboss/messaging/core/impl/postoffice/MessageRequest.java
   trunk/src/main/org/jboss/messaging/core/impl/postoffice/MessagingPostOffice.java
   trunk/src/main/org/jboss/messaging/core/impl/postoffice/RequestTarget.java
   trunk/tests/src/org/jboss/test/messaging/core/plugin/postoffice/ClusteredPostOfficeTest.java
Log:
Completed clustered post office test


Modified: trunk/src/main/org/jboss/jms/client/container/ClientConsumer.java
===================================================================
--- trunk/src/main/org/jboss/jms/client/container/ClientConsumer.java	2007-07-03 12:12:42 UTC (rev 2828)
+++ trunk/src/main/org/jboss/jms/client/container/ClientConsumer.java	2007-07-03 17:08:00 UTC (rev 2829)
@@ -47,6 +47,7 @@
 /**
  * @author <a href="mailto:ovidiu at jboss.org">Ovidiu Feodorov</a>
  * @author <a href="mailto:tim.fox at jboss.com">Tim Fox/a>
+ * @author <a href="mailto:sergey.koshcheyev at jboss.com">Sergey Koshcheyev</a>
  * @version <tt>$Revision: 2774 $</tt>
  *
  * $Id: MessageCallbackHandler.java 2774 2007-06-12 22:43:54Z timfox $

Modified: trunk/src/main/org/jboss/jms/client/state/SessionState.java
===================================================================
--- trunk/src/main/org/jboss/jms/client/state/SessionState.java	2007-07-03 12:12:42 UTC (rev 2828)
+++ trunk/src/main/org/jboss/jms/client/state/SessionState.java	2007-07-03 17:08:00 UTC (rev 2829)
@@ -344,7 +344,6 @@
          catch (Exception e)
          {
             log.error(e.toString(),e);
-            log.info("RecoverDeliveries failed, marking session as invalidated!");
             this.getDelegate().invalidate();
          }
       }

Modified: trunk/src/main/org/jboss/messaging/core/impl/ChannelSupport.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/impl/ChannelSupport.java	2007-07-03 12:12:42 UTC (rev 2828)
+++ trunk/src/main/org/jboss/messaging/core/impl/ChannelSupport.java	2007-07-03 17:08:00 UTC (rev 2829)
@@ -911,8 +911,8 @@
                }
                else
                {	
-	               if (trace) { log.trace(this + ": adding " + ref + " to non-recoverable state"); }
-	
+	               if (trace) { log.trace(this + ": adding " + ref + " to memory"); }
+	               
 	               try
 	               {
 	                  synchronized (lock)

Modified: trunk/src/main/org/jboss/messaging/core/impl/postoffice/MessageRequest.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/impl/postoffice/MessageRequest.java	2007-07-03 12:12:42 UTC (rev 2828)
+++ trunk/src/main/org/jboss/messaging/core/impl/postoffice/MessageRequest.java	2007-07-03 17:08:00 UTC (rev 2829)
@@ -23,11 +23,12 @@
 
 import java.io.DataInputStream;
 import java.io.DataOutputStream;
-import java.util.Map;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.Set;
 
 import org.jboss.messaging.core.contract.Message;
 import org.jboss.messaging.core.impl.message.MessageFactory;
-import org.jboss.messaging.util.StreamUtils;
 
 /**
  * A MessageRequest
@@ -42,24 +43,32 @@
  */
 class MessageRequest extends ClusterRequest
 {
+   private static final byte NULL = 0;
+   
+   private static final byte NOT_NULL = 1;
+	
    private String routingConditionText;   
    
    private Message message;
    
+   private Set queueNames;
+   
    MessageRequest()
    {      
    }
    
-   MessageRequest(String routingConditionText, Message message)
+   MessageRequest(String routingConditionText, Message message, Set queueNames)
    {
       this.routingConditionText = routingConditionText;
       
       this.message = message;
+      
+      this.queueNames = queueNames;
    }
    
    Object execute(RequestTarget office) throws Exception
    {
-      office.routeFromCluster(message, routingConditionText);      
+      office.routeFromCluster(message, routingConditionText, queueNames);      
       
       return null;
    }  
@@ -77,9 +86,25 @@
       
       message = MessageFactory.createMessage(type);
       
-      message.read(in);        
+      message.read(in);  
+      
+      byte b = in.readByte();
+      
+      if (b != NULL)
+      {
+      	int size = in.readInt();
+      	
+      	queueNames = new HashSet(size);
+      	
+      	for (int i = 0; i < size; i++)
+      	{
+      		String queueName = in.readUTF();
+      		
+      		queueNames.add(queueName);
+      	}
+      }
    }
-
+   
    public void write(DataOutputStream out) throws Exception
    {
       out.writeUTF(routingConditionText);
@@ -87,5 +112,25 @@
       out.writeByte(message.getType());      
       
       message.write(out);
+      
+      if (queueNames == null)
+      {
+      	out.writeByte(NULL);
+      }
+      else
+      {
+      	out.writeByte(NOT_NULL);
+      	
+      	out.writeInt(queueNames.size());
+      	
+      	Iterator iter = queueNames.iterator();
+      	
+      	while (iter.hasNext())
+      	{
+      		String queueName = (String)iter.next();
+      		
+      		out.writeUTF(queueName);
+      	}      	     
+      }
    }
 }

Modified: trunk/src/main/org/jboss/messaging/core/impl/postoffice/MessagingPostOffice.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/impl/postoffice/MessagingPostOffice.java	2007-07-03 12:12:42 UTC (rev 2828)
+++ trunk/src/main/org/jboss/messaging/core/impl/postoffice/MessagingPostOffice.java	2007-07-03 17:08:00 UTC (rev 2829)
@@ -47,7 +47,6 @@
 import javax.sql.DataSource;
 import javax.transaction.TransactionManager;
 
-import org.jboss.jms.server.JMSCondition;
 import org.jboss.logging.Logger;
 import org.jboss.messaging.core.contract.Binding;
 import org.jboss.messaging.core.contract.ClusterNotification;
@@ -71,6 +70,7 @@
 import org.jboss.messaging.core.impl.MessagingQueue;
 import org.jboss.messaging.core.impl.tx.Transaction;
 import org.jboss.messaging.core.impl.tx.TransactionRepository;
+import org.jboss.messaging.core.impl.tx.TxCallback;
 import org.jboss.messaging.util.ConcurrentHashSet;
 import org.jboss.messaging.util.StreamUtils;
 import org.jgroups.Address;
@@ -436,7 +436,7 @@
          throw new IllegalArgumentException("Condition is null");
       }
       
-      return routeInternal(ref, condition, tx, false);
+      return routeInternal(ref, condition, tx, false, null);
    }
 
    public Collection getQueuesForCondition(Condition condition, boolean localOnly) throws Exception
@@ -967,9 +967,9 @@
       return true;
    }
 
-   public void routeFromCluster(Message message, String routingKeyText) throws Exception
+   public void routeFromCluster(Message message, String routingKeyText, Set queueNames) throws Exception
    {
-      if (trace) { log.trace(this + " routing from cluster " + message + ", routing key " + routingKeyText); }
+      if (trace) { log.trace(this + " routing from cluster " + message + ", routing key " + routingKeyText + ", queue names " + queueNames); }
 
       Condition routingKey = conditionFactory.createCondition(routingKeyText);
 
@@ -979,7 +979,7 @@
       {
          ref = ms.reference(message);
          
-         routeInternal(ref, routingKey, null, true);        
+         routeInternal(ref, routingKey, null, true, queueNames);        
       }
       finally
       {
@@ -1463,7 +1463,7 @@
    	}
    }
    
-   private boolean routeInternal(MessageReference ref, Condition condition, Transaction tx, boolean fromCluster) throws Exception
+   private boolean routeInternal(MessageReference ref, Condition condition, Transaction tx, boolean fromCluster, Set names) throws Exception
    {
    	if (trace) { log.trace(this + " routing " + ref + " with condition '" +
    			                 condition + "'" + (tx == null ? "" : " transactionally in " + tx) + 
@@ -1503,8 +1503,30 @@
          			//When routing to a clustered temp queue, the queue is unreliable - but we always want to route to the local
          			//one so we need to add the check that we only route remotely if it's a topic
          			//We could do this better by making sure that only one queue with the same name is routed to on the cluster
-         			if (!fromCluster || (!queue.isRecoverable() && !((JMSCondition)condition).isQueue()))
+         			
+         			boolean routeLocal = false;
+         			
+         			if (!fromCluster)
          			{
+         				//Same node
+         				routeLocal = true;
+         			}
+         			else
+         			{
+         				//From the cluster
+         				if (!queue.isRecoverable())
+         				{
+         					//When routing from the cluster we only route to non recoverable queues
+         					//that haven't already been routed to on the sending node (same name)
+         					if (names == null || !names.contains(queue.getName()))
+         					{
+         						routeLocal = true;
+         					}
+         				}
+         			}
+         			
+         			if (routeLocal)
+         			{
          				//If we're not routing from the cluster OR the queue is unreliable then we consider it
          				
          				//When we route from the cluster we never route to reliable queues
@@ -1554,33 +1576,7 @@
          			}
          		}
          	}
-         	
-         	if (remoteSet != null)
-         	{
-         		//There are queues on other nodes that want the message too
-         		
-         		//If the message is non reliable then we can unicast or multicast the message to the group so it
-         		//can get picked up by other nodes
-         		
-         		ClusterRequest request = new MessageRequest(condition.toText(), ref.getMessage());
-         		
-         		if (trace) { log.trace(this + " casting message to other node(s)"); }
-         		
-         		if (remoteSet.size() == 1)
-         		{
-         			//Only one node requires the message, so we can unicast
-         			
-         			unicastRequest(request, ((Integer)remoteSet.iterator().next()).intValue());
-         		}
-         		else
-         		{
-         			//More than one node requires the message - so we multicast         			
-         			//Whether it is reliable or unreliable multicast is determined by the JGroups stack config
-         			
-         			multicastRequest(request);
-         		}
-         	}   
-         	
+         	         	         	
          	//If the ref is reliable and there is more than one reliable local queue that accepts the message then we need
          	//to route in a transaction to guarantee once and only once reliability guarantee
          	
@@ -1599,6 +1595,8 @@
          	
          	iter = targets.iterator();
          	
+         	Set queueNames = null;
+         	
          	while (iter.hasNext())
          	{
          		Queue queue = (Queue)iter.next();
@@ -1612,9 +1610,56 @@
                if (del != null && del.isSelectorAccepted())
                {
                   routed = true;
+                  
+                  if (remoteSet != null)
+                  {
+                  	if (queueNames == null)
+                  	{
+                  		queueNames = new HashSet();
+                  	}
+                  	
+                  	//We put the queue name in a set - this is used on other nodes after routing from the cluster so it
+                  	//doesn't route to queues with the same name on other nodes
+                  	queueNames.add(queue.getName());
+                  }
                }
          	}
-         	
+         	            
+            if (remoteSet != null)
+         	{
+         		//There are queues on other nodes that want the message too
+         		
+         		//If the message is non reliable then we can unicast or multicast the message to the group so it
+         		//can get picked up by other nodes         		
+         		
+         		ClusterRequest request = new MessageRequest(condition.toText(), ref.getMessage(), queueNames);
+         		
+         		if (trace) { log.trace(this + " casting message to other node(s)"); }
+         		
+         		Integer nodeID = null;
+         		
+         		if (remoteSet.size() == 1)
+         		{
+         			//Only one node requires the message, so we can unicast
+         			
+         			nodeID = (Integer)remoteSet.iterator().next();
+         		}
+         		
+         		TxCallback callback = new CastMessageCallback(nodeID, request);
+         		
+         		if (tx != null)
+         		{
+         			tx.addCallback(callback, this);
+         		}
+         		else
+         		{
+         			//Execute it now
+         			callback.afterCommit(true);
+         		}
+         		
+         		routed = true;
+         	}          
+            
             if (startedTx)
             {
                if (trace) { log.trace(this + " committing " + tx); }
@@ -1632,7 +1677,7 @@
 
       return routed;
    }   
-   
+         
    private Binding removeBindingInMemory(int nodeID, String queueName) throws Exception
    {
    	lock.writeLock().acquire();
@@ -2333,5 +2378,57 @@
    }
 
    // Inner classes --------------------------------------------------------------------------------
+   
+   private class CastMessageCallback implements TxCallback
+   {
+   	private Integer nodeID;
+   	
+   	private ClusterRequest request;
+   	
+   	CastMessageCallback(Integer nodeID, ClusterRequest request)
+   	{
+   		this.nodeID = nodeID;
+   		
+   		this.request = request;
+   	}
 
+		public void afterCommit(boolean onePhase) throws Exception
+		{
+			if (nodeID == null)
+			{
+				multicastRequest(request);
+			}
+			else
+			{
+				unicastRequest(request, nodeID.intValue());
+			}
+		}
+
+		public void afterPrepare() throws Exception
+		{	
+			//NOOP
+		}
+
+		public void afterRollback(boolean onePhase) throws Exception
+		{
+			//NOOP
+		}
+
+		public void beforeCommit(boolean onePhase) throws Exception
+		{
+			//NOOP
+		}
+
+		public void beforePrepare() throws Exception
+		{
+			//NOOP
+		}
+
+		public void beforeRollback(boolean onePhase) throws Exception
+		{
+			//NOOP
+		}
+   	
+   }
+
 }
\ No newline at end of file

Modified: trunk/src/main/org/jboss/messaging/core/impl/postoffice/RequestTarget.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/impl/postoffice/RequestTarget.java	2007-07-03 12:12:42 UTC (rev 2828)
+++ trunk/src/main/org/jboss/messaging/core/impl/postoffice/RequestTarget.java	2007-07-03 17:08:00 UTC (rev 2829)
@@ -22,6 +22,7 @@
 package org.jboss.messaging.core.impl.postoffice;
 
 import java.io.Serializable;
+import java.util.Set;
 
 import org.jboss.messaging.core.contract.Message;
 
@@ -49,5 +50,5 @@
    
    boolean removeReplicantLocally(int nodeId, Serializable key) throws Exception;
    
-   void routeFromCluster(Message message, String routingKeyText) throws Exception;
+   void routeFromCluster(Message message, String routingKeyText, Set queueNames) throws Exception;
 }

Modified: trunk/tests/src/org/jboss/test/messaging/core/plugin/postoffice/ClusteredPostOfficeTest.java
===================================================================
--- trunk/tests/src/org/jboss/test/messaging/core/plugin/postoffice/ClusteredPostOfficeTest.java	2007-07-03 12:12:42 UTC (rev 2828)
+++ trunk/tests/src/org/jboss/test/messaging/core/plugin/postoffice/ClusteredPostOfficeTest.java	2007-07-03 17:08:00 UTC (rev 2829)
@@ -34,6 +34,7 @@
 import org.jboss.messaging.core.contract.PostOffice;
 import org.jboss.messaging.core.contract.Queue;
 import org.jboss.messaging.core.impl.MessagingQueue;
+import org.jboss.messaging.core.impl.tx.Transaction;
 import org.jboss.test.messaging.core.PostOfficeTestBase;
 import org.jboss.test.messaging.core.SimpleCondition;
 import org.jboss.test.messaging.core.SimpleFilter;
@@ -81,7 +82,7 @@
                   
          office3 = createClusteredPostOffice(3, "testgroup");
          
-         Thread.sleep(2000);
+         Thread.sleep(1000);
          
          Set nodes = office1.nodeIDView();         
          assertTrue(nodes.contains(new Integer(1)));
@@ -141,7 +142,7 @@
                   
          office3 = createClusteredPostOffice(3, "testgroup");
          
-         Thread.sleep(2000);
+         Thread.sleep(1000);
          
          Map failoverMap1 = office1.getFailoverMap();
          
@@ -199,7 +200,7 @@
          Queue queue1 = new MessagingQueue(1, "sub1", channelIDManager.getID(), ms, pm, false, -1, null, true);
          queue1.activate();
 
-         Condition condition1 = new SimpleCondition("topic1");
+         Condition condition1 = new SimpleCondition("condition1");
          
          boolean added = office1.addBinding(new Binding(condition1, queue1, false), false);
          assertTrue(added);
@@ -502,7 +503,7 @@
          added = office2.addBinding(new Binding(condition1, queue9, false), false);
          assertTrue(added);
          
-         Condition condition2 = new SimpleCondition("topic2");
+         Condition condition2 = new SimpleCondition("condition2");
          
          added = office2.addBinding(new Binding(condition2, queue10, false), false);
          assertTrue(added);
@@ -583,61 +584,6 @@
       }
    }
    
-   /*
-    * Bind / Unbind all tests
-    * 
-    * 1.
-    * a) queue is not known by cluster
-    * b) bind all
-    * c) verify all nodes get queue
-    * d) unbind - verify unbound from all nodes
-    * e) close down all nodes
-    * f) start all nodes
-    * g) verify queue is not known
-    * 
-    * 2.
-    * a) queue is known by cluster
-    * b) bind all
-    * c) verify nothing changes on cluster
-    * 
-    * 3
-    * a) start one node
-    * b) queue is not known to cluster
-    * c) bind all
-    * d) start other nodes
-    * d) verify other nodes pick it up
-    * 
-    * 4
-    * a) start one node
-    * b) queue is not known to cluster
-    * c) bind all
-    * d) shutdown all nodes
-    * e) startup all nodes
-    * f) verify queue is on all nodes
-    * 
-    * 5
-    * a) start one node
-    * b) queue is not known
-    * c) bind all
-    * d) shutdown node
-    * e) start other nodes
-    * f) verify queue is not known
-    * g) restart first node, verify queue is now known
-    * 
-    * 6
-    * 
-    * non durable bind all
-    * a) bind all non durable
-    * b) make sure is picked up by all nodes
-    * c) close down all nodes
-    * d) restart them all - make sure is not there
-    * e) bind again
-    * f) make sure is picked up
-    * g) take down one node
-    * h) bring it back up
-    * i) make sure it has quuee again
-    */
-   
    public void testBindUnbindAll1() throws Throwable
    {
    	/*
@@ -665,7 +611,7 @@
          Queue queue1 = new MessagingQueue(1, "sub1", channelIDManager.getID(), ms, pm, true, -1, null, true);
          queue1.activate();
 
-         Condition condition1 = new SimpleCondition("topic1");
+         Condition condition1 = new SimpleCondition("condition1");
          
          //Add all binding
          boolean added = office1.addBinding(new Binding(condition1, queue1, true), true);
@@ -827,7 +773,7 @@
          Queue queue1 = new MessagingQueue(1, "sub1", channelIDManager.getID(), ms, pm, true, -1, null, true);
          queue1.activate();
 
-         Condition condition1 = new SimpleCondition("topic1");
+         Condition condition1 = new SimpleCondition("condition1");
          
          //Add all binding
          boolean added = office1.addBinding(new Binding(condition1, queue1, true), true);
@@ -941,7 +887,7 @@
          Queue queue1 = new MessagingQueue(1, "sub1", channelIDManager.getID(), ms, pm, true, -1, null, true);
          queue1.activate();
 
-         Condition condition1 = new SimpleCondition("topic1");
+         Condition condition1 = new SimpleCondition("condition1");
          
          //Add all binding
          boolean added = office1.addBinding(new Binding(condition1, queue1, true), true);
@@ -1043,7 +989,7 @@
          Queue queue1 = new MessagingQueue(1, "sub1", channelIDManager.getID(), ms, pm, true, -1, null, true);
          queue1.activate();
 
-         Condition condition1 = new SimpleCondition("topic1");
+         Condition condition1 = new SimpleCondition("condition1");
          
          //Add all binding
          boolean added = office1.addBinding(new Binding(condition1, queue1, true), true);
@@ -1152,7 +1098,7 @@
          Queue queue1 = new MessagingQueue(1, "sub1", channelIDManager.getID(), ms, pm, true, -1, null, true);
          queue1.activate();
 
-         Condition condition1 = new SimpleCondition("topic1");
+         Condition condition1 = new SimpleCondition("condition1");
          
          //Add all binding
          boolean added = office1.addBinding(new Binding(condition1, queue1, true), true);
@@ -1271,7 +1217,7 @@
          Queue queue1 = new MessagingQueue(1, "sub1", channelIDManager.getID(), ms, pm, false, -1, null, true);
          queue1.activate();
 
-         Condition condition1 = new SimpleCondition("topic1");
+         Condition condition1 = new SimpleCondition("condition1");
          
          //Add all binding
          boolean added = office1.addBinding(new Binding(condition1, queue1, true), true);
@@ -1384,171 +1330,196 @@
       }
    }
    
-   private void dumpNodeIDView(PostOffice postOffice)
+  
+   public final void testClusteredRoutePersistent() throws Throwable
    {
-   	Set view = postOffice.nodeIDView();
-   	
-   	log.info("=== node id view ==");
-   	
-   	Iterator iter = view.iterator();
-   	
-   	while (iter.hasNext())
-   	{
-   		log.info("Node:" + iter.next());
-   	}
-   	
-   	log.info("==================");
+      clusteredRoute(true);
    }
    
-   private void assertGotAll(int nodeId, Collection bindings, String queueName)
+   public final void testClusteredRouteNonPersistent() throws Throwable
    {
-   	
-   	log.info("============= dumping bindings ========");
-   	
-   	Iterator iter = bindings.iterator();
-   	
-   	while (iter.hasNext())
-   	{
-   		Binding binding = (Binding)iter.next();
-   		
-   		log.info("Binding: " + binding);
-   	}
-   	
-   	log.info("========= end dump==========");
-   	
-      assertEquals(3, bindings.size());
-      
-      iter = bindings.iterator();
-      
-      boolean got1 = false;
-      boolean got2 = false;
-      boolean got3 = false;         
-      while (iter.hasNext())
-      {
-      	Binding binding = (Binding)iter.next();
-      	
-      	log.info("binding node id " + binding.queue.getNodeID());
-      	
-      	assertEquals(queueName, binding.queue.getName());
-      	if (binding.queue.getNodeID() == nodeId)
-      	{
-      		assertTrue(binding.allNodes);
-      	}
-      	else
-      	{
-      		assertFalse(binding.allNodes);
-      	}
-      	
-      	if (binding.queue.getNodeID() == 1)
-      	{
-      		got1 = true;
-      	}
-      	if (binding.queue.getNodeID() == 2)
-      	{
-      		got2 = true;
-      	}
-      	if (binding.queue.getNodeID() == 3)
-      	{
-      		got3 = true;
-      	}    	
-      }         
-      assertTrue(got1 && got2 && got3);
+      clusteredRoute(false);
    }
+
+   public void testClusteredRouteWithFilterNonPersistent() throws Throwable
+   {
+      this.clusteredRouteWithFilter(false);
+   }
    
+   public void testClusteredRouteWithFilterPersistent() throws Throwable
+   {
+      this.clusteredRouteWithFilter(true);
+   }
    
-
-//   
-//   public final void testClusteredRoutePersistent() throws Throwable
-//   {
-//      clusteredRoute(true);
-//   }
-//   
-//   public final void testClusteredRouteNonPersistent() throws Throwable
-//   {
-//      clusteredRoute(false);
-//   }
-//   
-//   public final void testClusteredTransactionalRoutePersistent() throws Throwable
-//   {
-//      clusteredTransactionalRoute(true);
-//   }
-//   
-//   public final void testClusteredTransactionalRouteNonPersistent() throws Throwable
-//   {
-//      clusteredTransactionalRoute(false);
-//   }
-//   
-//   public void testClusteredNonPersistentRouteWithFilterNonRecoverable() throws Throwable
-//   {
-//      this.clusteredRouteWithFilter(false, false);
-//   }
-//   
-//   public void testClusteredPersistentRouteWithFilterNonRecoverable() throws Throwable
-//   {
-//      this.clusteredRouteWithFilter(true);
-//   }
-//   
-//   public void testClusteredNonPersistentRouteWithFilterRecoverable() throws Throwable
-//   {
-//      this.clusteredRouteWithFilter(false, true);
-//   }
-//   
-//   public void testClusteredPersistentRouteWithFilterRecoverable() throws Throwable
-//   {
-//      this.clusteredRouteWithFilter(true, true);
-//   }
-//      
-//   public void testRouteSharedPointToPointQueuePersistentNonRecoverable() throws Throwable
-//   {
-//      this.routeSharedQueue(true);
-//   }
-//   
-//   public void testRouteSharedPointToPointQueueNonPersistentNonRecoverable() throws Throwable
-//   {
-//      this.routeSharedQueue(false, false);
-//   }
-//   
-//   public void testRouteSharedPointToPointQueuePersistentRecoverable() throws Throwable
-//   {
-//      this.routeSharedQueue(true, true);
-//   }
-//   
-//   public void testRouteSharedPointToPointQueueNonPersistentRecoverable() throws Throwable
-//   {
-//      this.routeSharedQueue(false, true);
-//   }
-//   
-//   public void testRouteComplexTopicPersistent() throws Throwable
-//   {
-//      this.routeComplexTopic(true);
-//   }
-//   
-//   public void testRouteComplexTopicNonPersistent() throws Throwable
-//   {
-//      this.routeComplexTopic(false);
-//   }
-//         
-//   public void testRouteLocalQueuesPersistentNonRecoverable() throws Throwable
-//   {
-//      this.routeLocalQueues(true);
-//   }
-//   
-//   public void testRouteLocalQueuesNonPersistentNonRecoverable() throws Throwable
-//   {
-//      this.routeLocalQueues(false, false);
-//   }
-//   
-//   public void testRouteLocalQueuesPersistentRecoverable() throws Throwable
-//   {
-//      this.routeLocalQueues(true, true);
-//   }
-//   
-//   public void testRouteLocalQueuesNonPersistentRecoverable() throws Throwable
-//   {
-//      this.routeLocalQueues(false, true);
-//   }
+   public void testRouteSharedQueuePersistent() throws Throwable
+   {
+      this.routeSharedQueue(true);
+   }
    
+   public void testRouteSharedQueueNonPersistent() throws Throwable
+   {
+      this.routeSharedQueue(false);
+   }
    
+   public void testClusteredTransactionalRoutePersistent() throws Throwable
+   {
+   	this.clusteredTransactionalRoute(true);
+   }
+   
+   public void testClusteredTransactionalRouteNonPersistent() throws Throwable
+   {
+   	this.clusteredTransactionalRoute(false);
+   }
+   
+   public void testClusteredRouteFourNodesPersistent() throws Throwable
+   {
+   	this.clusteredRouteFourNodes(true);
+   }
+   
+   public void testClusteredRouteFourNodesNonPersistent() throws Throwable
+   {
+   	this.clusteredRouteFourNodes(false);
+   }
+   
+   public void testStartTxInternally() throws Throwable
+   {
+   	PostOffice office1 = null;
+
+   	try
+   	{   
+   		office1 = createClusteredPostOffice(1, "testgroup");
+
+   		Queue queue1 =  new MessagingQueue(1, "queue1", channelIDManager.getID(), ms, pm, true, -1, null, true);
+   		queue1.activate();
+   		boolean added = office1.addBinding(new Binding(new SimpleCondition("condition1"), queue1, false), false);
+   		assertTrue(added);
+   		
+   		Queue queue2 =  new MessagingQueue(1, "queue2", channelIDManager.getID(), ms, pm, true, -1, null, true);
+   		queue2.activate();
+   		added = office1.addBinding(new Binding(new SimpleCondition("condition1"), queue2, false), false);
+   		assertTrue(added);
+   		
+   		Queue queue3 =  new MessagingQueue(1, "queue3", channelIDManager.getID(), ms, pm, true, -1, null, true);
+   		queue3.activate();
+   		added = office1.addBinding(new Binding(new SimpleCondition("condition1"), queue3, false), false);
+   		assertTrue(added);
+   		
+   		Queue queue4 =  new MessagingQueue(1, "queue4", channelIDManager.getID(), ms, pm, true, -1, null, true);
+   		queue4.activate();
+   		added = office1.addBinding(new Binding(new SimpleCondition("condition1"), queue4, false), false);
+   		assertTrue(added);
+
+   		SimpleReceiver receiver1 = new SimpleReceiver("blah", SimpleReceiver.ACCEPTING);
+   		queue1.getLocalDistributor().add(receiver1);
+
+   		SimpleReceiver receiver2 = new SimpleReceiver("blah", SimpleReceiver.ACCEPTING);
+   		queue2.getLocalDistributor().add(receiver2);
+
+   		SimpleReceiver receiver3 = new SimpleReceiver("blah", SimpleReceiver.ACCEPTING);
+   		queue3.getLocalDistributor().add(receiver3);
+   		
+   		SimpleReceiver receiver4 = new SimpleReceiver("blah", SimpleReceiver.ACCEPTING);
+   		queue4.getLocalDistributor().add(receiver4);
+
+   		Message msg1 = CoreMessageFactory.createCoreMessage(1, true, null);      
+   		MessageReference ref1 = ms.reference(msg1);  
+   		boolean routed = office1.route(ref1, new SimpleCondition("condition1"), null);   
+   		assertTrue(routed);
+
+   		Message msg2 = CoreMessageFactory.createCoreMessage(2, true, null);      
+   		MessageReference ref2 = ms.reference(msg2);         
+   		routed = office1.route(ref2, new SimpleCondition("condition1"), null);      
+   		assertTrue(routed);
+
+   		Message msg3 = CoreMessageFactory.createCoreMessage(3, true, null);      
+   		MessageReference ref3 = ms.reference(msg3);         
+   		routed = office1.route(ref3, new SimpleCondition("condition1"), null);      
+   		assertTrue(routed);
+
+   		Thread.sleep(1000);
+   		
+   		List msgs = receiver1.getMessages();
+   		assertNotNull(msgs);
+   		assertEquals(3, msgs.size());
+   		Message msgRec1 = (Message)msgs.get(0);
+   		assertTrue(msg1 == msgRec1);
+   		Message msgRec2 = (Message)msgs.get(1);
+   		assertTrue(msg2 == msgRec2);
+   		Message msgRec3 = (Message)msgs.get(2);
+   		assertTrue(msg3 == msgRec3);
+
+   		receiver1.acknowledge(msgRec1, null);
+   		receiver1.acknowledge(msgRec2, null);
+   		receiver1.acknowledge(msgRec3, null);
+   		msgs = queue1.browse(null);
+   		assertNotNull(msgs);
+   		assertTrue(msgs.isEmpty()); 
+   		
+   		msgs = receiver2.getMessages();
+   		assertNotNull(msgs);
+   		assertEquals(3, msgs.size());
+   		msgRec1 = (Message)msgs.get(0);
+   		assertTrue(msg1 == msgRec1);
+   		msgRec2 = (Message)msgs.get(1);
+   		assertTrue(msg2 == msgRec2);
+   		msgRec3 = (Message)msgs.get(2);
+   		assertTrue(msg3 == msgRec3);
+
+   		receiver2.acknowledge(msgRec1, null);
+   		receiver2.acknowledge(msgRec2, null);
+   		receiver2.acknowledge(msgRec3, null);
+   		msgs = queue2.browse(null);
+   		assertNotNull(msgs);
+   		assertTrue(msgs.isEmpty()); 
+
+   		msgs = receiver3.getMessages();
+   		assertNotNull(msgs);
+   		assertEquals(3, msgs.size());
+   		msgRec1 = (Message)msgs.get(0);
+   		assertTrue(msg1 == msgRec1);
+   		msgRec2 = (Message)msgs.get(1);
+   		assertTrue(msg2 == msgRec2);
+   		msgRec3 = (Message)msgs.get(2);
+   		assertTrue(msg3 == msgRec3);
+
+   		receiver3.acknowledge(msgRec1, null);
+   		receiver3.acknowledge(msgRec2, null);
+   		receiver3.acknowledge(msgRec3, null);
+   		msgs = queue3.browse(null);
+   		assertNotNull(msgs);
+   		assertTrue(msgs.isEmpty()); 
+   		
+   		msgs = receiver4.getMessages();
+   		assertNotNull(msgs);
+   		assertEquals(3, msgs.size());
+   		msgRec1 = (Message)msgs.get(0);
+   		assertTrue(msg1 == msgRec1);
+   		msgRec2 = (Message)msgs.get(1);
+   		assertTrue(msg2 == msgRec2);
+   		msgRec3 = (Message)msgs.get(2);
+   		assertTrue(msg3 == msgRec3);
+
+   		receiver4.acknowledge(msgRec1, null);
+   		receiver4.acknowledge(msgRec2, null);
+   		receiver4.acknowledge(msgRec3, null);
+   		msgs = queue4.browse(null);
+   		assertNotNull(msgs);
+   		assertTrue(msgs.isEmpty()); 
+   		
+   		if (checkNoMessageData())
+   		{
+   			fail("Message data still in database");
+   		}
+   	}
+   	finally
+   	{
+   		if (office1 != null)
+   		{
+   			office1.stop();
+   		}
+   	}
+   }
+ 
    public void testBindSameName() throws Throwable
    {
       PostOffice office1 = null;
@@ -1614,1687 +1585,6 @@
    // Package protected ----------------------------------------------------------------------------
 
    // Protected ------------------------------------------------------------------------------------
-//   
-//   protected void clusteredRouteWithFilter(boolean persistentMessage, boolean recoverable)
-//      throws Throwable
-//   {
-//      PostOffice office1 = null;
-//      
-//      PostOffice office2 = null;
-//          
-//      try
-//      {   
-//         office1 = createPostOffice(1, "testgroup", sc, ms, pm, tr);
-//         office2 = createPostOffice(2, "testgroup", sc, ms, pm, tr);
-//         
-//         SimpleFilter filter1 = new SimpleFilter(2);
-//         SimpleFilter filter2 = new SimpleFilter(3);
-//      
-//         LocalClusteredQueue queue1 =
-//            new LocalClusteredQueue(1, "queue1", channelIDManager.getID(), ms, pm,
-//                                    true, recoverable, -1, filter1);
-//
-//         office1.bindQueue(new SimpleCondition("topic1"), queue1);
-//         
-//         LocalClusteredQueue queue2 =
-//            new LocalClusteredQueue(2, "queue2", channelIDManager.getID(), ms, pm,
-//                                    true, recoverable, -1, filter2);
-//
-//         office2.bindQueue(new SimpleCondition("topic1"), queue2);
-//         
-//         LocalClusteredQueue queue3 =
-//            new LocalClusteredQueue(2, "queue3", channelIDManager.getID(), ms, pm, true,
-//                                    recoverable, -1, null);
-//
-//         office2.bindQueue(new SimpleCondition("topic1"), queue3);
-//         
-//         SimpleReceiver receiver1 = new SimpleReceiver("blah", SimpleReceiver.ACCEPTING);
-//
-//         queue1.add(receiver1);
-//
-//         SimpleReceiver receiver2 = new SimpleReceiver("blah", SimpleReceiver.ACCEPTING);
-//
-//         queue2.add(receiver2);
-//
-//         SimpleReceiver receiver3 = new SimpleReceiver("blah", SimpleReceiver.ACCEPTING);
-//
-//         queue3.add(receiver3);
-//         
-//         Message msg1 = CoreMessageFactory.createCoreMessage(1);      
-//         MessageReference ref1 = ms.reference(msg1);  
-//         boolean routed = office1.route(ref1, new SimpleCondition("topic1"), null);   
-//         assertTrue(routed);
-//         
-//         
-//         Message msg2 = CoreMessageFactory.createCoreMessage(2);      
-//         MessageReference ref2 = ms.reference(msg2);         
-//         routed = office1.route(ref2, new SimpleCondition("topic1"), null);      
-//         assertTrue(routed);
-//         
-//         Message msg3 = CoreMessageFactory.createCoreMessage(3);      
-//         MessageReference ref3 = ms.reference(msg3);         
-//         routed = office1.route(ref3, new SimpleCondition("topic1"), null);      
-//         assertTrue(routed);
-//         
-//         Thread.sleep(2000);
-//         
-//         List msgs = receiver1.getMessages();
-//         assertNotNull(msgs);
-//         assertEquals(1, msgs.size());
-//         Message msgRec = (Message)msgs.get(0);
-//         assertTrue(msg2 == msgRec);
-//         receiver1.acknowledge(msgRec, null);
-//         msgs = queue1.browse();
-//         assertNotNull(msgs);
-//         assertTrue(msgs.isEmpty());  
-//         
-//         msgs = receiver2.getMessages();
-//         assertNotNull(msgs);
-//         assertEquals(1, msgs.size());
-//         msgRec = (Message)msgs.get(0);
-//         assertTrue(msg3 == msgRec);
-//         receiver2.acknowledge(msgRec, null);
-//         msgs = queue2.browse();
-//         assertNotNull(msgs);
-//         assertTrue(msgs.isEmpty());  
-//         
-//         msgs = receiver3.getMessages();
-//         assertNotNull(msgs);
-//         assertEquals(3, msgs.size());
-//         Message msgRec1 = (Message)msgs.get(0);
-//         assertTrue(msg1 == msgRec1);
-//         Message msgRec2 = (Message)msgs.get(1);
-//         assertTrue(msg2 == msgRec2);
-//         Message msgRec3 = (Message)msgs.get(2);
-//         assertTrue(msg3 == msgRec3);
-//          
-//         receiver3.acknowledge(msgRec1, null);
-//         receiver3.acknowledge(msgRec2, null);
-//         receiver3.acknowledge(msgRec3, null);
-//         msgs = queue3.browse();
-//         assertNotNull(msgs);
-//         assertTrue(msgs.isEmpty()); 
-//                  
-//         if (checkNoMessageData())
-//         {
-//            fail("Message data still in database");
-//         }
-//      }
-//      finally
-//      {
-//         if (office1 != null)
-//         {
-//            office1.stop();
-//         }
-//         
-//         if (office2 != null)
-//         {
-//            office2.stop();
-//         }
-//         
-//      }
-//   }
-//   
-//   protected void clusteredRoute(boolean persistentMessage) throws Throwable
-//   {
-//      PostOffice office1 = null;
-//      
-//      PostOffice office2 = null;
-//          
-//      try
-//      {   
-//         office1 = createPostOffice(1, "testgroup", sc, ms, pm, tr);
-//         office2 = createPostOffice(2, "testgroup", sc, ms, pm, tr);
-//      
-//         //Two topics with a mixture of durable and non durable subscriptions
-//         
-//         LocalClusteredQueue[] queues = new LocalClusteredQueue[16];
-//         Binding[] bindings = new Binding[16];
-//         
-//         queues[0] = new LocalClusteredQueue(1, "sub1", channelIDManager.getID(), ms, pm, true, false, -1, null);
-//         bindings[0] = office1.bindQueue(new SimpleCondition("topic1"), queues[0]);
-//         
-//         queues[1] = new LocalClusteredQueue(1, "sub2", channelIDManager.getID(), ms, pm, true, false, -1, null);
-//         bindings[1] = office1.bindQueue(new SimpleCondition("topic1"), queues[1]);
-//         
-//         queues[2] = new LocalClusteredQueue(2, "sub3", channelIDManager.getID(), ms, pm, true, false, -1, null);
-//         bindings[2] = office2.bindQueue(new SimpleCondition("topic1"), queues[2]);
-//         
-//         queues[3] = new LocalClusteredQueue(2, "sub4", channelIDManager.getID(), ms, pm, true, false, -1, null);
-//         bindings[3] = office2.bindQueue(new SimpleCondition("topic1"), queues[3]);
-//         
-//         queues[4] = new LocalClusteredQueue(2, "sub5", channelIDManager.getID(), ms, pm, true, true, -1, null);
-//         bindings[4] = office2.bindQueue(new SimpleCondition("topic1"), queues[4]);
-//         
-//         queues[5] = new LocalClusteredQueue(1, "sub6", channelIDManager.getID(), ms, pm, true, false, -1, null);
-//         bindings[5] = office1.bindQueue(new SimpleCondition("topic1"), queues[5]);
-//         
-//         queues[6] = new LocalClusteredQueue(1, "sub7", channelIDManager.getID(), ms, pm, true, true, -1, null);
-//         bindings[6] = office1.bindQueue(new SimpleCondition("topic1"), queues[6]);
-//         
-//         queues[7] = new LocalClusteredQueue(1, "sub8", channelIDManager.getID(), ms, pm, true, true, -1, null);
-//         bindings[7] = office1.bindQueue(new SimpleCondition("topic1"), queues[7]);
-//         
-//         queues[8] = new LocalClusteredQueue(1, "sub9", channelIDManager.getID(), ms, pm, true, false, -1, null);
-//         bindings[8] = office1.bindQueue(new SimpleCondition("topic2"), queues[8]);
-//         
-//         queues[9] = new LocalClusteredQueue(1, "sub10", channelIDManager.getID(), ms, pm, true, false, -1, null);
-//         bindings[9] = office1.bindQueue(new SimpleCondition("topic2"), queues[9]);
-//         
-//         queues[10] = new LocalClusteredQueue(2, "sub11", channelIDManager.getID(), ms, pm, true, false, -1, null);
-//         bindings[10] = office2.bindQueue(new SimpleCondition("topic2"), queues[10]);
-//         
-//         queues[11] = new LocalClusteredQueue(2, "sub12", channelIDManager.getID(), ms, pm, true, false, -1, null);
-//         bindings[11] = office2.bindQueue(new SimpleCondition("topic2"), queues[11]);
-//         
-//         queues[12] = new LocalClusteredQueue(2, "sub13", channelIDManager.getID(), ms, pm, true, true, -1, null);
-//         bindings[12] = office2.bindQueue(new SimpleCondition("topic2"), queues[12]);
-//         
-//         queues[13] = new LocalClusteredQueue(1, "sub14", channelIDManager.getID(), ms, pm, true, false, -1, null);
-//         bindings[13] = office1.bindQueue(new SimpleCondition("topic2"), queues[13]);
-//         
-//         queues[14] = new LocalClusteredQueue(1, "sub15", channelIDManager.getID(), ms, pm, true, true, -1, null);
-//         bindings[14] = office1.bindQueue(new SimpleCondition("topic2"), queues[14]);
-//         
-//         queues[15] = new LocalClusteredQueue(1, "sub16", channelIDManager.getID(), ms, pm, true, true, -1, null);
-//         bindings[15] = office1.bindQueue(new SimpleCondition("topic2"), queues[15]);
-//       
-//         SimpleReceiver[] receivers = new SimpleReceiver[16];
-//         
-//         for (int i = 0; i < 16; i++)
-//         {
-//            receivers[i] = new SimpleReceiver("blah", SimpleReceiver.ACCEPTING);
-//            queues[i].add(receivers[i]);
-//         }
-//         
-//         Message msg = CoreMessageFactory.createCoreMessage(1, persistentMessage, null);      
-//         MessageReference ref = ms.reference(msg);         
-//
-//         boolean routed = office1.route(ref, new SimpleCondition("topic1"), null);         
-//         assertTrue(routed);
-//         
-//         //Messages are sent asych so may take some finite time to arrive
-//         Thread.sleep(1000);
-//         
-//         for (int i = 0; i < 8; i++)
-//         {         
-//            List msgs = receivers[i].getMessages();
-//            assertNotNull(msgs);
-//            assertEquals(1, msgs.size());
-//            Message msgRec = (Message)msgs.get(0);
-//            assertEquals(msg.getMessageID(), msgRec.getMessageID());
-//            receivers[i].acknowledge(msgRec, null);
-//            msgs = queues[i].browse();
-//            assertNotNull(msgs);
-//            assertTrue(msgs.isEmpty()); 
-//            receivers[i].clear();
-//         }
-//         
-//         for (int i = 8; i < 16; i++)
-//         {
-//            List msgs = receivers[i].getMessages();
-//            assertNotNull(msgs);
-//            assertTrue(msgs.isEmpty());
-//            msgs = queues[i].browse();
-//            assertNotNull(msgs);
-//            assertTrue(msgs.isEmpty());
-//         }
-//                  
-//         //Now route to topic2
-//         
-//         msg = CoreMessageFactory.createCoreMessage(2, persistentMessage, null);;      
-//         ref = ms.reference(msg);         
-//
-//         routed = office1.route(ref, new SimpleCondition("topic2"), null);         
-//         assertTrue(routed);
-//         //Messages are sent asych so may take some finite time to arrive
-//         Thread.sleep(1000);
-//         
-//         for (int i = 0; i < 8; i++)
-//         {
-//            List msgs = receivers[i].getMessages();
-//            assertNotNull(msgs);
-//            assertTrue(msgs.isEmpty());
-//            msgs = queues[i].browse();
-//            assertNotNull(msgs);
-//            assertTrue(msgs.isEmpty());
-//         }
-//         
-//         for (int i = 8; i < 16; i++)
-//         {         
-//            List msgs = receivers[i].getMessages();
-//            assertNotNull(msgs);
-//            assertEquals(1, msgs.size());
-//            Message msgRec = (Message)msgs.get(0);
-//            assertEquals(msg.getMessageID(), msgRec.getMessageID());
-//            receivers[i].acknowledge(msgRec, null);
-//            msgs = queues[i].browse();
-//            assertNotNull(msgs);
-//            assertTrue(msgs.isEmpty()); 
-//            receivers[i].clear();
-//         }
-//         
-//         if (checkNoMessageData())
-//         {
-//            fail("Message data still in database");
-//         }
-//
-//      }
-//      finally
-//      {
-//         if (office1 != null)
-//         {
-//            try
-//            {              
-//               office1.unbindQueue("sub7");
-//               office1.unbindQueue("sub8");               
-//               office1.unbindQueue("sub15");
-//               office1.unbindQueue("sub16");
-//            }
-//            catch (Exception ignore)
-//            {
-//               ignore.printStackTrace();
-//            }
-//            
-//            office1.stop();
-//         }
-//         
-//         if (office2 != null)
-//         {
-//            try
-//            {
-//               office2.unbindQueue("sub5");
-//               office2.unbindQueue("sub13");
-//            }
-//            catch (Exception ignore)
-//            {     
-//               ignore.printStackTrace();
-//            }
-//            office2.stop();
-//         }
-//        
-//      }
-//   }
-//   
-//   protected void routeSharedQueue(boolean persistentMessage, boolean recoverable) throws Throwable
-//   {
-//      PostOffice office1 = null;
-//      
-//      PostOffice office2 = null;
-//      
-//      PostOffice office3 = null;
-//      
-//      PostOffice office4 = null;
-//      
-//      PostOffice office5 = null;
-//      
-//      PostOffice office6 = null;
-//        
-//      try
-//      {   
-//         office1 = createPostOffice(1, "testgroup", sc, ms, pm, tr);
-//         office2 = createPostOffice(2, "testgroup", sc, ms, pm, tr);
-//         office3 = createPostOffice(3, "testgroup", sc, ms, pm, tr);
-//         office4 = createPostOffice(4, "testgroup", sc, ms, pm, tr);
-//         office5 = createPostOffice(5, "testgroup", sc, ms, pm, tr);
-//         office6 = createPostOffice(6, "testgroup", sc, ms, pm, tr);
-//    
-//         // We deploy the queue on nodes 1, 2, 3, 4 and 5
-//         // We don't deploy on node 6
-//         
-//         LocalClusteredQueue queue1 =
-//            new LocalClusteredQueue(1, "queue1", channelIDManager.getID(), ms, pm, true,
-//                                    recoverable, -1, null);
-//
-//         office1.bindQueue(new SimpleCondition("queue1"), queue1);
-//
-//         SimpleReceiver receiver1 = new SimpleReceiver("blah", SimpleReceiver.ACCEPTING);
-//         queue1.add(receiver1);
-//         
-//         LocalClusteredQueue queue2 =
-//            new LocalClusteredQueue(2, "queue1", channelIDManager.getID(), ms, pm, true,
-//                                    recoverable, -1, null);
-//
-//         office2.bindQueue(new SimpleCondition("queue1"), queue2);
-//
-//         SimpleReceiver receiver2 = new SimpleReceiver("blah", SimpleReceiver.ACCEPTING);
-//         queue2.add(receiver2);
-//         
-//         LocalClusteredQueue queue3 =
-//            new LocalClusteredQueue(3, "queue1", channelIDManager.getID(), ms, pm, true,
-//                                    recoverable, -1, null);
-//
-//         office3.bindQueue(new SimpleCondition("queue1"), queue3);
-//
-//         SimpleReceiver receiver3 = new SimpleReceiver("blah", SimpleReceiver.ACCEPTING);
-//         queue3.add(receiver3);
-//         
-//         LocalClusteredQueue queue4 =
-//            new LocalClusteredQueue(4, "queue1", channelIDManager.getID(), ms, pm,
-//                                    true, recoverable, -1, null);
-//
-//         office4.bindQueue(new SimpleCondition("queue1"), queue4);
-//
-//         SimpleReceiver receiver4 = new SimpleReceiver("blah", SimpleReceiver.ACCEPTING);
-//         queue4.add(receiver4);
-//         
-//         LocalClusteredQueue queue5 =
-//            new LocalClusteredQueue(5, "queue1", channelIDManager.getID(), ms, pm, true,
-//                                    recoverable, -1, null);
-//
-//         office5.bindQueue(new SimpleCondition("queue1"), queue5);
-//         SimpleReceiver receiver5 = new SimpleReceiver("blah", SimpleReceiver.ACCEPTING);
-//         queue5.add(receiver5);
-//        
-//         // We are using a AlwaysLocalRoutingPolicy so only the local queue should ever get the
-//         // message if the filter matches
-//                          
-//         Message msg = CoreMessageFactory.createCoreMessage(1, persistentMessage, null);      
-//         MessageReference ref = ms.reference(msg);         
-//         boolean routed = office1.route(ref, new SimpleCondition("queue1"), null);         
-//         assertTrue(routed);
-//         checkContainsAndAcknowledge(msg, receiver1, queue1);
-//         this.checkEmpty(receiver2);
-//         this.checkEmpty(receiver3);
-//         this.checkEmpty(receiver4);
-//         this.checkEmpty(receiver5);
-//         
-//         msg = CoreMessageFactory.createCoreMessage(1, persistentMessage, null);      
-//         ref = ms.reference(msg);         
-//         routed = office2.route(ref, new SimpleCondition("queue1"), null);         
-//         assertTrue(routed);
-//         this.checkEmpty(receiver1);
-//         checkContainsAndAcknowledge(msg, receiver2, queue2);
-//         this.checkEmpty(receiver3);
-//         this.checkEmpty(receiver4);
-//         this.checkEmpty(receiver5);
-//         
-//         msg = CoreMessageFactory.createCoreMessage(1, persistentMessage, null);      
-//         ref = ms.reference(msg);         
-//         routed = office3.route(ref, new SimpleCondition("queue1"), null);         
-//         assertTrue(routed);
-//         this.checkEmpty(receiver1);
-//         this.checkEmpty(receiver2);
-//         checkContainsAndAcknowledge(msg, receiver3, queue3);
-//         this.checkEmpty(receiver4);
-//         this.checkEmpty(receiver5);
-//         
-//         msg = CoreMessageFactory.createCoreMessage(1, persistentMessage, null);      
-//         ref = ms.reference(msg);         
-//         routed = office4.route(ref, new SimpleCondition("queue1"), null);         
-//         assertTrue(routed);
-//         this.checkEmpty(receiver1);
-//         this.checkEmpty(receiver2);
-//         this.checkEmpty(receiver3);
-//         checkContainsAndAcknowledge(msg, receiver4, queue3);
-//         this.checkEmpty(receiver5);
-//         
-//         msg = CoreMessageFactory.createCoreMessage(1, persistentMessage, null);      
-//         ref = ms.reference(msg);         
-//         routed = office5.route(ref, new SimpleCondition("queue1"), null);         
-//         assertTrue(routed);
-//         this.checkEmpty(receiver1);
-//         this.checkEmpty(receiver2);         
-//         this.checkEmpty(receiver3);
-//         this.checkEmpty(receiver4);
-//         checkContainsAndAcknowledge(msg, receiver5, queue5);
-//         
-//         msg = CoreMessageFactory.createCoreMessage(1, persistentMessage, null);      
-//         ref = ms.reference(msg);         
-//         routed = office6.route(ref, new SimpleCondition("queue1"), null);         
-//         assertTrue(routed);
-//         
-//         //The actual queue that receives the mesage is determined by the routing policy
-//         //The default uses round robin for the nodes (this is tested more thoroughly in
-//         //its own test)
-//         
-//         Thread.sleep(1000);
-//         
-//         checkContainsAndAcknowledge(msg, receiver1, queue1);
-//         this.checkEmpty(receiver1);
-//         this.checkEmpty(receiver2);         
-//         this.checkEmpty(receiver3);
-//         this.checkEmpty(receiver4);
-//         this.checkEmpty(receiver5);
-//                 
-//      }
-//      finally
-//      {
-//         if (office1 != null)
-//         {            
-//            office1.stop();
-//         }
-//         
-//         if (office2 != null)
-//         {
-//            office2.stop();
-//         }
-//         
-//         if (office3 != null)
-//         {            
-//            office3.stop();
-//         }
-//         
-//         if (office4 != null)
-//         {
-//            office4.stop();
-//         }
-//         
-//         if (office5 != null)
-//         {            
-//            office5.stop();
-//         }
-//         
-//         if (office6 != null)
-//         {            
-//            office6.stop();
-//         }
-//         
-//         if (checkNoMessageData())
-//         {
-//            fail("Message data still in database");
-//         }
-//      }
-//   }
-//   
-//
-//   /**
-//    * Clustered post offices should be able to have local queues bound to them too.
-//    */
-//   protected void routeLocalQueues(boolean persistentMessage, boolean recoverable) throws Throwable
-//   {
-//      PostOffice office1 = null;
-//      PostOffice office2 = null;
-//      PostOffice office3 = null;
-//                    
-//      try
-//      {   
-//         office1 = createPostOffice(1, "testgroup", sc, ms, pm, tr);
-//         office2 = createPostOffice(2, "testgroup", sc, ms, pm, tr);
-//         office3 = createPostOffice(3, "testgroup", sc, ms, pm, tr);
-//
-//         LocalClusteredQueue sub1 =
-//            new LocalClusteredQueue(1, "sub1", channelIDManager.getID(), ms, pm, true,
-//                                    recoverable, -1, null);
-//
-//         office1.bindQueue(new SimpleCondition("topic"), sub1);
-//
-//         SimpleReceiver receiver1 = new SimpleReceiver("blah", SimpleReceiver.ACCEPTING);
-//         sub1.add(receiver1);
-//         
-//         LocalClusteredQueue sub2 =
-//            new LocalClusteredQueue(2, "sub2", channelIDManager.getID(), ms, pm, true,
-//                                    recoverable, -1, null);
-//
-//         office2.bindQueue(new SimpleCondition("topic"), sub2);
-//         SimpleReceiver receiver2 = new SimpleReceiver("blah", SimpleReceiver.ACCEPTING);
-//         sub2.add(receiver2);
-//         
-//         LocalClusteredQueue sub3 =
-//            new LocalClusteredQueue(3, "sub3", channelIDManager.getID(), ms, pm, true,
-//                                    recoverable, -1, null);
-//
-//         office3.bindQueue(new SimpleCondition("topic"), sub3);
-//         SimpleReceiver receiver3 = new SimpleReceiver("blah", SimpleReceiver.ACCEPTING);
-//         sub3.add(receiver3);
-//         
-//         //Only the local sub should get it since we have bound locally
-//         
-//         Message msg = CoreMessageFactory.createCoreMessage(1, persistentMessage, null);      
-//         MessageReference ref = ms.reference(msg);         
-//         boolean routed = office1.route(ref, new SimpleCondition("topic"), null);         
-//         assertTrue(routed);         
-//         Thread.sleep(500);         
-//         checkContainsAndAcknowledge(msg, receiver1, sub1);
-//         this.checkEmpty(receiver2);
-//         this.checkEmpty(receiver3);
-//         
-//         msg = CoreMessageFactory.createCoreMessage(2, persistentMessage, null);      
-//         ref = ms.reference(msg);         
-//         routed = office2.route(ref, new SimpleCondition("topic"), null);         
-//         assertTrue(routed);                  
-//         Thread.sleep(500);
-//         this.checkEmpty(receiver1);
-//         checkContainsAndAcknowledge(msg, receiver2, sub2);
-//         this.checkEmpty(receiver3);
-//         
-//         msg = CoreMessageFactory.createCoreMessage(3, persistentMessage, null);      
-//         ref = ms.reference(msg);         
-//         routed = office3.route(ref, new SimpleCondition("topic"), null);           
-//         assertTrue(routed);         
-//         Thread.sleep(500);
-//         this.checkEmpty(receiver1);         
-//         this.checkEmpty(receiver2);
-//         checkContainsAndAcknowledge(msg, receiver3, sub2);           
-//                         
-//         if (checkNoMessageData())
-//         {
-//            fail("Message data still in database");
-//         }
-//         
-//      }
-//      finally
-//      {
-//         if (office1 != null)
-//         {            
-//            office1.stop();
-//         }
-//         
-//         if (office2 != null)
-//         {
-//            office2.stop();
-//         }
-//         
-//         if (office3 != null)
-//         {            
-//            office3.stop();
-//         }
-//         
-//      }
-//   }
-//   
-//   
-//   
-//   /**
-//    * We set up a complex scenario with multiple subscriptions, shared and unshared on different
-//    * nodes.
-//    * 
-//    * node1: no subscriptions
-//    * node2: 2 non durable
-//    * node3: 1 non shared durable, 1 non durable
-//    * node4: 1 shared durable (shared1), 1 non shared durable, 3 non durable
-//    * node5: 2 shared durable (shared1 and shared2)
-//    * node6: 1 shared durable (shared2), 1 non durable
-//    * node7: 1 shared durable (shared2)
-//    * 
-//    * Then we send mess
-//    */
-//   protected void routeComplexTopic(boolean persistent) throws Throwable
-//   {
-//      PostOffice office1 = null;
-//      PostOffice office2 = null;
-//      PostOffice office3 = null;
-//      PostOffice office4 = null;
-//      PostOffice office5 = null;
-//      PostOffice office6 = null;
-//      PostOffice office7 = null;
-//        
-//      try
-//      {   
-//         office1 = createPostOffice(1, "testgroup", sc, ms, pm, tr);
-//         office2 = createPostOffice(2, "testgroup", sc, ms, pm, tr);
-//         office3 = createPostOffice(3, "testgroup", sc, ms, pm, tr);
-//         office4 = createPostOffice(4, "testgroup", sc, ms, pm, tr);
-//         office5 = createPostOffice(5, "testgroup", sc, ms, pm, tr);
-//         office6 = createPostOffice(6, "testgroup", sc, ms, pm, tr);
-//         office7 = createPostOffice(7, "testgroup", sc, ms, pm, tr);
-//         
-//         //Node 2
-//         //======
-//         
-//         // Non durable 1 on node 2
-//         LocalClusteredQueue nonDurable1 =
-//            new LocalClusteredQueue(2, "nondurable1", channelIDManager.getID(), ms, pm,
-//                                    true, false, -1, null);
-//
-//         office2.bindQueue(new SimpleCondition("topic"), nonDurable1);
-//         SimpleReceiver receiver1 = new SimpleReceiver("blah", SimpleReceiver.ACCEPTING);
-//         nonDurable1.add(receiver1);
-//         
-//         // Non durable 2 on node 2
-//         LocalClusteredQueue nonDurable2 =
-//            new LocalClusteredQueue(2, "nondurable2", channelIDManager.getID(), ms, pm,
-//                                    true, false, -1, null);
-//
-//         office2.bindQueue(new SimpleCondition("topic"), nonDurable2);
-//         SimpleReceiver receiver2 = new SimpleReceiver("blah", SimpleReceiver.ACCEPTING);
-//         nonDurable2.add(receiver2);
-//         
-//         //Node 3
-//         //======
-//         
-//         // Non shared durable
-//         LocalClusteredQueue nonSharedDurable1 =
-//            new LocalClusteredQueue(3, "nonshareddurable1", channelIDManager.getID(), ms,
-//                                    pm, true, true, -1, null);
-//
-//         office3.bindQueue(new SimpleCondition("topic"), nonSharedDurable1);
-//         SimpleReceiver receiver3 = new SimpleReceiver("blah", SimpleReceiver.ACCEPTING);
-//         nonSharedDurable1.add(receiver3);
-//         
-//         // Non durable
-//         LocalClusteredQueue nonDurable3 =
-//            new LocalClusteredQueue(3, "nondurable3", channelIDManager.getID(), ms, pm,
-//                                    true, false, -1, null);
-//
-//         office3.bindQueue(new SimpleCondition("topic"), nonDurable3);
-//         SimpleReceiver receiver4 = new SimpleReceiver("blah", SimpleReceiver.ACCEPTING);
-//         nonDurable3.add(receiver4);
-//         
-//         //Node 4
-//         //======
-//         
-//         // Shared durable
-//         LocalClusteredQueue sharedDurable1 =
-//            new LocalClusteredQueue(4, "shareddurable1", channelIDManager.getID(), ms, pm,
-//                                    true, true, -1, null);
-//
-//         office4.bindQueue(new SimpleCondition("topic"), sharedDurable1);
-//         SimpleReceiver receiver5 = new SimpleReceiver("blah", SimpleReceiver.ACCEPTING);
-//         sharedDurable1.add(receiver5);
-//         
-//         // Non shared durable
-//         LocalClusteredQueue nonSharedDurable2 =
-//            new LocalClusteredQueue(4, "nonshareddurable2", channelIDManager.getID(), ms,
-//                                    pm, true, true, -1, null);
-//
-//         office4.bindQueue(new SimpleCondition("topic"), nonSharedDurable2);
-//         SimpleReceiver receiver6 = new SimpleReceiver("blah", SimpleReceiver.ACCEPTING);
-//         nonSharedDurable2.add(receiver6);
-//         
-//         // Non durable
-//         LocalClusteredQueue nonDurable4 =
-//            new LocalClusteredQueue(4, "nondurable4", channelIDManager.getID(), ms, pm,
-//                                    true, false, -1, null);
-//
-//         office4.bindQueue(new SimpleCondition("topic"), nonDurable4);
-//         SimpleReceiver receiver7 = new SimpleReceiver("blah", SimpleReceiver.ACCEPTING);
-//         nonDurable4.add(receiver7);
-//         
-//         // Non durable
-//         LocalClusteredQueue nonDurable5 =
-//            new LocalClusteredQueue(4, "nondurable5", channelIDManager.getID(), ms, pm,
-//                                    true, false, -1, null);
-//         office4.bindQueue(new SimpleCondition("topic"), nonDurable5);
-//         SimpleReceiver receiver8 = new SimpleReceiver("blah", SimpleReceiver.ACCEPTING);
-//         nonDurable5.add(receiver8);
-//         
-//         // Non durable
-//         LocalClusteredQueue nonDurable6 =
-//            new LocalClusteredQueue(4, "nondurable6", channelIDManager.getID(), ms, pm,
-//                                    true, false, -1, null);
-//         office4.bindQueue(new SimpleCondition("topic"), nonDurable6);
-//         SimpleReceiver receiver9 = new SimpleReceiver("blah", SimpleReceiver.ACCEPTING);
-//         nonDurable6.add(receiver9);
-//         
-//         // Node 5
-//         //=======
-//         // Shared durable
-//         LocalClusteredQueue sharedDurable2 =
-//            new LocalClusteredQueue(5, "shareddurable1", channelIDManager.getID(), ms, pm,
-//                                    true, true, -1, null);
-//
-//         office5.bindQueue(new SimpleCondition("topic"), sharedDurable2);
-//         SimpleReceiver receiver10 = new SimpleReceiver("blah", SimpleReceiver.ACCEPTING);
-//         sharedDurable2.add(receiver10);
-//         
-//         // Shared durable
-//         LocalClusteredQueue sharedDurable3 =
-//            new LocalClusteredQueue(5, "shareddurable2", channelIDManager.getID(), ms, pm,
-//                                    true, true, -1, null);
-//
-//         office5.bindQueue(new SimpleCondition("topic"), sharedDurable3);
-//         SimpleReceiver receiver11 = new SimpleReceiver("blah", SimpleReceiver.ACCEPTING);
-//         sharedDurable3.add(receiver11);
-//         
-//         // Node 6
-//         //=========
-//         LocalClusteredQueue sharedDurable4 =
-//            new LocalClusteredQueue(6, "shareddurable2", channelIDManager.getID(), ms, pm,
-//                                    true, true, -1, null);
-//
-//         office6.bindQueue(new SimpleCondition("topic"), sharedDurable4);
-//         SimpleReceiver receiver12 = new SimpleReceiver("blah", SimpleReceiver.ACCEPTING);
-//         sharedDurable4.add(receiver12);
-//         
-//         LocalClusteredQueue nonDurable7 =
-//            new LocalClusteredQueue(6, "nondurable7", channelIDManager.getID(), ms, pm,
-//                                    true, false, -1, null);
-//         office6.bindQueue(new SimpleCondition("topic"), nonDurable7);
-//         SimpleReceiver receiver13 = new SimpleReceiver("blah", SimpleReceiver.ACCEPTING);
-//         nonDurable7.add(receiver13);
-//         
-//         //Node 7
-//         //=======
-//         LocalClusteredQueue sharedDurable5 =
-//            new LocalClusteredQueue(7, "shareddurable2", channelIDManager.getID(), ms, pm,
-//                                    true, true, -1, null);
-//
-//         office7.bindQueue(new SimpleCondition("topic"), sharedDurable5);
-//         SimpleReceiver receiver14 = new SimpleReceiver("blah", SimpleReceiver.ACCEPTING);
-//         sharedDurable5.add(receiver14);
-//         
-//         
-//         //Send 1 message at node1
-//         //========================
-//         
-//         List msgs = sendMessages("topic", persistent, null, 1, null);
-//         
-//         //n2
-//         checkContainsAndAcknowledge(msgs, receiver1, nonDurable1);
-//         checkContainsAndAcknowledge(msgs, receiver2, nonDurable2);
-//         
-//         //n3
-//         checkContainsAndAcknowledge(msgs, receiver3, nonSharedDurable1);
-//         checkContainsAndAcknowledge(msgs, receiver4, nonDurable3);
-//         
-//         //n4
-//         checkContainsAndAcknowledge(msgs, receiver5, sharedDurable1);
-//         checkContainsAndAcknowledge(msgs, receiver6, nonSharedDurable2);
-//         checkContainsAndAcknowledge(msgs, receiver7, nonDurable4);
-//         checkContainsAndAcknowledge(msgs, receiver8, nonDurable5);
-//         checkContainsAndAcknowledge(msgs, receiver9, nonDurable6);
-//         
-//         //n5
-//         checkEmpty(receiver10);
-//         checkContainsAndAcknowledge(msgs, receiver11, sharedDurable3);
-//         
-//         //n6
-//         checkEmpty(receiver12);
-//         checkContainsAndAcknowledge(msgs, receiver13, nonDurable7);
-//         
-//         //n7
-//         checkEmpty(receiver14);
-//         
-//         
-//         //Send 1 message at node2
-//         //========================
-//         
-//         msgs = sendMessages("topic", persistent, office2, 1, null);
-//         
-//         //n2
-//         checkContainsAndAcknowledge(msgs, receiver1, nonDurable1);
-//         checkContainsAndAcknowledge(msgs, receiver2, nonDurable2);
-//         
-//         //n3
-//         checkContainsAndAcknowledge(msgs, receiver3, nonSharedDurable1);
-//         checkContainsAndAcknowledge(msgs, receiver4, nonDurable3);
-//         
-//         //n4
-//         checkContainsAndAcknowledge(msgs, receiver5, sharedDurable1);
-//         checkContainsAndAcknowledge(msgs, receiver6, nonSharedDurable2);
-//         checkContainsAndAcknowledge(msgs, receiver7, nonDurable4);
-//         checkContainsAndAcknowledge(msgs, receiver8, nonDurable5);
-//         checkContainsAndAcknowledge(msgs, receiver9, nonDurable6);
-//         
-//         //n5
-//         checkEmpty(receiver10);
-//         checkContainsAndAcknowledge(msgs, receiver11, sharedDurable3);
-//         
-//         //n6
-//         checkEmpty(receiver12);
-//         checkContainsAndAcknowledge(msgs, receiver13, nonDurable7);
-//         
-//         //n7
-//         checkEmpty(receiver14);
-//         
-//         //Send 1 message at node3
-//         //========================
-//         
-//         msgs = sendMessages("topic", persistent, office3, 1, null);
-//         
-//         //n2
-//         checkContainsAndAcknowledge(msgs, receiver1, nonDurable1);
-//         checkContainsAndAcknowledge(msgs, receiver2, nonDurable2);
-//         
-//         //n3
-//         checkContainsAndAcknowledge(msgs, receiver3, nonSharedDurable1);
-//         checkContainsAndAcknowledge(msgs, receiver4, nonDurable3);
-//         
-//         //n4
-//         checkContainsAndAcknowledge(msgs, receiver5, sharedDurable1);
-//         checkContainsAndAcknowledge(msgs, receiver6, nonSharedDurable2);
-//         checkContainsAndAcknowledge(msgs, receiver7, nonDurable4);
-//         checkContainsAndAcknowledge(msgs, receiver8, nonDurable5);
-//         checkContainsAndAcknowledge(msgs, receiver9, nonDurable6);
-//         
-//         //n5
-//         checkEmpty(receiver10);
-//         checkContainsAndAcknowledge(msgs, receiver11, sharedDurable3);
-//         
-//         //n6
-//         checkEmpty(receiver12);
-//         checkContainsAndAcknowledge(msgs, receiver13, nonDurable7);
-//         
-//         //n7
-//         checkEmpty(receiver14);     
-//         
-//         //Send 1 message at node4
-//         //========================
-//         
-//         msgs = sendMessages("topic", persistent, office4, 1, null);         
-//               
-//         //n2
-//         checkContainsAndAcknowledge(msgs, receiver1, nonDurable1);
-//         checkContainsAndAcknowledge(msgs, receiver2, nonDurable2);
-//         
-//         //n3
-//         checkContainsAndAcknowledge(msgs, receiver3, nonSharedDurable1);
-//         checkContainsAndAcknowledge(msgs, receiver4, nonDurable3);
-//         
-//         //n4
-//         checkContainsAndAcknowledge(msgs, receiver5, sharedDurable1); // shared durable 1
-//         checkContainsAndAcknowledge(msgs, receiver6, nonSharedDurable2);
-//         checkContainsAndAcknowledge(msgs, receiver7, nonDurable4);
-//         checkContainsAndAcknowledge(msgs, receiver8, nonDurable5);
-//         checkContainsAndAcknowledge(msgs, receiver9, nonDurable6);
-//         
-//         //n5
-//         checkEmpty(receiver10);       //shared durable 1
-//         checkContainsAndAcknowledge(msgs, receiver11, sharedDurable3);     //shared durable 2    
-//         
-//         //n6
-//         checkEmpty(receiver12); // shared durable 2
-//         checkContainsAndAcknowledge(msgs, receiver13, nonDurable7); 
-//         
-//         //n7
-//         checkEmpty(receiver14);
-//         
-//         //Send 1 message at node5
-//         //========================
-//         
-//         msgs = sendMessages("topic", persistent, office5, 1, null);
-//             
-//         //n2
-//         checkContainsAndAcknowledge(msgs, receiver1, nonDurable1);
-//         checkContainsAndAcknowledge(msgs, receiver2, nonDurable2);
-//         
-//         //n3
-//         checkContainsAndAcknowledge(msgs, receiver3, nonSharedDurable1);
-//         checkContainsAndAcknowledge(msgs, receiver4, nonDurable3);
-//         
-//         //n4
-//         checkEmpty(receiver5);
-//         checkContainsAndAcknowledge(msgs, receiver6, nonSharedDurable2);
-//         checkContainsAndAcknowledge(msgs, receiver7, nonDurable4);
-//         checkContainsAndAcknowledge(msgs, receiver8, nonDurable5);
-//         checkContainsAndAcknowledge(msgs, receiver9, nonDurable6);
-//         
-//         //n5
-//         checkContainsAndAcknowledge(msgs, receiver10, sharedDurable2);
-//         checkContainsAndAcknowledge(msgs, receiver11, sharedDurable3);
-//         
-//         //n6
-//         checkEmpty(receiver12);
-//         checkContainsAndAcknowledge(msgs, receiver13, nonDurable7);
-//         
-//         //n7
-//         checkEmpty(receiver12);
-//         
-//         //Send 1 message at node6
-//         //========================
-//         
-//         msgs = sendMessages("topic", persistent, office6, 1, null);
-//             
-//         //n2
-//         checkContainsAndAcknowledge(msgs, receiver1, nonDurable1);
-//         checkContainsAndAcknowledge(msgs, receiver2, nonDurable2);
-//         
-//         //n3
-//         checkContainsAndAcknowledge(msgs, receiver3, nonSharedDurable1);
-//         checkContainsAndAcknowledge(msgs, receiver4, nonDurable3);
-//         
-//         //n4
-//         checkContainsAndAcknowledge(msgs, receiver5, sharedDurable1);
-//         checkContainsAndAcknowledge(msgs, receiver6, nonSharedDurable2);
-//         checkContainsAndAcknowledge(msgs, receiver7, nonDurable4);
-//         checkContainsAndAcknowledge(msgs, receiver8, nonDurable5);
-//         checkContainsAndAcknowledge(msgs, receiver9, nonDurable6);
-//         
-//         //n5
-//         checkEmpty(receiver10);
-//        
-//         checkEmpty(receiver11);
-//         
-//         //n6
-//         checkContainsAndAcknowledge(msgs, receiver12, sharedDurable4);         
-//         checkContainsAndAcknowledge(msgs, receiver13, nonDurable7);
-//         
-//         //n7
-//         checkEmpty(receiver12);
-//         
-//         //Send 1 message at node7
-//         //========================
-//         
-//         msgs = sendMessages("topic", persistent, office7, 1, null);
-//
-//         //n2
-//         checkContainsAndAcknowledge(msgs, receiver1, nonDurable1);
-//         checkContainsAndAcknowledge(msgs, receiver2, nonDurable2);
-//         
-//         //n3
-//         checkContainsAndAcknowledge(msgs, receiver3, nonSharedDurable1);
-//         checkContainsAndAcknowledge(msgs, receiver4, nonDurable3);
-//         
-//         //n4
-//         checkContainsAndAcknowledge(msgs, receiver5, sharedDurable1);
-//         checkContainsAndAcknowledge(msgs, receiver6, nonSharedDurable2);
-//         checkContainsAndAcknowledge(msgs, receiver7, nonDurable4);
-//         checkContainsAndAcknowledge(msgs, receiver8, nonDurable5);
-//         checkContainsAndAcknowledge(msgs, receiver9, nonDurable6);
-//         
-//         //n5
-//         checkEmpty(receiver10);
-//         checkEmpty(receiver11);
-//         
-//         //n6
-//         checkEmpty(receiver12);
-//         checkContainsAndAcknowledge(msgs, receiver13, nonDurable7);
-//         
-//         //n7
-//         checkContainsAndAcknowledge(msgs, receiver14, sharedDurable5);
-//         
-//         if (checkNoMessageData())
-//         {
-//            fail("Message data still in database");
-//         }        
-//      }
-//      finally
-//      {
-//         if (office1 != null)
-//         {            
-//            office1.stop();
-//         }
-//         
-//         if (office2 != null)
-//         {
-//            office2.stop();
-//         }
-//         
-//         if (office3 != null)
-//         {            
-//            try
-//            {
-//               office3.unbindQueue("nonshareddurable1");
-//            }
-//            catch (Exception ignore)
-//            {   
-//               ignore.printStackTrace();
-//            }
-//            office3.stop();
-//         }
-//         
-//         if (office4 != null)
-//         {
-//            try
-//            {
-//               office4.unbindQueue("shareddurable1");
-//               office4.unbindQueue("nonshareddurable2");
-//            }
-//            catch (Exception ignore)
-//            {           
-//               ignore.printStackTrace();
-//            }
-//            office4.stop();
-//         }
-//         
-//         if (office5 != null)
-//         {      
-//            try
-//            {
-//               office5.unbindQueue("shareddurable1");
-//               office5.unbindQueue("shareddurable2");
-//            }
-//            catch (Exception ignore)
-//            {               
-//               ignore.printStackTrace();
-//            }
-//            office5.stop();
-//         }
-//         
-//         if (office6 != null)
-//         {         
-//            try
-//            {
-//               office6.unbindQueue("shareddurable2");
-//            }
-//            catch (Exception ignore)
-//            {               
-//               ignore.printStackTrace();
-//            }
-//            office6.stop();
-//         }
-//         
-//         if (office7 != null)
-//         {      
-//            try
-//            {
-//               office7.unbindQueue("shareddurable2");
-//            }
-//            catch (Exception ignore)
-//            {               
-//               ignore.printStackTrace();
-//            }
-//            office7.stop();
-//         }
-//        
-//      }
-//   }
-//   
-//   
-//
-//   
-//   
-//   protected void clusteredTransactionalRoute(boolean persistent) throws Throwable
-//   {
-//      PostOffice office1 = null;
-//      
-//      PostOffice office2 = null;
-//      
-//      try
-//      {   
-//         //Start two offices
-//         
-//         office1 = createPostOffice(1, "testgroup", sc, ms, pm, tr);
-//         office2 = createPostOffice(2, "testgroup", sc, ms, pm, tr);
-//     
-//         LocalClusteredQueue[] queues = new LocalClusteredQueue[16];
-//         Binding[] bindings = new Binding[16];
-//         
-//         queues[0] =
-//            new LocalClusteredQueue(1, "sub1", channelIDManager.getID(), ms, pm,
-//                                    true, false, -1, null);
-//         bindings[0] = office1.bindQueue(new SimpleCondition("topic1"), queues[0]);
-//         
-//         queues[1] =
-//            new LocalClusteredQueue(1, "sub2", channelIDManager.getID(), ms, pm,
-//                                    true, false, -1, null);
-//         bindings[1] = office1.bindQueue(new SimpleCondition("topic1"), queues[1]);
-//         
-//         queues[2] =
-//            new LocalClusteredQueue(2, "sub3", channelIDManager.getID(), ms, pm,
-//                                    true, false, -1, null);
-//         bindings[2] = office2.bindQueue(new SimpleCondition("topic1"), queues[2]);
-//         
-//         queues[3] =
-//            new LocalClusteredQueue(2, "sub4", channelIDManager.getID(), ms, pm,
-//                                    true, false, -1, null);
-//         bindings[3] = office2.bindQueue(new SimpleCondition("topic1"), queues[3]);
-//         
-//         queues[4] =
-//            new LocalClusteredQueue(2, "sub5", channelIDManager.getID(), ms, pm,
-//                                    true, true, -1, null);
-//         bindings[4] = office2.bindQueue(new SimpleCondition("topic1"), queues[4]);
-//         
-//         queues[5] =
-//            new LocalClusteredQueue(1, "sub6", channelIDManager.getID(), ms, pm,
-//                                    true, false, -1, null);
-//         bindings[5] = office1.bindQueue(new SimpleCondition("topic1"), queues[5]);
-//         
-//         queues[6] =
-//            new LocalClusteredQueue(1, "sub7", channelIDManager.getID(), ms, pm,
-//                                    true, true, -1, null);
-//         bindings[6] = office1.bindQueue(new SimpleCondition("topic1"), queues[6]);
-//         
-//         queues[7] =
-//            new LocalClusteredQueue(1, "sub8", channelIDManager.getID(), ms, pm,
-//                                    true, true, -1, null);
-//         bindings[7] = office1.bindQueue(new SimpleCondition("topic1"), queues[7]);
-//         
-//         queues[8] =
-//            new LocalClusteredQueue(1, "sub9", channelIDManager.getID(), ms, pm,
-//                                    true, false, -1, null);
-//         bindings[8] = office1.bindQueue(new SimpleCondition("topic2"), queues[8]);
-//         
-//         queues[9] =
-//            new LocalClusteredQueue(1, "sub10", channelIDManager.getID(), ms, pm,
-//                                    true, false, -1, null);
-//         bindings[9] = office1.bindQueue(new SimpleCondition("topic2"), queues[9]);
-//         
-//         queues[10] =
-//            new LocalClusteredQueue(2, "sub11", channelIDManager.getID(), ms, pm,
-//                                    true, false, -1, null);
-//         bindings[10] = office2.bindQueue(new SimpleCondition("topic2"), queues[10]);
-//         
-//         queues[11] =
-//            new LocalClusteredQueue(2, "sub12", channelIDManager.getID(), ms, pm,
-//                                    true, false, -1, null);
-//         bindings[11] = office2.bindQueue(new SimpleCondition("topic2"), queues[11]);
-//         
-//         queues[12] =
-//            new LocalClusteredQueue(2, "sub13", channelIDManager.getID(), ms, pm,
-//                                    true, true, -1, null);
-//         bindings[12] = office2.bindQueue(new SimpleCondition("topic2"), queues[12]);
-//         
-//         queues[13] =
-//            new LocalClusteredQueue(1, "sub14", channelIDManager.getID(), ms, pm,
-//                                    true, false, -1, null);
-//         bindings[13] = office1.bindQueue(new SimpleCondition("topic2"), queues[13]);
-//         
-//         queues[14] =
-//            new LocalClusteredQueue(1, "sub15", channelIDManager.getID(), ms, pm,
-//                                    true, true, -1, null);
-//         bindings[14] = office1.bindQueue(new SimpleCondition("topic2"), queues[14]);
-//         
-//         queues[15] =
-//            new LocalClusteredQueue(1, "sub16", channelIDManager.getID(), ms, pm,
-//                                    true, true, -1, null);
-//         bindings[15] = office1.bindQueue(new SimpleCondition("topic2"), queues[15]);
-//
-//         SimpleReceiver[] receivers = new SimpleReceiver[16];
-//         
-//         for (int i = 0; i < 16; i++)
-//         {
-//            receivers[i] = new SimpleReceiver("blah", SimpleReceiver.ACCEPTING);
-//            queues[i].add(receivers[i]);
-//         }
-//         
-//         //First for topic 1
-//         
-//         Message msg1 = CoreMessageFactory.createCoreMessage(1, persistent, null);;      
-//         MessageReference ref1 = ms.reference(msg1);
-//         
-//         Message msg2 = CoreMessageFactory.createCoreMessage(2, persistent, null);;      
-//         MessageReference ref2 = ms.reference(msg2);
-//         
-//         Transaction tx = tr.createTransaction();
-//
-//         boolean routed = office1.route(ref1, new SimpleCondition("topic1"), tx);         
-//         assertTrue(routed);
-//         routed = office1.route(ref2, new SimpleCondition("topic1"), tx);         
-//         assertTrue(routed);
-//
-//         
-//         for (int i = 0; i < 16; i++)
-//         {
-//            List msgs = receivers[i].getMessages();
-//            assertNotNull(msgs);
-//            assertTrue(msgs.isEmpty());
-//            msgs = queues[i].browse();
-//            assertNotNull(msgs);
-//            assertTrue(msgs.isEmpty());
-//         }
-//         
-//         tx.commit();
-//         
-//         //Messages are sent asych so may take some finite time to arrive
-//         Thread.sleep(1000);
-//         
-//         for (int i = 0; i < 8; i++)
-//         {         
-//            List msgs = receivers[i].getMessages();
-//            assertNotNull(msgs);
-//            assertEquals(2, msgs.size());
-//            Message msgRec1 = (Message)msgs.get(0);
-//            assertEquals(msg1.getMessageID(), msgRec1.getMessageID());
-//            Message msgRec2 = (Message)msgs.get(1);
-//            assertEquals(msg2.getMessageID(), msgRec2.getMessageID());            
-//            receivers[i].acknowledge(msgRec1, null);
-//            receivers[i].acknowledge(msgRec2, null);
-//            msgs = queues[i].browse();
-//            assertNotNull(msgs);            
-//            assertTrue(msgs.isEmpty());                        
-//            receivers[i].clear();
-//         }
-//         
-//         for (int i = 8; i < 16; i++)
-//         {
-//            List msgs = receivers[i].getMessages();
-//            assertNotNull(msgs);
-//            assertTrue(msgs.isEmpty());
-//            msgs = queues[i].browse();
-//            assertNotNull(msgs);
-//            assertTrue(msgs.isEmpty());
-//         }
-//         
-//         msg1 = CoreMessageFactory.createCoreMessage(1, persistent, null);;      
-//         ref1 = ms.reference(msg1);
-//         
-//         msg2 = CoreMessageFactory.createCoreMessage(2, persistent, null);;      
-//         ref2 = ms.reference(msg2);
-//         
-//         tx = tr.createTransaction();
-//
-//         routed = office1.route(ref1, new SimpleCondition("topic1"), tx);         
-//         assertTrue(routed);
-//         routed = office1.route(ref2, new SimpleCondition("topic1"), tx);         
-//         assertTrue(routed);
-//         
-//         //Messages are sent asych so may take some finite time to arrive
-//         Thread.sleep(1000);         
-//         
-//         for (int i = 0; i < 16; i++)
-//         {
-//            List msgs = receivers[i].getMessages();
-//            assertNotNull(msgs);
-//            assertTrue(msgs.isEmpty());
-//            msgs = queues[i].browse();
-//            assertNotNull(msgs);
-//            assertTrue(msgs.isEmpty());
-//         }
-//         
-//         tx.rollback();
-//         
-//         for (int i = 0; i < 16; i++)
-//         {
-//            List msgs = receivers[i].getMessages();
-//            assertNotNull(msgs);
-//            assertTrue(msgs.isEmpty());
-//            msgs = queues[i].browse();
-//            assertNotNull(msgs);
-//            assertTrue(msgs.isEmpty());
-//         }
-//         
-//         //Now send some non transactionally
-//         
-//         msg1 = CoreMessageFactory.createCoreMessage(1, persistent, null);
-//         ref1 = ms.reference(msg1);
-//         
-//         msg2 = CoreMessageFactory.createCoreMessage(2, persistent, null);
-//         ref2 = ms.reference(msg2);
-//         
-//         routed = office1.route(ref1, new SimpleCondition("topic1"), null);         
-//         assertTrue(routed);
-//         routed = office1.route(ref2, new SimpleCondition("topic1"), null);         
-//         assertTrue(routed);
-//         
-//         //Messages are sent asych so may take some finite time to arrive
-//         Thread.sleep(1000);         
-//         
-//         //And acknowledge transactionally
-//         
-//         tx = tr.createTransaction();
-//         
-//         for (int i = 0; i < 8; i++)
-//         {         
-//            List msgs = receivers[i].getMessages();
-//            assertNotNull(msgs);
-//            assertEquals(2, msgs.size());
-//            Message msgRec1 = (Message)msgs.get(0);
-//            assertEquals(msg1.getMessageID(), msgRec1.getMessageID());
-//            Message msgRec2 = (Message)msgs.get(1);
-//            assertEquals(msg2.getMessageID(), msgRec2.getMessageID());      
-//                        
-//            receivers[i].acknowledge(msgRec1, tx);
-//            receivers[i].acknowledge(msgRec2, tx);
-//             
-//            int deliveringCount = queues[i].getDeliveringCount();
-//            
-//            assertEquals(2, deliveringCount);
-//                       
-//            receivers[i].clear();
-//         }
-//         
-//         for (int i = 8; i < 16; i++)
-//         {
-//            List msgs = receivers[i].getMessages();
-//            assertNotNull(msgs);
-//            assertTrue(msgs.isEmpty());
-//            msgs = queues[i].browse();
-//            assertNotNull(msgs);
-//            assertTrue(msgs.isEmpty());
-//         }
-//         
-//         tx.commit();
-//         
-//         for (int i = 0; i < 16; i++)
-//         {
-//            List msgs = receivers[i].getMessages();
-//            assertNotNull(msgs);
-//            assertTrue(msgs.isEmpty());
-//            msgs = queues[i].browse();
-//            assertNotNull(msgs);
-//            assertTrue(msgs.isEmpty());
-//         }
-//         
-//         
-//         // and the rollback
-//         
-//         msg1 = CoreMessageFactory.createCoreMessage(1, persistent, null);;      
-//         ref1 = ms.reference(msg1);
-//         
-//         msg2 = CoreMessageFactory.createCoreMessage(2, persistent, null);;      
-//         ref2 = ms.reference(msg2);
-//         
-//         routed = office1.route(ref1, new SimpleCondition("topic1"), null);         
-//         assertTrue(routed);
-//         routed = office1.route(ref2, new SimpleCondition("topic1"), null);         
-//         assertTrue(routed);
-//         
-//         Thread.sleep(1000);
-//                 
-//         tx = tr.createTransaction();
-//         
-//         for (int i = 0; i < 8; i++)
-//         {         
-//            List msgs = receivers[i].getMessages();
-//            assertNotNull(msgs);
-//            assertEquals(2, msgs.size());
-//            Message msgRec1 = (Message)msgs.get(0);
-//            assertEquals(msg1.getMessageID(), msgRec1.getMessageID());
-//            Message msgRec2 = (Message)msgs.get(1);
-//            assertEquals(msg2.getMessageID(), msgRec2.getMessageID());      
-//                        
-//            receivers[i].acknowledge(msgRec1, tx);
-//            receivers[i].acknowledge(msgRec2, tx);
-//               
-//            int deliveringCount = queues[i].getDeliveringCount();
-//            
-//            assertEquals(2, deliveringCount);
-//            
-//         }
-//         
-//         for (int i = 8; i < 16; i++)
-//         {
-//            List msgs = receivers[i].getMessages();
-//            assertNotNull(msgs);
-//            assertTrue(msgs.isEmpty());
-//            msgs = queues[i].browse();
-//            assertNotNull(msgs);
-//            assertTrue(msgs.isEmpty());
-//         }
-//         
-//         tx.rollback();
-//         
-//         for (int i = 0; i < 8; i++)
-//         {         
-//            List msgs = receivers[i].getMessages();
-//            assertNotNull(msgs);
-//            assertEquals(2, msgs.size());
-//            Message msgRec1 = (Message)msgs.get(0);
-//            assertEquals(msg1.getMessageID(), msgRec1.getMessageID());
-//            Message msgRec2 = (Message)msgs.get(1);
-//            assertEquals(msg2.getMessageID(), msgRec2.getMessageID());      
-//                                 
-//            int deliveringCount = queues[i].getDeliveringCount();
-//            
-//            assertEquals(2, deliveringCount);
-//            
-//            receivers[i].acknowledge(msgRec1, null);
-//            receivers[i].acknowledge(msgRec2, null);
-//                           
-//            receivers[i].clear();
-//         }
-//         
-//         for (int i = 8; i < 16; i++)
-//         {
-//            List msgs = receivers[i].getMessages();
-//            assertNotNull(msgs);
-//            assertTrue(msgs.isEmpty());
-//            msgs = queues[i].browse();
-//            assertNotNull(msgs);
-//            assertTrue(msgs.isEmpty());
-//         }
-//         
-//         
-//         // Now for topic 2
-//         
-//         msg1 = CoreMessageFactory.createCoreMessage(1, persistent, null);    
-//         ref1 = ms.reference(msg1);
-//         
-//         msg2 = CoreMessageFactory.createCoreMessage(2, persistent, null);     
-//         ref2 = ms.reference(msg2);
-//         
-//         tx = tr.createTransaction();
-//
-//         routed = office1.route(ref1, new SimpleCondition("topic2"), tx);         
-//         assertTrue(routed);
-//         routed = office1.route(ref2, new SimpleCondition("topic2"), tx);         
-//         assertTrue(routed);
-//         
-//         
-//         
-//         for (int i = 0; i < 16; i++)
-//         {
-//            List msgs = receivers[i].getMessages();
-//            assertNotNull(msgs);
-//            assertTrue(msgs.isEmpty());
-//            msgs = queues[i].browse();
-//            assertNotNull(msgs);
-//            assertTrue(msgs.isEmpty());
-//         }
-//         
-//         tx.commit();
-//         
-//         //Messages are sent asych so may take some finite time to arrive
-//         Thread.sleep(1000);
-//         
-//         for (int i = 0; i < 8; i++)
-//         {
-//            List msgs = receivers[i].getMessages();
-//            assertNotNull(msgs);
-//            assertTrue(msgs.isEmpty());
-//            msgs = queues[i].browse();
-//            assertNotNull(msgs);
-//            assertTrue(msgs.isEmpty());
-//         }
-//         
-//         for (int i = 8; i < 16; i++)
-//         {         
-//            List msgs = receivers[i].getMessages();
-//            assertNotNull(msgs);
-//            assertEquals(2, msgs.size());
-//            Message msgRec1 = (Message)msgs.get(0);
-//            assertEquals(msg1.getMessageID(), msgRec1.getMessageID());
-//            Message msgRec2 = (Message)msgs.get(1);
-//            assertEquals(msg2.getMessageID(), msgRec2.getMessageID());            
-//            receivers[i].acknowledge(msgRec1, null);
-//            receivers[i].acknowledge(msgRec2, null);
-//            msgs = queues[i].browse();
-//            assertNotNull(msgs);
-//            assertTrue(msgs.isEmpty()); 
-//            receivers[i].clear();
-//         }
-//         
-//         msg1 = CoreMessageFactory.createCoreMessage(1, persistent, null);;      
-//         ref1 = ms.reference(msg1);
-//         
-//         msg2 = CoreMessageFactory.createCoreMessage(2, persistent, null);;      
-//         ref2 = ms.reference(msg2);
-//         
-//         tx = tr.createTransaction();
-//
-//         routed = office1.route(ref1, new SimpleCondition("topic1"), tx);         
-//         assertTrue(routed);
-//         routed = office1.route(ref2, new SimpleCondition("topic1"), tx);         
-//         assertTrue(routed);
-//         
-//         for (int i = 0; i < 16; i++)
-//         {
-//            List msgs = receivers[i].getMessages();
-//            assertNotNull(msgs);
-//            assertTrue(msgs.isEmpty());
-//            msgs = queues[i].browse();
-//            assertNotNull(msgs);
-//            assertTrue(msgs.isEmpty());
-//         }
-//         
-//         tx.rollback();
-//         
-//         for (int i = 0; i < 16; i++)
-//         {
-//            List msgs = receivers[i].getMessages();
-//            assertNotNull(msgs);
-//            assertTrue(msgs.isEmpty());
-//            msgs = queues[i].browse();
-//            assertNotNull(msgs);
-//            assertTrue(msgs.isEmpty());
-//         }
-//         
-//         //Now send some non transactionally
-//         
-//         msg1 = CoreMessageFactory.createCoreMessage(1, persistent, null);      
-//         ref1 = ms.reference(msg1);
-//         
-//         msg2 = CoreMessageFactory.createCoreMessage(2, persistent, null);      
-//         ref2 = ms.reference(msg2);
-//         
-//         routed = office1.route(ref1, new SimpleCondition("topic2"), null);         
-//         assertTrue(routed);
-//         routed = office1.route(ref2, new SimpleCondition("topic2"), null);         
-//         assertTrue(routed);
-//         
-//         Thread.sleep(1000);
-//         
-//         //And acknowledge transactionally
-//         
-//         tx = tr.createTransaction();
-//         
-//         for (int i = 0; i < 8; i++)
-//         {
-//            List msgs = receivers[i].getMessages();
-//            assertNotNull(msgs);
-//            assertTrue(msgs.isEmpty());
-//            msgs = queues[i].browse();
-//            assertNotNull(msgs);
-//            assertTrue(msgs.isEmpty());
-//         }
-//         
-//         for (int i = 8; i < 16; i++)
-//         {         
-//            List msgs = receivers[i].getMessages();
-//            assertNotNull(msgs);
-//            assertEquals(2, msgs.size());
-//            Message msgRec1 = (Message)msgs.get(0);
-//            assertEquals(msg1.getMessageID(), msgRec1.getMessageID());
-//            Message msgRec2 = (Message)msgs.get(1);
-//            assertEquals(msg2.getMessageID(), msgRec2.getMessageID());      
-//                        
-//            receivers[i].acknowledge(msgRec1, tx);
-//            receivers[i].acknowledge(msgRec2, tx);
-//                        
-//            int deliveringCount = queues[i].getDeliveringCount();
-//            
-//            assertEquals(2, deliveringCount);
-//            
-//            receivers[i].clear();
-//         }
-//         
-//         
-//         
-//         tx.commit();
-//         
-//         for (int i = 0; i < 16; i++)
-//         {
-//            List msgs = receivers[i].getMessages();
-//            assertNotNull(msgs);
-//            assertTrue(msgs.isEmpty());
-//            msgs = queues[i].browse();
-//            assertNotNull(msgs);
-//            assertTrue(msgs.isEmpty());
-//         }
-//         
-//         
-//         // and the rollback
-//         
-//         msg1 = CoreMessageFactory.createCoreMessage(1, persistent, null);;      
-//         ref1 = ms.reference(msg1);
-//         
-//         msg2 = CoreMessageFactory.createCoreMessage(2, persistent, null);;      
-//         ref2 = ms.reference(msg2);
-//         
-//         routed = office1.route(ref1, new SimpleCondition("topic2"), null);         
-//         assertTrue(routed);
-//         routed = office1.route(ref2, new SimpleCondition("topic2"), null);         
-//         assertTrue(routed);
-//         
-//         Thread.sleep(1000);
-//          
-//         tx = tr.createTransaction();
-//         
-//         for (int i = 0; i < 8; i++)
-//         {
-//            List msgs = receivers[i].getMessages();
-//            assertNotNull(msgs);
-//            assertTrue(msgs.isEmpty());
-//            msgs = queues[i].browse();
-//            assertNotNull(msgs);
-//            assertTrue(msgs.isEmpty());
-//         }
-//         
-//         for (int i = 8; i < 16; i++)
-//         {         
-//            List msgs = receivers[i].getMessages();
-//            assertNotNull(msgs);
-//            assertEquals(2, msgs.size());
-//            Message msgRec1 = (Message)msgs.get(0);
-//            assertEquals(msg1.getMessageID(), msgRec1.getMessageID());
-//            Message msgRec2 = (Message)msgs.get(1);
-//            assertEquals(msg2.getMessageID(), msgRec2.getMessageID());      
-//            
-//            
-//            receivers[i].acknowledge(msgRec1, tx);
-//            receivers[i].acknowledge(msgRec2, tx);
-//            
-//            
-//            int deliveringCount = queues[i].getDeliveringCount();
-//            
-//            assertEquals(2, deliveringCount);
-//         }
-//         
-//         
-//         
-//         tx.rollback();
-//         
-//         for (int i = 0; i < 8; i++)
-//         {
-//            List msgs = receivers[i].getMessages();
-//            assertNotNull(msgs);
-//            assertTrue(msgs.isEmpty());
-//            msgs = queues[i].browse();
-//            assertNotNull(msgs);
-//            assertTrue(msgs.isEmpty());
-//         }
-//         
-//         for (int i = 8; i < 16; i++)
-//         {         
-//            List msgs = receivers[i].getMessages();
-//            assertNotNull(msgs);
-//            assertEquals(2, msgs.size());
-//            Message msgRec1 = (Message)msgs.get(0);
-//            assertEquals(msg1.getMessageID(), msgRec1.getMessageID());
-//            Message msgRec2 = (Message)msgs.get(1);
-//            assertEquals(msg2.getMessageID(), msgRec2.getMessageID());      
-//                                              
-//            int deliveringCount = queues[i].getDeliveringCount();
-//            
-//            assertEquals(2, deliveringCount);
-//            
-//            receivers[i].acknowledge(msgRec1, null);
-//            receivers[i].acknowledge(msgRec2, null);             
-//            
-//            receivers[i].clear();
-//         }
-//         if (checkNoMessageData())
-//         {
-//            fail("Message data still in database");
-//         }
-//      }
-//      finally
-//      {
-//         if (office1 != null)
-//         {
-//            try
-//            {
-//               office1.unbindQueue("sub7");
-//               office1.unbindQueue("sub8");           
-//               office1.unbindQueue("sub15");
-//               office1.unbindQueue("sub16");
-//            }
-//            catch (Exception ignore)
-//            {
-//               ignore.printStackTrace();
-//            }
-//                        
-//            office1.stop();
-//         }
-//         
-//         if (office2 != null)
-//         {
-//            try
-//            {
-//               office2.unbindQueue("sub5");
-//               office2.unbindQueue("sub13");
-//            }
-//            catch (Exception ignore)
-//            {
-//               ignore.printStackTrace();
-//            }
-//            
-//            office2.stop();
-//         }
-//      }
-//   }
    
    protected void setUp() throws Exception
    {
@@ -3308,6 +1598,1251 @@
 
    // Private --------------------------------------------------------------------------------------
 
+   private void clusteredTransactionalRoute(boolean persistent) throws Throwable // persistent: flag handed to createCoreMessage for every message routed by this test
+   {
+      PostOffice office1 = null;
+      // Both offices start as null so the finally block only cleans up the ones that were actually created
+      PostOffice office2 = null;
+      
+      try
+      {   
+         //Start two offices
+         
+         office1 = createClusteredPostOffice(1, "testgroup");
+         office2 = createClusteredPostOffice(2, "testgroup");
+         // queues[0..7] are bound to condition1 and queues[8..15] to condition2; the first MessagingQueue argument matches the office each queue is bound on
+         Queue[] queues = new Queue[16];
+         // NOTE(review): the sixth constructor argument (true for sub5, sub7, sub8, sub13, sub15, sub16) is presumably the recoverable flag read back via isRecoverable() - confirm
+         //condition1
+
+         queues[0] = new MessagingQueue(1, "sub1", channelIDManager.getID(), ms, pm, false, -1, null, true);
+         queues[0].activate();
+         boolean added = office1.addBinding(new Binding(new SimpleCondition("condition1"), queues[0], false), false);
+         assertTrue(added);
+         
+         queues[1] = new MessagingQueue(1, "sub2", channelIDManager.getID(), ms, pm, false, -1, null, true);
+         queues[1].activate();
+         added = office1.addBinding(new Binding(new SimpleCondition("condition1"), queues[1], false), false);
+         assertTrue(added);
+         
+         queues[2] = new MessagingQueue(2, "sub3", channelIDManager.getID(), ms, pm, false, -1, null, true);
+         queues[2].activate();
+         added = office2.addBinding(new Binding(new SimpleCondition("condition1"), queues[2], false), false);
+         assertTrue(added);
+         
+         queues[3] = new MessagingQueue(2, "sub4", channelIDManager.getID(), ms, pm, false, -1, null, true);
+         queues[3].activate();
+         added = office2.addBinding(new Binding(new SimpleCondition("condition1"), queues[3], false), false);
+         assertTrue(added);
+         
+         queues[4] = new MessagingQueue(2, "sub5", channelIDManager.getID(), ms, pm, true, -1, null, true);
+         queues[4].activate();
+         added = office2.addBinding(new Binding(new SimpleCondition("condition1"), queues[4], false), false);
+         assertTrue(added);
+         
+         queues[5] = new MessagingQueue(1, "sub6", channelIDManager.getID(), ms, pm, false, -1, null, true);
+         queues[5].activate();
+         added = office1.addBinding(new Binding(new SimpleCondition("condition1"), queues[5], false), false);
+         assertTrue(added);
+         
+         queues[6] = new MessagingQueue(1, "sub7", channelIDManager.getID(), ms, pm, true, -1, null, true);
+         queues[6].activate();
+         added = office1.addBinding(new Binding(new SimpleCondition("condition1"), queues[6], false), false);
+         assertTrue(added);
+         
+         queues[7] = new MessagingQueue(1, "sub8", channelIDManager.getID(), ms, pm, true, -1, null, true);
+         queues[7].activate();
+         added = office1.addBinding(new Binding(new SimpleCondition("condition1"), queues[7], false), false);
+         assertTrue(added);
+         
+         //condition2
+         
+         queues[8] = new MessagingQueue(1, "sub9", channelIDManager.getID(), ms, pm, false, -1, null, true);
+         queues[8].activate();
+         added= office1.addBinding(new Binding(new SimpleCondition("condition2"), queues[8], false), false);
+         assertTrue(added);
+         
+         queues[9] = new MessagingQueue(1, "sub10", channelIDManager.getID(), ms, pm, false, -1, null, true);
+         queues[9].activate();
+         added = office1.addBinding(new Binding(new SimpleCondition("condition2"), queues[9], false), false);
+         assertTrue(added);
+         
+         queues[10] = new MessagingQueue(2, "sub11", channelIDManager.getID(), ms, pm, false, -1, null, true);
+         queues[10].activate();
+         added = office2.addBinding(new Binding(new SimpleCondition("condition2"), queues[10], false), false);
+         assertTrue(added);
+         
+         queues[11] = new MessagingQueue(2, "sub12", channelIDManager.getID(), ms, pm, false, -1, null, true);
+         queues[11].activate();
+         added = office2.addBinding(new Binding(new SimpleCondition("condition2"), queues[11], false), false);
+         assertTrue(added);
+         
+         queues[12] = new MessagingQueue(2, "sub13", channelIDManager.getID(), ms, pm, true, -1, null, true);
+         queues[12].activate();
+         added = office2.addBinding(new Binding(new SimpleCondition("condition2"), queues[12], false), false);
+         assertTrue(added);
+         
+         queues[13] = new MessagingQueue(1, "sub14", channelIDManager.getID(), ms, pm, false, -1, null, true);
+         queues[13].activate();
+         added = office1.addBinding(new Binding(new SimpleCondition("condition2"), queues[13], false), false);
+         assertTrue(added);
+         
+         queues[14] = new MessagingQueue(1, "sub15", channelIDManager.getID(), ms, pm, true, -1, null, true);
+         queues[14].activate();
+         added = office1.addBinding(new Binding(new SimpleCondition("condition2"), queues[14], false), false);
+         assertTrue(added);
+         
+         queues[15] = new MessagingQueue(1, "sub16", channelIDManager.getID(), ms, pm, true, -1, null, true);
+         queues[15].activate();
+         added = office1.addBinding(new Binding(new SimpleCondition("condition2"), queues[15], false), false);
+         assertTrue(added);
+
+         SimpleReceiver[] receivers = new SimpleReceiver[16];
+         
+         for (int i = 0; i < 16; i++)
+         {
+            receivers[i] = new SimpleReceiver("blah" + i, SimpleReceiver.ACCEPTING);
+            queues[i].getLocalDistributor().add(receivers[i]);
+         }
+         
+         //First for topic 1
+         
+         Message msg1 = CoreMessageFactory.createCoreMessage(1, persistent, null);      
+         MessageReference ref1 = ms.reference(msg1);
+         
+         Message msg2 = CoreMessageFactory.createCoreMessage(2, persistent, null);      
+         MessageReference ref2 = ms.reference(msg2);
+         
+         Transaction tx = tr.createTransaction();
+
+         boolean routed = office1.route(ref1, new SimpleCondition("condition1"), tx);         
+         assertTrue(routed);
+         routed = office1.route(ref2, new SimpleCondition("condition1"), tx);         
+         assertTrue(routed);
+         // Before the commit no receiver may hold a message and every queue must browse as empty
+         for (int i = 0; i < 16; i++)
+         {
+         	log.info("i is " + i);
+            List msgs = receivers[i].getMessages();
+            assertNotNull(msgs);
+            assertTrue(msgs.isEmpty());
+            msgs = queues[i].browse(null);
+            assertNotNull(msgs);
+            assertTrue(msgs.isEmpty());
+         }
+         
+         log.info("committing");
+         tx.commit();
+         log.info("committed");
+         
+         //Messages are sent asynchronously so may take some finite time to arrive
+         Thread.sleep(1000);
+         // After the commit: condition2 queues and recoverable queues on node 2 stay empty, every other condition1 queue gets both messages in order
+         for (int i = 0; i < 16; i++)
+         {
+         	if (i >= 8 || (queues[i].getNodeID() == 2 && queues[i].isRecoverable()))
+         	{     
+         		//Shouldn't get message
+         		List msgs = receivers[i].getMessages();
+         		assertNotNull(msgs);
+         		assertTrue(msgs.isEmpty());
+         		msgs = queues[i].browse(null);
+         		assertNotNull(msgs);
+         		assertTrue(msgs.isEmpty());
+         	}
+         	else
+         	{
+         		//Should get message
+         		log.info("is is " + i);
+         		log.info("trying with receiver " + receivers[i]);
+         		List msgs = receivers[i].getMessages();
+         		assertNotNull(msgs);
+         		assertEquals(2, msgs.size());
+         		Message msgRec1 = (Message)msgs.get(0);
+         		assertEquals(msg1.getMessageID(), msgRec1.getMessageID());
+         		Message msgRec2 = (Message)msgs.get(1);
+         		assertEquals(msg2.getMessageID(), msgRec2.getMessageID());            
+         		receivers[i].acknowledge(msgRec1, null);
+         		receivers[i].acknowledge(msgRec2, null);
+         		msgs = queues[i].browse(null);
+         		assertNotNull(msgs);            
+         		assertTrue(msgs.isEmpty());                        
+         		receivers[i].clear();
+         	}
+         }
+         // Route two fresh messages in a new transaction that is rolled back: nothing may be delivered before or after the rollback
+         msg1 = CoreMessageFactory.createCoreMessage(1, persistent, null);      
+         ref1 = ms.reference(msg1);
+         
+         msg2 = CoreMessageFactory.createCoreMessage(2, persistent, null);      
+         ref2 = ms.reference(msg2);
+         
+         tx = tr.createTransaction();
+
+         routed = office1.route(ref1, new SimpleCondition("condition1"), tx);         
+         assertTrue(routed);
+         routed = office1.route(ref2, new SimpleCondition("condition1"), tx);         
+         assertTrue(routed);
+         
+         //Messages are sent asynchronously so may take some finite time to arrive
+         Thread.sleep(1000);         
+         
+         for (int i = 0; i < 16; i++)
+         {
+            List msgs = receivers[i].getMessages();
+            assertNotNull(msgs);
+            assertTrue(msgs.isEmpty());
+            msgs = queues[i].browse(null);
+            assertNotNull(msgs);
+            assertTrue(msgs.isEmpty());
+         }
+         
+         tx.rollback();
+         // The rollback must leave every receiver and queue empty
+         for (int i = 0; i < 16; i++)
+         {
+            List msgs = receivers[i].getMessages();
+            assertNotNull(msgs);
+            assertTrue(msgs.isEmpty());
+            msgs = queues[i].browse(null);
+            assertNotNull(msgs);
+            assertTrue(msgs.isEmpty());
+         }
+         
+         
+         // Now for topic 2
+         
+         msg1 = CoreMessageFactory.createCoreMessage(1, persistent, null);    
+         ref1 = ms.reference(msg1);
+         
+         msg2 = CoreMessageFactory.createCoreMessage(2, persistent, null);     
+         ref2 = ms.reference(msg2);
+         
+         tx = tr.createTransaction();
+
+         routed = office2.route(ref1, new SimpleCondition("condition2"), tx);         
+         assertTrue(routed);
+         routed = office2.route(ref2, new SimpleCondition("condition2"), tx);         
+         assertTrue(routed);
+                           
+         for (int i = 0; i < 16; i++)
+         {
+            List msgs = receivers[i].getMessages();
+            assertNotNull(msgs);
+            assertTrue(msgs.isEmpty());
+            msgs = queues[i].browse(null);
+            assertNotNull(msgs);
+            assertTrue(msgs.isEmpty());
+         }
+         
+         tx.commit();
+         
+         //Messages are sent asynchronously so may take some finite time to arrive
+         Thread.sleep(1000);
+         // Check all 16 queues (not just 8..15) so the condition1 queues are verified to have received nothing from the condition2 route
+         for (int i = 0; i < 16; i++)
+         { 
+	         if (i < 8 || (queues[i].getNodeID() == 1 && queues[i].isRecoverable()))
+	      	{     
+	         	//	Shouldn't get message
+	      		List msgs = receivers[i].getMessages();
+	      		assertNotNull(msgs);
+	      		assertTrue(msgs.isEmpty());
+	      		msgs = queues[i].browse(null);
+	      		assertNotNull(msgs);
+	      		assertTrue(msgs.isEmpty());
+	      	}
+	      	else
+	      	{
+	      		// Should get message
+	      		log.info("is is " + i);
+	      		log.info("trying with receiver " + receivers[i]);
+	      		List msgs = receivers[i].getMessages();
+	      		assertNotNull(msgs);
+	      		assertEquals(2, msgs.size());
+	      		Message msgRec1 = (Message)msgs.get(0);
+	      		assertEquals(msg1.getMessageID(), msgRec1.getMessageID());
+	      		Message msgRec2 = (Message)msgs.get(1);
+	      		assertEquals(msg2.getMessageID(), msgRec2.getMessageID());            
+	      		receivers[i].acknowledge(msgRec1, null);
+	      		receivers[i].acknowledge(msgRec2, null);
+	      		msgs = queues[i].browse(null);
+	      		assertNotNull(msgs);            
+	      		assertTrue(msgs.isEmpty());                        
+	      		receivers[i].clear();
+	      	}
+         }
+         // NOTE(review): this rollback check routes via office1/condition1 again although it sits in the topic 2 section - confirm whether office2/condition2 was intended
+         msg1 = CoreMessageFactory.createCoreMessage(1, persistent, null);      
+         ref1 = ms.reference(msg1);
+         
+         msg2 = CoreMessageFactory.createCoreMessage(2, persistent, null);      
+         ref2 = ms.reference(msg2);
+         
+         tx = tr.createTransaction();
+
+         routed = office1.route(ref1, new SimpleCondition("condition1"), tx);         
+         assertTrue(routed);
+         routed = office1.route(ref2, new SimpleCondition("condition1"), tx);         
+         assertTrue(routed);
+         
+         for (int i = 0; i < 16; i++)
+         {
+            List msgs = receivers[i].getMessages();
+            assertNotNull(msgs);
+            assertTrue(msgs.isEmpty());
+            msgs = queues[i].browse(null);
+            assertNotNull(msgs);
+            assertTrue(msgs.isEmpty());
+         }
+         
+         tx.rollback();
+         
+         for (int i = 0; i < 16; i++)
+         {
+            List msgs = receivers[i].getMessages();
+            assertNotNull(msgs);
+            assertTrue(msgs.isEmpty());
+            msgs = queues[i].browse(null);
+            assertNotNull(msgs);
+            assertTrue(msgs.isEmpty());
+         }
+         
+        
+         if (checkNoMessageData())
+         {
+            fail("Message data still in database");
+         }
+      }
+      finally
+      {
+         if (office1 != null)
+         {
+            try
+            {
+               office1.removeBinding("sub7", false);
+               office1.removeBinding("sub8", false);           
+               office1.removeBinding("sub15", false);
+               office1.removeBinding("sub16", false);
+            }
+            catch (Exception ignore) // best-effort cleanup: the failure is printed and the office is still stopped below
+            {
+               ignore.printStackTrace();
+            }
+                        
+            office1.stop();
+         }
+         
+         if (office2 != null)
+         {
+            try
+            {
+               office2.removeBinding("sub5", false);
+               office2.removeBinding("sub13", false);
+            }
+            catch (Exception ignore) // best-effort cleanup: the failure is printed and the office is still stopped below
+            {
+               ignore.printStackTrace();
+            }
+            
+            office2.stop();
+         }
+      }
+   }
+   
+   private void clusteredRouteWithFilter(boolean persistentMessage) throws Throwable // persistentMessage: flag handed to createCoreMessage for each of the three messages
+   {
+   	PostOffice office1 = null;
+   	// Two-node test: three messages are routed through office1 and the deliveries of three differently filtered queues are checked
+   	PostOffice office2 = null;
+
+   	try
+   	{   
+   		office1 = createClusteredPostOffice(1, "testgroup");
+   		office2 = createClusteredPostOffice(2, "testgroup");
+   		// The assertions further down expect SimpleFilter(n) to let through only the message created with id n
+   		SimpleFilter filter1 = new SimpleFilter(2);
+   		SimpleFilter filter2 = new SimpleFilter(3);
+   		// queue1 (node 1) uses filter1, queue2 (node 2) uses filter2, queue3 (node 2) has no filter; all three bind to condition1
+   		Queue queue1 =  new MessagingQueue(1, "queue1", channelIDManager.getID(), ms, pm, false, -1, filter1, true);
+   		queue1.activate();
+   		boolean added = office1.addBinding(new Binding(new SimpleCondition("condition1"), queue1, false), false);
+   		assertTrue(added);
+
+   		Queue queue2 = new MessagingQueue(2, "queue2", channelIDManager.getID(), ms, pm, false, -1, filter2, true);
+   		queue2.activate();
+   		added = office2.addBinding(new Binding(new SimpleCondition("condition1"), queue2, false), false);
+   		assertTrue(added);
+
+   		Queue queue3 = new MessagingQueue(2, "queue3", channelIDManager.getID(), ms, pm, false, -1, null, true);
+   		queue3.activate();
+   		added = office2.addBinding(new Binding(new SimpleCondition("condition1"), queue3, false), false);
+   		assertTrue(added);
+
+   		SimpleReceiver receiver1 = new SimpleReceiver("blah", SimpleReceiver.ACCEPTING);
+   		queue1.getLocalDistributor().add(receiver1);
+
+   		SimpleReceiver receiver2 = new SimpleReceiver("blah", SimpleReceiver.ACCEPTING);
+   		queue2.getLocalDistributor().add(receiver2);
+
+   		SimpleReceiver receiver3 = new SimpleReceiver("blah", SimpleReceiver.ACCEPTING);
+   		queue3.getLocalDistributor().add(receiver3);
+   		// Route messages with ids 1, 2 and 3 to condition1 through office1, passing null as the last route argument
+   		Message msg1 = CoreMessageFactory.createCoreMessage(1, persistentMessage, null);      
+   		MessageReference ref1 = ms.reference(msg1);  
+   		boolean routed = office1.route(ref1, new SimpleCondition("condition1"), null);   
+   		assertTrue(routed);
+
+
+   		Message msg2 = CoreMessageFactory.createCoreMessage(2, persistentMessage, null);      
+   		MessageReference ref2 = ms.reference(msg2);         
+   		routed = office1.route(ref2, new SimpleCondition("condition1"), null);      
+   		assertTrue(routed);
+
+   		Message msg3 = CoreMessageFactory.createCoreMessage(3, persistentMessage, null);      
+   		MessageReference ref3 = ms.reference(msg3);         
+   		routed = office1.route(ref3, new SimpleCondition("condition1"), null);      
+   		assertTrue(routed);
+   		// Fixed wait before asserting - presumably gives delivery to the other node time to complete (TODO confirm)
+   		Thread.sleep(1000);
+   		// receiver1 (filter1) must hold exactly msg2, compared by object identity
+   		List msgs = receiver1.getMessages();
+   		assertNotNull(msgs);
+   		assertEquals(1, msgs.size());
+   		Message msgRec = (Message)msgs.get(0);
+   		assertTrue(msg2 == msgRec);
+   		receiver1.acknowledge(msgRec, null);
+   		msgs = queue1.browse(null);
+   		assertNotNull(msgs);
+   		assertTrue(msgs.isEmpty());  
+   		// receiver2 (filter2) must hold exactly msg3
+   		msgs = receiver2.getMessages();
+   		assertNotNull(msgs);
+   		assertEquals(1, msgs.size());
+   		msgRec = (Message)msgs.get(0);
+   		assertTrue(msg3 == msgRec);
+   		receiver2.acknowledge(msgRec, null);
+   		msgs = queue2.browse(null);
+   		assertNotNull(msgs);
+   		assertTrue(msgs.isEmpty());  
+   		// receiver3 (no filter) must hold all three messages in the order they were routed
+   		msgs = receiver3.getMessages();
+   		assertNotNull(msgs);
+   		assertEquals(3, msgs.size());
+   		Message msgRec1 = (Message)msgs.get(0);
+   		assertTrue(msg1 == msgRec1);
+   		Message msgRec2 = (Message)msgs.get(1);
+   		assertTrue(msg2 == msgRec2);
+   		Message msgRec3 = (Message)msgs.get(2);
+   		assertTrue(msg3 == msgRec3);
+
+   		receiver3.acknowledge(msgRec1, null);
+   		receiver3.acknowledge(msgRec2, null);
+   		receiver3.acknowledge(msgRec3, null);
+   		msgs = queue3.browse(null);
+   		assertNotNull(msgs);
+   		assertTrue(msgs.isEmpty()); 
+   		// Once everything is acknowledged the test fails if checkNoMessageData() returns true
+   		if (checkNoMessageData())
+   		{
+   			fail("Message data still in database");
+   		}
+   	}
+   	finally
+   	{
+   		if (office1 != null)
+   		{
+   			office1.stop();
+   		}
+
+   		if (office2 != null)
+   		{
+   			office2.stop();
+   		}
+
+   	}
+   }
+   
+   private void clusteredRouteFourNodes(boolean persistentMessage) throws Throwable // persistentMessage: flag handed to createCoreMessage for each of the three messages
+   {
+   	PostOffice office1 = null;
+   	// Four-node test: one unfiltered queue per office, all bound to condition1; every receiver is expected to get all three messages
+   	PostOffice office2 = null;
+   	
+   	PostOffice office3 = null;
+
+   	PostOffice office4 = null;
+
+   	try
+   	{   
+   		office1 = createClusteredPostOffice(1, "testgroup");
+   		office2 = createClusteredPostOffice(2, "testgroup");
+   		office3 = createClusteredPostOffice(3, "testgroup");
+   		office4 = createClusteredPostOffice(4, "testgroup");
+   		// queueN is created with node id N and bound on officeN
+   		Queue queue1 =  new MessagingQueue(1, "queue1", channelIDManager.getID(), ms, pm, false, -1, null, true);
+   		queue1.activate();
+   		boolean added = office1.addBinding(new Binding(new SimpleCondition("condition1"), queue1, false), false);
+   		assertTrue(added);
+   		
+   		Queue queue2 =  new MessagingQueue(2, "queue2", channelIDManager.getID(), ms, pm, false, -1, null, true);
+   		queue2.activate();
+   		added = office2.addBinding(new Binding(new SimpleCondition("condition1"), queue2, false), false);
+   		assertTrue(added);
+   		
+   		Queue queue3 =  new MessagingQueue(3, "queue3", channelIDManager.getID(), ms, pm, false, -1, null, true);
+   		queue3.activate();
+   		added = office3.addBinding(new Binding(new SimpleCondition("condition1"), queue3, false), false);
+   		assertTrue(added);
+   		
+   		Queue queue4 =  new MessagingQueue(4, "queue4", channelIDManager.getID(), ms, pm, false, -1, null, true);
+   		queue4.activate();
+   		added = office4.addBinding(new Binding(new SimpleCondition("condition1"), queue4, false), false);
+   		assertTrue(added);
+
+   		SimpleReceiver receiver1 = new SimpleReceiver("blah", SimpleReceiver.ACCEPTING);
+   		queue1.getLocalDistributor().add(receiver1);
+
+   		SimpleReceiver receiver2 = new SimpleReceiver("blah", SimpleReceiver.ACCEPTING);
+   		queue2.getLocalDistributor().add(receiver2);
+
+   		SimpleReceiver receiver3 = new SimpleReceiver("blah", SimpleReceiver.ACCEPTING);
+   		queue3.getLocalDistributor().add(receiver3);
+   		
+   		SimpleReceiver receiver4 = new SimpleReceiver("blah", SimpleReceiver.ACCEPTING);
+   		queue4.getLocalDistributor().add(receiver4);
+   		// Route messages with ids 1, 2 and 3 to condition1 through office1, passing null as the last route argument
+   		Message msg1 = CoreMessageFactory.createCoreMessage(1, persistentMessage, null);      
+   		MessageReference ref1 = ms.reference(msg1);  
+   		boolean routed = office1.route(ref1, new SimpleCondition("condition1"), null);   
+   		assertTrue(routed);
+
+   		Message msg2 = CoreMessageFactory.createCoreMessage(2, persistentMessage, null);      
+   		MessageReference ref2 = ms.reference(msg2);         
+   		routed = office1.route(ref2, new SimpleCondition("condition1"), null);      
+   		assertTrue(routed);
+
+   		Message msg3 = CoreMessageFactory.createCoreMessage(3, persistentMessage, null);      
+   		MessageReference ref3 = ms.reference(msg3);         
+   		routed = office1.route(ref3, new SimpleCondition("condition1"), null);      
+   		assertTrue(routed);
+   		// Fixed wait before asserting - presumably gives delivery to the other nodes time to complete (TODO confirm)
+   		Thread.sleep(1000);
+   		// Each receiver in turn: three messages in routing order (compared by identity), acknowledged, then the queue must browse as empty
+   		List msgs = receiver1.getMessages();
+   		assertNotNull(msgs);
+   		assertEquals(3, msgs.size());
+   		Message msgRec1 = (Message)msgs.get(0);
+   		assertTrue(msg1 == msgRec1);
+   		Message msgRec2 = (Message)msgs.get(1);
+   		assertTrue(msg2 == msgRec2);
+   		Message msgRec3 = (Message)msgs.get(2);
+   		assertTrue(msg3 == msgRec3);
+
+   		receiver1.acknowledge(msgRec1, null);
+   		receiver1.acknowledge(msgRec2, null);
+   		receiver1.acknowledge(msgRec3, null);
+   		msgs = queue1.browse(null);
+   		assertNotNull(msgs);
+   		assertTrue(msgs.isEmpty()); 
+   		
+   		msgs = receiver2.getMessages();
+   		assertNotNull(msgs);
+   		assertEquals(3, msgs.size());
+   		msgRec1 = (Message)msgs.get(0);
+   		assertTrue(msg1 == msgRec1);
+   		msgRec2 = (Message)msgs.get(1);
+   		assertTrue(msg2 == msgRec2);
+   		msgRec3 = (Message)msgs.get(2);
+   		assertTrue(msg3 == msgRec3);
+
+   		receiver2.acknowledge(msgRec1, null);
+   		receiver2.acknowledge(msgRec2, null);
+   		receiver2.acknowledge(msgRec3, null);
+   		msgs = queue2.browse(null);
+   		assertNotNull(msgs);
+   		assertTrue(msgs.isEmpty()); 
+
+   		msgs = receiver3.getMessages();
+   		assertNotNull(msgs);
+   		assertEquals(3, msgs.size());
+   		msgRec1 = (Message)msgs.get(0);
+   		assertTrue(msg1 == msgRec1);
+   		msgRec2 = (Message)msgs.get(1);
+   		assertTrue(msg2 == msgRec2);
+   		msgRec3 = (Message)msgs.get(2);
+   		assertTrue(msg3 == msgRec3);
+
+   		receiver3.acknowledge(msgRec1, null);
+   		receiver3.acknowledge(msgRec2, null);
+   		receiver3.acknowledge(msgRec3, null);
+   		msgs = queue3.browse(null);
+   		assertNotNull(msgs);
+   		assertTrue(msgs.isEmpty()); 
+   		
+   		msgs = receiver4.getMessages();
+   		assertNotNull(msgs);
+   		assertEquals(3, msgs.size());
+   		msgRec1 = (Message)msgs.get(0);
+   		assertTrue(msg1 == msgRec1);
+   		msgRec2 = (Message)msgs.get(1);
+   		assertTrue(msg2 == msgRec2);
+   		msgRec3 = (Message)msgs.get(2);
+   		assertTrue(msg3 == msgRec3);
+
+   		receiver4.acknowledge(msgRec1, null);
+   		receiver4.acknowledge(msgRec2, null);
+   		receiver4.acknowledge(msgRec3, null);
+   		msgs = queue4.browse(null);
+   		assertNotNull(msgs);
+   		assertTrue(msgs.isEmpty()); 
+   		
+   		if (checkNoMessageData())
+   		{
+   			fail("Message data still in database");
+   		}
+   	}
+   	finally
+   	{
+   		if (office1 != null)
+   		{
+   			office1.stop();
+   		}
+
+   		if (office2 != null)
+   		{
+   			office2.stop();
+   		}
+   		
+   		if (office3 != null)
+   		{
+   			office3.stop();
+   		}
+
+   		if (office4 != null)
+   		{
+   			office4.stop();
+   		}
+
+   	}
+   }
+   
+   
+
+   private void clusteredRoute(boolean persistentMessage) throws Throwable // persistentMessage: flag handed to createCoreMessage for both routed messages
+   {
+   	PostOffice office1 = null;
+   	// Two-node, non-transactional routing test over 16 queues: queues[0..7] bind to condition1, queues[8..15] to condition2
+   	PostOffice office2 = null;
+
+   	try
+   	{   
+   		office1 = createClusteredPostOffice(1, "testgroup");
+   		office2 = createClusteredPostOffice(2, "testgroup");
+
+   		//A mixture of durable and non durable queues
+   		// NOTE(review): the queues marked "durable" below pass true as the sixth constructor argument, presumably the flag read back via isRecoverable() - confirm
+   		Queue[] queues = new Queue[16];
+
+   		//condition1
+
+   		queues[0] = new MessagingQueue(1, "sub1", channelIDManager.getID(), ms, pm, false, -1, null, true);
+   		queues[0].activate();
+   		boolean added = office1.addBinding(new Binding(new SimpleCondition("condition1"), queues[0], false), false);
+   		assertTrue(added);
+
+   		queues[1] = new MessagingQueue(1, "sub2", channelIDManager.getID(), ms, pm, false, -1, null, true);
+   		queues[1].activate();
+   		added = office1.addBinding(new Binding(new SimpleCondition("condition1"), queues[1], false), false);
+   		assertTrue(added);
+
+   		queues[2] = new MessagingQueue(2, "sub3", channelIDManager.getID(), ms, pm, false, -1, null, true);
+   		queues[2].activate();
+   		added = office2.addBinding(new Binding(new SimpleCondition("condition1"), queues[2], false), false);
+   		assertTrue(added);
+
+   		queues[3] = new MessagingQueue(2, "sub4", channelIDManager.getID(), ms, pm, false, -1, null, true);
+   		queues[3].activate();
+   		added = office2.addBinding(new Binding(new SimpleCondition("condition1"), queues[3], false), false);
+   		assertTrue(added);
+
+   		//durable
+
+   		queues[4] = new MessagingQueue(2, "sub5", channelIDManager.getID(), ms, pm, true, -1, null, true);
+   		queues[4].activate();
+   		added = office2.addBinding(new Binding(new SimpleCondition("condition1"), queues[4], false), false);
+   		assertTrue(added);
+
+   		queues[5] = new MessagingQueue(1, "sub6", channelIDManager.getID(), ms, pm, false, -1, null, true);
+   		queues[5].activate();
+   		added = office1.addBinding(new Binding(new SimpleCondition("condition1"), queues[5], false), false);
+   		assertTrue(added);
+
+   		//durable
+
+   		queues[6] = new MessagingQueue(1, "sub7", channelIDManager.getID(), ms, pm, true, -1, null, true);
+   		queues[6].activate();
+   		added = office1.addBinding(new Binding(new SimpleCondition("condition1"), queues[6], false), false);
+   		assertTrue(added);
+
+   		//durable
+
+   		queues[7] = new MessagingQueue(1, "sub8", channelIDManager.getID(), ms, pm, true, -1, null, true);
+   		queues[7].activate();
+   		added = office1.addBinding(new Binding(new SimpleCondition("condition1"), queues[7], false), false);
+   		assertTrue(added);
+
+   		//condition2
+
+
+   		queues[8] = new MessagingQueue(1, "sub9", channelIDManager.getID(), ms, pm, false, -1, null, true);
+   		queues[8].activate();
+   		added = office1.addBinding(new Binding(new SimpleCondition("condition2"), queues[8], false), false);
+   		assertTrue(added);
+
+   		queues[9] = new MessagingQueue(1, "sub10", channelIDManager.getID(), ms, pm, false, -1, null, true);
+   		queues[9].activate();
+   		added = office1.addBinding(new Binding(new SimpleCondition("condition2"), queues[9], false), false);
+   		assertTrue(added);
+
+   		queues[10] = new MessagingQueue(2, "sub11", channelIDManager.getID(), ms, pm, false, -1, null, true);
+   		queues[10].activate();
+   		added = office2.addBinding(new Binding(new SimpleCondition("condition2"), queues[10], false), false);
+   		assertTrue(added);
+
+   		queues[11] = new MessagingQueue(2, "sub12", channelIDManager.getID(), ms, pm, false, -1, null, true);
+   		queues[11].activate();
+   		added = office2.addBinding(new Binding(new SimpleCondition("condition2"), queues[11], false), false);
+   		assertTrue(added);
+
+   		//durable
+
+   		queues[12] = new MessagingQueue(2, "sub13", channelIDManager.getID(), ms, pm, true, -1, null, true);
+   		queues[12].activate();
+   		added = office2.addBinding(new Binding(new SimpleCondition("condition2"), queues[12], false), false);
+   		assertTrue(added);
+
+   		queues[13] = new MessagingQueue(1, "sub14", channelIDManager.getID(), ms, pm, false, -1, null, true);
+   		queues[13].activate();
+   		added = office1.addBinding(new Binding(new SimpleCondition("condition2"), queues[13], false), false);
+   		assertTrue(added);
+
+   		//durable
+
+   		queues[14] = new MessagingQueue(1, "sub15", channelIDManager.getID(), ms, pm, true, -1, null, true);
+   		queues[14].activate();
+   		added = office1.addBinding(new Binding(new SimpleCondition("condition2"), queues[14], false), false);
+   		assertTrue(added);
+
+   		queues[15] = new MessagingQueue(1, "sub16", channelIDManager.getID(), ms, pm, true, -1, null, true);
+   		queues[15].activate();
+   		added = office1.addBinding(new Binding(new SimpleCondition("condition2"), queues[15], false), false);
+   		assertTrue(added);
+
+   		SimpleReceiver[] receivers = new SimpleReceiver[16];
+   		// One accepting receiver is attached to the local distributor of every queue
+   		for (int i = 0; i < 16; i++)
+   		{
+   			receivers[i] = new SimpleReceiver("blah", SimpleReceiver.ACCEPTING);
+   			queues[i].getLocalDistributor().add(receivers[i]);
+   		}
+   		// First route message id 1 to condition1 through office1
+   		Message msg = CoreMessageFactory.createCoreMessage(1, persistentMessage, null);      
+   		MessageReference ref = ms.reference(msg);         
+
+   		boolean routed = office1.route(ref, new SimpleCondition("condition1"), null);         
+   		assertTrue(routed);
+
+   		//Messages are sent asynchronously so may take some finite time to arrive
+   		Thread.sleep(1000);
+
+   		//Durable queues on remote node should never get the message
+
+   		for (int i = 0; i < 16; i++)
+   		{
+   			if (i >= 8 || (queues[i].getNodeID() == 2 && queues[i].isRecoverable()))
+   			{
+   				this.checkNotGetsMessage(queues[i], receivers[i]);
+   			}
+   			else
+   			{
+   				//Should get the message
+   				this.checkGetsMessage(queues[i], receivers[i], msg);
+   			}
+
+   		}
+
+   		//Now route to condition2
+
+   		msg = CoreMessageFactory.createCoreMessage(2, persistentMessage, null);;      
+   		ref = ms.reference(msg);         
+
+   		routed = office2.route(ref, new SimpleCondition("condition2"), null);         
+   		assertTrue(routed);
+   		//Messages are sent asynchronously so may take some finite time to arrive
+   		Thread.sleep(1000);
+   		// Mirror of the first check: this time the route came from office2, so recoverable queues on node 1 must stay empty
+   		for (int i = 0; i < 16; i++)
+   		{
+   			if (i < 8 || (queues[i].getNodeID() == 1 && queues[i].isRecoverable()))
+   			{
+   				//Shouldn't get the message
+   				this.checkNotGetsMessage(queues[i], receivers[i]);
+   			}
+   			else
+   			{
+   				//Should get the message
+   				this.checkGetsMessage(queues[i], receivers[i], msg);
+   			}
+
+   		}
+
+   		if (checkNoMessageData())
+   		{
+   			fail("Message data still in database");
+   		}
+   	}
+   	finally
+   	{
+   		if (office1 != null)
+   		{
+   			try
+   			{              
+   				office1.removeBinding("sub7", false);
+   				office1.removeBinding("sub8", false);               
+   				office1.removeBinding("sub15", false);
+   				office1.removeBinding("sub16", false);
+   			}
+   			catch (Exception ignore) // best-effort cleanup: the failure is printed and the office is still stopped below
+   			{
+   				ignore.printStackTrace();
+   			}
+
+   			office1.stop();
+   		}
+
+   		if (office2 != null)
+   		{
+   			try
+   			{
+   				office2.removeBinding("sub5", false);
+   				office2.removeBinding("sub13", false);
+   			}
+   			catch (Exception ignore) // best-effort cleanup: the failure is printed and the office is still stopped below
+   			{     
+   				ignore.printStackTrace();
+   			}
+   			office2.stop();
+   		}
+
+   	}
+   }
+
+
+
+   /*
+    * Queues with the same name on different nodes of the cluster.
+    * If a queue is routed to locally, the message should not also be routed to the
+    * same-named queue on other nodes.
+    *
+    * The test starts two clustered post offices (node ids 1 and 2) in group "testgroup",
+    * binds a queue named "myqueue1" and a queue named "myqueue2" on each of them, attaches
+    * an accepting receiver to every queue, and then routes one message for each
+    * (office, condition) combination. After each route it asserts that only the queue
+    * bound on the routing office received the message.
+    *
+    * persistentMessage is passed straight through to CoreMessageFactory.createCoreMessage
+    * (presumably selecting a reliable vs. non-reliable message - confirm against the factory).
+    */
+   private void routeSharedQueue(boolean persistentMessage) throws Throwable
+   {
+   	PostOffice office1 = null;
+
+   	PostOffice office2 = null;
+
+   	try
+   	{   
+   		office1 = createClusteredPostOffice(1, "testgroup");
+   		office2 = createClusteredPostOffice(2, "testgroup");
+
+   		//"myqueue1": one queue per node, both bound under the condition "myqueue1"
+
+   		Queue queue0 = new MessagingQueue(1, "myqueue1", channelIDManager.getID(), ms, pm, false, -1, null, true);
+   		queue0.activate();
+   		boolean added = office1.addBinding(new Binding(new SimpleCondition("myqueue1"), queue0, false), false);
+   		assertTrue(added);
+
+   		Queue queue1 = new MessagingQueue(2, "myqueue1", channelIDManager.getID(), ms, pm, false, -1, null, true);
+   		queue1.activate();
+   		added = office2.addBinding(new Binding(new SimpleCondition("myqueue1"), queue1, false), false);
+   		assertTrue(added);
+
+
+   		//"myqueue2": one queue per node, both bound under the condition "myqueue2"
+
+   		Queue queue2 = new MessagingQueue(1, "myqueue2", channelIDManager.getID(), ms, pm, false, -1, null, true);
+   		queue2.activate();
+   		added = office1.addBinding(new Binding(new SimpleCondition("myqueue2"), queue2, false), false);
+   		assertTrue(added);
+
+   		Queue queue3 = new MessagingQueue(2, "myqueue2", channelIDManager.getID(), ms, pm, false, -1, null, true);
+   		queue3.activate();
+   		added = office2.addBinding(new Binding(new SimpleCondition("myqueue2"), queue3, false), false);
+   		assertTrue(added);
+
+
+   		//One accepting receiver per queue so deliveries can be inspected
+   		SimpleReceiver receiver0 = new SimpleReceiver("blah", SimpleReceiver.ACCEPTING);
+   		queue0.getLocalDistributor().add(receiver0);
+
+   		SimpleReceiver receiver1 = new SimpleReceiver("blah", SimpleReceiver.ACCEPTING);
+   		queue1.getLocalDistributor().add(receiver1);
+
+   		SimpleReceiver receiver2 = new SimpleReceiver("blah", SimpleReceiver.ACCEPTING);
+   		queue2.getLocalDistributor().add(receiver2);
+
+   		SimpleReceiver receiver3 = new SimpleReceiver("blah", SimpleReceiver.ACCEPTING);
+   		queue3.getLocalDistributor().add(receiver3);
+
+   		//Route to myqueue1 from office1
+
+   		Message msg = CoreMessageFactory.createCoreMessage(1, persistentMessage, null);      
+   		MessageReference ref = ms.reference(msg);         
+
+   		boolean routed = office1.route(ref, new SimpleCondition("myqueue1"), null);         
+   		assertTrue(routed);
+
+   		//Messages are sent asynchronously so may take some finite time to arrive
+   		Thread.sleep(1000);
+
+   		//Only queue0 should get the message
+   		checkGetsMessage(queue0, receiver0, msg);
+
+   		checkNotGetsMessage(queue1, receiver1);
+
+   		checkNotGetsMessage(queue2, receiver2);
+
+   		checkNotGetsMessage(queue3, receiver3);
+
+   		//Route to myqueue1 from office2
+
+   		msg = CoreMessageFactory.createCoreMessage(2, persistentMessage, null);      
+   		ref = ms.reference(msg);         
+
+   		routed = office2.route(ref, new SimpleCondition("myqueue1"), null);         
+   		assertTrue(routed);
+
+   		//Messages are sent asynchronously so may take some finite time to arrive
+   		Thread.sleep(1000);
+
+   		//Only queue1 should get the message
+   		checkGetsMessage(queue1, receiver1, msg);
+
+   		checkNotGetsMessage(queue0, receiver0);
+
+   		checkNotGetsMessage(queue2, receiver2);
+
+   		checkNotGetsMessage(queue3, receiver3);
+
+
+   		//Now route to myqueue2 from office1
+
+   		msg = CoreMessageFactory.createCoreMessage(3, persistentMessage, null);;      
+   		ref = ms.reference(msg);         
+
+   		routed = office1.route(ref, new SimpleCondition("myqueue2"), null);         
+   		assertTrue(routed);
+   		//Messages are sent asynchronously so may take some finite time to arrive
+   		Thread.sleep(1000);
+
+   		//Only queue2 should get the message
+   		checkGetsMessage(queue2, receiver2, msg);
+
+   		checkNotGetsMessage(queue1, receiver1);
+
+   		checkNotGetsMessage(queue0, receiver0);
+
+   		checkNotGetsMessage(queue3, receiver3);
+
+
+   		//Now route to myqueue2 from office2
+
+   		msg = CoreMessageFactory.createCoreMessage(4, persistentMessage, null);;      
+   		ref = ms.reference(msg);         
+
+   		routed = office2.route(ref, new SimpleCondition("myqueue2"), null);         
+   		assertTrue(routed);
+   		//Messages are sent asynchronously so may take some finite time to arrive
+   		Thread.sleep(1000);
+
+   		//Only queue3 should get the message
+   		checkGetsMessage(queue3, receiver3, msg);
+
+   		checkNotGetsMessage(queue1, receiver1);
+
+   		checkNotGetsMessage(queue0, receiver0);
+
+   		checkNotGetsMessage(queue2, receiver2);
+
+
+
+   		//The test fails when checkNoMessageData() returns true
+   		if (checkNoMessageData())
+   		{
+   			fail("Message data still in database");
+   		}
+   	}
+   	finally
+   	{
+   		//Always stop whichever offices were started, even if an assertion failed
+   		if (office1 != null)
+   		{
+   			office1.stop();
+   		}
+
+   		if (office2 != null)
+   		{
+
+   			office2.stop();
+   		}
+
+   	}
+   }
+
+
+   /*
+    * Starts two clustered post offices (node ids 1 and 2) in group "testgroup", binds
+    * queues "sub1" (node 1) and "sub2" (node 2) under "condition1" and queues "sub3"
+    * (node 1) and "sub4" (node 2) under "condition2", attaches an accepting receiver to
+    * each queue, then routes four messages and checks which receiver got each one.
+    *
+    * NOTE(review): every route call below uses the conditions "myqueue1" / "myqueue2",
+    * while the bindings above are registered under "condition1" / "condition2", and no
+    * filter is passed anywhere despite the method name. This looks like an unfinished copy
+    * of routeSharedQueue - confirm the intended conditions and expectations before
+    * relying on this test.
+    *
+    * persistentMessage is passed straight through to CoreMessageFactory.createCoreMessage
+    * (presumably selecting a reliable vs. non-reliable message - confirm against the factory).
+    */
+   private void routeWithFilter(boolean persistentMessage) throws Throwable
+   {
+   	PostOffice office1 = null;
+
+   	PostOffice office2 = null;
+
+   	try
+   	{   
+   		office1 = createClusteredPostOffice(1, "testgroup");
+   		office2 = createClusteredPostOffice(2, "testgroup");
+
+
+   		//"condition1": sub1 on office1, sub2 on office2
+   		Queue queue0 = new MessagingQueue(1, "sub1", channelIDManager.getID(), ms, pm, false, -1, null, true);
+   		queue0.activate();
+   		boolean added = office1.addBinding(new Binding(new SimpleCondition("condition1"), queue0, false), false);
+   		assertTrue(added);
+
+   		Queue queue1 = new MessagingQueue(2, "sub2", channelIDManager.getID(), ms, pm, false, -1, null, true);
+   		queue1.activate();
+   		added = office2.addBinding(new Binding(new SimpleCondition("condition1"), queue1, false), false);
+   		assertTrue(added);
+
+
+   		//"condition2": sub3 on office1, sub4 on office2
+   		Queue queue2 = new MessagingQueue(1, "sub3", channelIDManager.getID(), ms, pm, false, -1, null, true);
+   		queue2.activate();
+   		added = office1.addBinding(new Binding(new SimpleCondition("condition2"), queue2, false), false);
+   		assertTrue(added);
+
+   		Queue queue3 = new MessagingQueue(2, "sub4", channelIDManager.getID(), ms, pm, false, -1, null, true);
+   		queue3.activate();
+   		added = office2.addBinding(new Binding(new SimpleCondition("condition2"), queue3, false), false);
+   		assertTrue(added);
+
+
+   		//One accepting receiver per queue so deliveries can be inspected
+   		SimpleReceiver receiver0 = new SimpleReceiver("blah", SimpleReceiver.ACCEPTING);
+   		queue0.getLocalDistributor().add(receiver0);
+
+   		SimpleReceiver receiver1 = new SimpleReceiver("blah", SimpleReceiver.ACCEPTING);
+   		queue1.getLocalDistributor().add(receiver1);
+
+   		SimpleReceiver receiver2 = new SimpleReceiver("blah", SimpleReceiver.ACCEPTING);
+   		queue2.getLocalDistributor().add(receiver2);
+
+   		SimpleReceiver receiver3 = new SimpleReceiver("blah", SimpleReceiver.ACCEPTING);
+   		queue3.getLocalDistributor().add(receiver3);
+
+   		//Route from office1
+   		//NOTE(review): routes to "myqueue1", not to the bound "condition1" - confirm
+
+   		Message msg = CoreMessageFactory.createCoreMessage(1, persistentMessage, null);      
+   		MessageReference ref = ms.reference(msg);         
+
+   		boolean routed = office1.route(ref, new SimpleCondition("myqueue1"), null);         
+   		assertTrue(routed);
+
+   		//Messages are sent asynchronously so may take some finite time to arrive
+   		Thread.sleep(1000);
+
+   		//Only queue0 should get the message
+   		checkGetsMessage(queue0, receiver0, msg);
+
+   		checkNotGetsMessage(queue1, receiver1);
+
+   		checkNotGetsMessage(queue2, receiver2);
+
+   		checkNotGetsMessage(queue3, receiver3);
+
+   		//Route to "myqueue1" from office2
+
+   		msg = CoreMessageFactory.createCoreMessage(2, persistentMessage, null);      
+   		ref = ms.reference(msg);         
+
+   		routed = office2.route(ref, new SimpleCondition("myqueue1"), null);         
+   		assertTrue(routed);
+
+   		//Messages are sent asynchronously so may take some finite time to arrive
+   		Thread.sleep(1000);
+
+   		//Only queue1 should get the message
+   		checkGetsMessage(queue1, receiver1, msg);
+
+   		checkNotGetsMessage(queue0, receiver0);
+
+   		checkNotGetsMessage(queue2, receiver2);
+
+   		checkNotGetsMessage(queue3, receiver3);
+
+
+   		//Now route from office1
+   		//NOTE(review): routes to "myqueue2", not to the bound "condition2" - confirm
+
+   		msg = CoreMessageFactory.createCoreMessage(3, persistentMessage, null);;      
+   		ref = ms.reference(msg);         
+
+   		routed = office1.route(ref, new SimpleCondition("myqueue2"), null);         
+   		assertTrue(routed);
+   		//Messages are sent asynchronously so may take some finite time to arrive
+   		Thread.sleep(1000);
+
+   		//Only queue2 should get the message
+   		checkGetsMessage(queue2, receiver2, msg);
+
+   		checkNotGetsMessage(queue1, receiver1);
+
+   		checkNotGetsMessage(queue0, receiver0);
+
+   		checkNotGetsMessage(queue3, receiver3);
+
+
+   		//Now route to "myqueue2" from office2
+
+   		msg = CoreMessageFactory.createCoreMessage(4, persistentMessage, null);;      
+   		ref = ms.reference(msg);         
+
+   		routed = office2.route(ref, new SimpleCondition("myqueue2"), null);         
+   		assertTrue(routed);
+   		//Messages are sent asynchronously so may take some finite time to arrive
+   		Thread.sleep(1000);
+
+   		//Only queue3 should get the message
+   		checkGetsMessage(queue3, receiver3, msg);
+
+   		checkNotGetsMessage(queue1, receiver1);
+
+   		checkNotGetsMessage(queue0, receiver0);
+
+   		checkNotGetsMessage(queue2, receiver2);
+
+
+
+   		//The test fails when checkNoMessageData() returns true
+   		if (checkNoMessageData())
+   		{
+   			fail("Message data still in database");
+   		}
+   	}
+   	finally
+   	{
+   		//Always stop whichever offices were started, even if an assertion failed
+   		if (office1 != null)
+   		{
+   			office1.stop();
+   		}
+
+   		if (office2 != null)
+   		{
+
+   			office2.stop();
+   		}
+
+   	}
+   }
+
+   /*
+    * Debugging aid: writes each element of postOffice.nodeIDView() to the log at INFO
+    * level, one line per element, between a header line and a footer line.
+    */
+   private void dumpNodeIDView(PostOffice postOffice)
+   {
+   	Set nodeIds = postOffice.nodeIDView();
+
+   	log.info("=== node id view ==");
+
+   	for (Iterator nodes = nodeIds.iterator(); nodes.hasNext();)
+   	{
+   		Object nodeId = nodes.next();
+
+   		log.info("Node:" + nodeId);
+   	}
+
+   	log.info("==================");
+   }
+
+   /*
+    * Logs every binding in the collection, then asserts that:
+    *  - there are exactly three bindings;
+    *  - each binding's queue is named queueName;
+    *  - binding.allNodes is true for the binding whose queue node id equals nodeId and
+    *    false for the others;
+    *  - queues with node ids 1, 2 and 3 are all present.
+    */
+   private void assertGotAll(int nodeId, Collection bindings, String queueName)
+   {
+   	log.info("============= dumping bindings ========");
+
+   	for (Iterator dump = bindings.iterator(); dump.hasNext();)
+   	{
+   		Binding dumped = (Binding)dump.next();
+
+   		log.info("Binding: " + dumped);
+   	}
+
+   	log.info("========= end dump==========");
+
+   	assertEquals(3, bindings.size());
+
+   	boolean sawNode1 = false;
+   	boolean sawNode2 = false;
+   	boolean sawNode3 = false;
+
+   	for (Iterator check = bindings.iterator(); check.hasNext();)
+   	{
+   		Binding current = (Binding)check.next();
+
+   		log.info("binding node id " + current.queue.getNodeID());
+
+   		assertEquals(queueName, current.queue.getName());
+
+   		//Only the binding that lives on the given node is expected to have allNodes set
+   		boolean onGivenNode = current.queue.getNodeID() == nodeId;
+   		if (!onGivenNode)
+   		{
+   			assertFalse(current.allNodes);
+   		}
+   		else
+   		{
+   			assertTrue(current.allNodes);
+   		}
+
+   		sawNode1 |= current.queue.getNodeID() == 1;
+   		sawNode2 |= current.queue.getNodeID() == 2;
+   		sawNode3 |= current.queue.getNodeID() == 3;
+   	}
+
+   	assertTrue(sawNode1 && sawNode2 && sawNode3);
+   }
+
+
+   /*
+    * Asserts that the receiver holds exactly one message and that its message id equals
+    * that of msg. The message is then acknowledged through the receiver, the queue is
+    * asserted to browse as empty, and the receiver is cleared for the next check.
+    */
+   private void checkGetsMessage(Queue queue, SimpleReceiver receiver, Message msg) throws Throwable
+   {
+   	List delivered = receiver.getMessages();
+   	assertNotNull(delivered);
+   	assertEquals(1, delivered.size());
+
+   	Message received = (Message)delivered.get(0);
+   	assertEquals(msg.getMessageID(), received.getMessageID());
+
+   	receiver.acknowledge(received, null);
+
+   	List remaining = queue.browse(null);
+   	assertNotNull(remaining);
+   	assertTrue(remaining.isEmpty()); 
+
+   	receiver.clear();
+   }
+
+   /*
+    * Asserts that nothing was delivered: the receiver's message list and the result of
+    * browsing the queue must both be non-null and empty.
+    */
+   private void checkNotGetsMessage(Queue queue, SimpleReceiver receiver) throws Throwable
+   {
+   	List delivered = receiver.getMessages();
+   	assertNotNull(delivered);
+   	assertTrue(delivered.isEmpty());
+
+   	List queued = queue.browse(null);
+   	assertNotNull(queued);
+   	assertTrue(queued.isEmpty());
+   }
+   
    // Inner classes --------------------------------------------------------------------------------
 
 }




More information about the jboss-cvs-commits mailing list