[Jboss-cvs] JBoss Messaging SVN: r1360 - in trunk: src/main/org/jboss/jms/server/destination src/main/org/jboss/jms/server/endpoint src/main/org/jboss/messaging/core src/main/org/jboss/messaging/core/plugin src/main/org/jboss/messaging/core/plugin/postoffice/cluster tests/src/org/jboss/test/messaging/core/plugin/base tests/src/org/jboss/test/messaging/core/plugin/postoffice/cluster

jboss-cvs-commits at lists.jboss.org jboss-cvs-commits at lists.jboss.org
Sat Sep 23 10:21:25 EDT 2006


Author: timfox
Date: 2006-09-23 10:21:10 -0400 (Sat, 23 Sep 2006)
New Revision: 1360

Modified:
   trunk/src/main/org/jboss/jms/server/destination/DestinationServiceSupport.java
   trunk/src/main/org/jboss/jms/server/destination/QueueService.java
   trunk/src/main/org/jboss/jms/server/endpoint/ServerSessionEndpoint.java
   trunk/src/main/org/jboss/messaging/core/ChannelSupport.java
   trunk/src/main/org/jboss/messaging/core/Delivery.java
   trunk/src/main/org/jboss/messaging/core/PagingChannelSupport.java
   trunk/src/main/org/jboss/messaging/core/SimpleDelivery.java
   trunk/src/main/org/jboss/messaging/core/plugin/JDBCPersistenceManager.java
   trunk/src/main/org/jboss/messaging/core/plugin/postoffice/cluster/DefaultClusteredPostOffice.java
   trunk/src/main/org/jboss/messaging/core/plugin/postoffice/cluster/DefaultMessagePullPolicy.java
   trunk/src/main/org/jboss/messaging/core/plugin/postoffice/cluster/DefaultRouter.java
   trunk/src/main/org/jboss/messaging/core/plugin/postoffice/cluster/LocalClusteredQueue.java
   trunk/src/main/org/jboss/messaging/core/plugin/postoffice/cluster/PostOfficeInternal.java
   trunk/src/main/org/jboss/messaging/core/plugin/postoffice/cluster/PullMessagesRequest.java
   trunk/src/main/org/jboss/messaging/core/plugin/postoffice/cluster/RemoteQueueStub.java
   trunk/src/main/org/jboss/messaging/core/plugin/postoffice/cluster/StatsSender.java
   trunk/tests/src/org/jboss/test/messaging/core/plugin/base/ClusteringTestBase.java
   trunk/tests/src/org/jboss/test/messaging/core/plugin/postoffice/cluster/DefaultClusteredPostOfficeTest.java
   trunk/tests/src/org/jboss/test/messaging/core/plugin/postoffice/cluster/DefaultRouterTest.java
   trunk/tests/src/org/jboss/test/messaging/core/plugin/postoffice/cluster/JGroupsUtil.java
   trunk/tests/src/org/jboss/test/messaging/core/plugin/postoffice/cluster/RedistributionTest.java
Log:
More clustering



Modified: trunk/src/main/org/jboss/jms/server/destination/DestinationServiceSupport.java
===================================================================
--- trunk/src/main/org/jboss/jms/server/destination/DestinationServiceSupport.java	2006-09-23 01:37:20 UTC (rev 1359)
+++ trunk/src/main/org/jboss/jms/server/destination/DestinationServiceSupport.java	2006-09-23 14:21:10 UTC (rev 1360)
@@ -17,6 +17,7 @@
 import org.jboss.messaging.core.plugin.contract.MessageStore;
 import org.jboss.messaging.core.plugin.contract.PersistenceManager;
 import org.jboss.messaging.core.plugin.contract.PostOffice;
+import org.jboss.messaging.core.tx.TransactionRepository;
 import org.jboss.system.ServiceMBeanSupport;
 import org.w3c.dom.Element;
 
@@ -60,6 +61,8 @@
    
    protected MessageStore ms;
    
+   protected TransactionRepository tr;
+   
    protected IdManager idm;
    
    protected String nodeId;
@@ -97,6 +100,8 @@
          
          ms = serverPeer.getMessageStore();
          
+         tr = serverPeer.getTxRepository();
+         
          idm = serverPeer.getChannelIdManager();
          
          nodeId = serverPeer.getServerPeerID();

Modified: trunk/src/main/org/jboss/jms/server/destination/QueueService.java
===================================================================
--- trunk/src/main/org/jboss/jms/server/destination/QueueService.java	2006-09-23 01:37:20 UTC (rev 1359)
+++ trunk/src/main/org/jboss/jms/server/destination/QueueService.java	2006-09-23 14:21:10 UTC (rev 1360)
@@ -117,7 +117,7 @@
             else
             {               
                queue = new LocalClusteredQueue(postOffice, nodeId, destination.getName(), idm.getId(), ms, pm, true, true,                        
-                                               executor, null,
+                                               executor, null, tr, 
                                                destination.getFullSize(), destination.getPageSize(), destination.getDownCacheSize());
                
                ((ClusteredPostOffice)postOffice).bindClusteredQueue(destination.getName(), (LocalClusteredQueue)queue);

Modified: trunk/src/main/org/jboss/jms/server/endpoint/ServerSessionEndpoint.java
===================================================================
--- trunk/src/main/org/jboss/jms/server/endpoint/ServerSessionEndpoint.java	2006-09-23 01:37:20 UTC (rev 1359)
+++ trunk/src/main/org/jboss/jms/server/endpoint/ServerSessionEndpoint.java	2006-09-23 14:21:10 UTC (rev 1360)
@@ -63,6 +63,7 @@
 import org.jboss.messaging.core.plugin.contract.PostOffice;
 import org.jboss.messaging.core.plugin.postoffice.Binding;
 import org.jboss.messaging.core.plugin.postoffice.cluster.LocalClusteredQueue;
+import org.jboss.messaging.core.tx.TransactionRepository;
 import org.jboss.util.id.GUID;
 
 import EDU.oswego.cs.dl.util.concurrent.QueuedExecutor;
@@ -102,6 +103,7 @@
    private DestinationManager dm;
    private IdManager idm;
    private QueuedExecutorPool pool;
+   private TransactionRepository tr;
    private PostOffice topicPostOffice;
    private PostOffice queuePostOffice;
    private String nodeId;
@@ -126,6 +128,7 @@
       idm = sp.getChannelIdManager();
       pool = sp.getQueuedExecutorPool();
       nodeId = sp.getServerPeerID();
+      tr = sp.getTxRepository();
 
       consumers = new HashMap();
 		browsers = new HashMap();  
@@ -244,7 +247,7 @@
                   else
                   {
                      q = new LocalClusteredQueue(topicPostOffice, nodeId, name, idm.getId(), ms, pm, true, true,                              
-                                                 executor, selector,
+                                                 executor, selector, tr,
                                                  mDest.getFullSize(),
                                                  mDest.getPageSize(),
                                                  mDest.getDownCacheSize());
@@ -302,7 +305,7 @@
                      else
                      {
                         q = new LocalClusteredQueue(topicPostOffice, nodeId, name, idm.getId(), ms, pm, true, true,                              
-                                                    executor, selector,
+                                                    executor, selector, tr,
                                                     mDest.getFullSize(),
                                                     mDest.getPageSize(),
                                                     mDest.getDownCacheSize());

Modified: trunk/src/main/org/jboss/messaging/core/ChannelSupport.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/ChannelSupport.java	2006-09-23 01:37:20 UTC (rev 1359)
+++ trunk/src/main/org/jboss/messaging/core/ChannelSupport.java	2006-09-23 14:21:10 UTC (rev 1360)
@@ -172,7 +172,7 @@
       }
       else
       {
-         return handleInternal(sender, ref, tx, true);
+         return handleInternal(sender, ref, tx, true, false);
       }
    }  
       
@@ -182,26 +182,10 @@
    {
       if (trace) { log.trace("acknowledging " + d + (tx == null ? " non-transactionally" : " transactionally in " + tx)); }
 
-      if (tx == null)
-      {
-         // acknowledge non transactionally
-
-         // TODO We should consider also executing acks on the event queue
-         acknowledgeInternal(d, true);
-      }
-      else
-      {
-         this.getCallback(tx).addDelivery(d);
-
-         if (trace) { log.trace(this + " added " + d + " to memory on transaction " + tx); }
-
-         if (recoverable && d.getReference().isReliable())
-         {
-            pm.removeReference(channelID, d.getReference(), tx);
-         }
-      }
+      this.acknowledgeInternal(d, tx, true, false);
    }
-
+   
+  
    public void cancel(Delivery d) throws Throwable
    {
       // TODO We should also consider executing cancels on the event queue
@@ -325,6 +309,9 @@
          {
             future = new Future();
          }
+         //TODO we should keep track of how many deliveries are currently in the queue
+         //so we don't execute another delivery when one is in the queue, since
+         //this is pointless
                   
          this.executor.execute(new DeliveryRunnable(future));
          
@@ -583,7 +570,7 @@
 
                   Delivery delivery = new SimpleDelivery(this, ref, true);
 
-                  acknowledgeInternal(delivery, true);
+                  acknowledgeInternal(delivery, null, true, false);
                }
                else
                {
@@ -694,7 +681,7 @@
    }
 
    protected Delivery handleInternal(DeliveryObserver sender, MessageReference ref,
-                                     Transaction tx, boolean persist)
+                                     Transaction tx, boolean persist, boolean synchronous)
    {
       if (ref == null)
       {
@@ -771,7 +758,8 @@
             else
             {
                // add to post commit callback
-               this.getCallback(tx).addRef(ref);
+               getCallback(tx, synchronous).addRef(ref);
+               
                if (trace) { log.trace(this + " added transactionally " + ref + " in memory"); }
             }
 
@@ -798,19 +786,33 @@
       return new SimpleDelivery(this, ref, true);
    }
 
-   protected void acknowledgeInternal(Delivery d, boolean persist) throws Exception
-   {      
-      synchronized (deliveryLock)
+   protected void acknowledgeInternal(Delivery d, Transaction tx, boolean persist, boolean synchronous) throws Exception
+   {   
+      if (tx == null)
       {
-         acknowledgeInMemory(d);
+         synchronized (deliveryLock)
+         {
+            acknowledgeInMemory(d);
+         }
+            
+         if (persist && recoverable && d.getReference().isReliable())
+         {
+            pm.removeReference(channelID, d.getReference(), null);
+         }
+              
+         d.getReference().releaseMemoryReference();             
       }
-         
-      if (persist && recoverable && d.getReference().isReliable())
+      else
       {
-         pm.removeReference(channelID, d.getReference(), null);
+         this.getCallback(tx, synchronous).addDelivery(d);
+   
+         if (trace) { log.trace(this + " added " + d + " to memory on transaction " + tx); }
+   
+         if (recoverable && d.getReference().isReliable())
+         {
+            pm.removeReference(channelID, d.getReference(), tx);
+         }
       }
-           
-      d.getReference().releaseMemoryReference();        
    }
 
    protected boolean acknowledgeInMemory(Delivery d)
@@ -830,16 +832,24 @@
       return removed;
    }     
 
-   protected InMemoryCallback getCallback(Transaction tx)
+   protected InMemoryCallback getCallback(Transaction tx, boolean synchronous)
    {
-      InMemoryCallback callback = (InMemoryCallback) tx.getCallback(this);
+      InMemoryCallback callback = (InMemoryCallback) tx.getCallback(this);            
 
       if (callback == null)
       {
-         callback = new InMemoryCallback();
+         callback = new InMemoryCallback(synchronous);
 
          tx.addCallback(callback, this);
       }
+      else
+      {
+         //Sanity
+         if (callback.isSynchronous() != synchronous)
+         {
+            throw new IllegalStateException("Callback synchronousness status doesn't match");
+         }
+      }
 
       return callback;
    }
@@ -859,13 +869,26 @@
       private List refsToAdd;
 
       private List deliveriesToRemove;
+      
+      private boolean synchronous;
+      
+      private boolean committing;
 
-      private InMemoryCallback()
+      private Future result;
+
+      private InMemoryCallback(boolean synchronous)
       {
          refsToAdd = new ArrayList();
 
          deliveriesToRemove = new ArrayList();
+         
+         this.synchronous = synchronous;
       }
+      
+      private boolean isSynchronous()
+      {
+         return synchronous;
+      }
 
       private void addRef(MessageReference ref)
       {
@@ -896,11 +919,7 @@
       {
          // NOOP
       }
-
-      private boolean committing;
-
-      private Future result;
-
+      
       public void run()
       {
          try
@@ -913,13 +932,7 @@
             {
                doAfterRollback();
             }
-
-            // prompt delivery
-            if (receiversReady)
-            {
-               deliverInternal(true);
-            }
-
+            
             result.setResult(null);
          }
          catch (Throwable t)
@@ -928,24 +941,46 @@
          }
       }
 
-      public void afterCommit(boolean onePhase) throws TransactionException
+      public void afterCommit(boolean onePhase) throws Exception
       {
-         // We don't execute the commit directly, we add it to the event queue
-         // of the channel
-         // so it is executed in turn
-         committing = true;
-
-         executeAndWaitForResult();
+         if (synchronous)
+         {
+            try
+            {
+               doAfterCommit();
+            }
+            catch (Throwable t)
+            {
+               //TODO Sort out exception handling!!
+               throw new TransactionException("Failed to commit", t);
+            }
+         }
+         else
+         {            
+            // We don't execute the commit directly, we add it to the event queue
+            // of the channel
+            // so it is executed in turn
+            committing = true;
+   
+            executeAndWaitForResult();
+         }
       }
 
-      public void afterRollback(boolean onePhase) throws TransactionException
+      public void afterRollback(boolean onePhase) throws Exception
       {
-         // We don't execute the commit directly, we add it to the event queue
-         // of the channel
-         // so it is executed in turn
-         committing = false;
-
-         executeAndWaitForResult();
+         if (synchronous)            
+         {
+            doAfterRollback();
+         }
+         else
+         {                     
+            // We don't execute the commit directly, we add it to the event queue
+            // of the channel
+            // so it is executed in turn
+            committing = false;
+   
+            executeAndWaitForResult();
+         }
       }
 
       public String toString()
@@ -989,7 +1024,7 @@
          }
       }
 
-      private void doAfterCommit() throws TransactionException
+      private void doAfterCommit() throws Throwable
       {
          // We add the references to the state
 
@@ -1035,6 +1070,12 @@
                throw new TransactionException("Failed to ack message", t);
             }
          }
+         
+         //prompt delivery
+         if (receiversReady)
+         {
+            deliverInternal(true);
+         }
       }
 
       private void doAfterRollback()
@@ -1086,21 +1127,27 @@
       
       public void run()
       {
-         try
+         try         
          {
-            receiversReady = true;
-            
-            deliverInternal(false);                  
-            
-            if (result != null)
-            {
-               result.setResult(null);
+            if (router.numberOfReceivers() > 0)
+            {               
+               receiversReady = true;
+               
+               deliverInternal(false);                  
+               
+               if (result != null)
+               {
+                  result.setResult(null);
+               }
             }
          }
          catch (Throwable t)
          {
             log.error("Failed to deliver", t);
-            result.setException(t);
+            if (result != null)
+            {
+               result.setException(t);
+            }
          }
       }
    }   
@@ -1125,7 +1172,7 @@
 
       public void run()
       {
-         Delivery d = handleInternal(sender, ref, null, persist);
+         Delivery d = handleInternal(sender, ref, null, persist, false);
          result.setResult(d);
       }
    }   

Modified: trunk/src/main/org/jboss/messaging/core/Delivery.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/Delivery.java	2006-09-23 01:37:20 UTC (rev 1359)
+++ trunk/src/main/org/jboss/messaging/core/Delivery.java	2006-09-23 14:21:10 UTC (rev 1360)
@@ -52,5 +52,5 @@
    void acknowledge(Transaction tx) throws Throwable;
 
    void cancel() throws Throwable;
-
+   
 }

Modified: trunk/src/main/org/jboss/messaging/core/PagingChannelSupport.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/PagingChannelSupport.java	2006-09-23 01:37:20 UTC (rev 1359)
+++ trunk/src/main/org/jboss/messaging/core/PagingChannelSupport.java	2006-09-23 14:21:10 UTC (rev 1360)
@@ -247,10 +247,6 @@
             firstPagingOrder = nextPagingOrder = 0;
          }
          
-         log.info("Loading channel");
-         
-         log.info("Got " + ili.getRefInfos().size() + " intial refs");
-         
          Map refMap = processReferences(ili.getRefInfos()); 
         
          Iterator iter = ili.getRefInfos().iterator();
@@ -436,7 +432,6 @@
          if (messageRefs.size() <= fullSize - numberLoadable)
          {
             //This will flush the down cache too
-            log.info("Loading " + numberLoadable + " refs");
             loadPagedReferences(numberLoadable);
             
             return true;

Modified: trunk/src/main/org/jboss/messaging/core/SimpleDelivery.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/SimpleDelivery.java	2006-09-23 01:37:20 UTC (rev 1359)
+++ trunk/src/main/org/jboss/messaging/core/SimpleDelivery.java	2006-09-23 14:21:10 UTC (rev 1360)
@@ -48,7 +48,7 @@
    protected boolean selectorAccepted;
    protected DeliveryObserver observer;
    protected MessageReference reference;
-   
+
    private boolean trace = log.isTraceEnabled();
 
    // Constructors --------------------------------------------------
@@ -89,8 +89,8 @@
       this.observer = observer;
       this.selectorAccepted = selectorAccepted;
    }
+   
 
-
    // Delivery implementation ---------------------------------
 
    public MessageReference getReference()
@@ -148,7 +148,7 @@
       observer.cancel(this);
       cancelled = true;      
    }
-
+   
    // Public --------------------------------------------------------
 
    public String toString()

Modified: trunk/src/main/org/jboss/messaging/core/plugin/JDBCPersistenceManager.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/plugin/JDBCPersistenceManager.java	2006-09-23 01:37:20 UTC (rev 1359)
+++ trunk/src/main/org/jboss/messaging/core/plugin/JDBCPersistenceManager.java	2006-09-23 14:21:10 UTC (rev 1360)
@@ -293,8 +293,6 @@
          
          ps = conn.prepareStatement(getSQLStatement("UPDATE_RELIABLE_REFS_NOT_PAGED"));
                  
-         log.info(getSQLStatement("UPDATE_RELIABLE_REFS_NOT_PAGED"));
-         
          ps.setLong(1, orderStart);
          
          ps.setLong(2, orderEnd);
@@ -1181,8 +1179,6 @@
          
          Long minOrdering = new Long(rs.getLong(1));
          
-         log.info("min ordering is: " + minOrdering);
-         
          if (rs.wasNull())
          {
             minOrdering = null;
@@ -1195,8 +1191,6 @@
             maxOrdering = null;
          }
          
-         log.info("Min ordering: " + minOrdering + " max Ordering: " + maxOrdering);
-
          ps = conn.prepareStatement(getSQLStatement("LOAD_UNPAGED_REFS"));
          
          ps.setLong(1, channelID);

Modified: trunk/src/main/org/jboss/messaging/core/plugin/postoffice/cluster/DefaultClusteredPostOffice.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/plugin/postoffice/cluster/DefaultClusteredPostOffice.java	2006-09-23 01:37:20 UTC (rev 1359)
+++ trunk/src/main/org/jboss/messaging/core/plugin/postoffice/cluster/DefaultClusteredPostOffice.java	2006-09-23 14:21:10 UTC (rev 1360)
@@ -43,7 +43,6 @@
 import org.jboss.messaging.core.FilterFactory;
 import org.jboss.messaging.core.MessageReference;
 import org.jboss.messaging.core.Queue;
-import org.jboss.messaging.core.SimpleDelivery;
 import org.jboss.messaging.core.plugin.contract.ClusteredPostOffice;
 import org.jboss.messaging.core.plugin.contract.MessageStore;
 import org.jboss.messaging.core.plugin.contract.PersistenceManager;
@@ -406,7 +405,7 @@
                
                   ClusteredQueue queue = (ClusteredQueue)del.getObserver();
                   
-                  log.info("Routing message to queue:" + queue.getName() + " on node " + queue.getNodeId());
+              //    log.info("Routing message to queue:" + queue.getName() + " on node " + queue.getNodeId());
                   
                   if (router.numberOfReceivers() > 1)
                   {
@@ -446,7 +445,7 @@
                {
                   if (numberRemote == 1)
                   {
-                     log.info("unicast no tx");
+                  //   log.info("unicast no tx");
                      //Unicast - only one node is interested in the message
                      
                      //FIXME - temporarily commented out until can get unicast to work
@@ -455,7 +454,7 @@
                   }
                   else
                   {
-                     log.info("multicast no tx");
+                  //   log.info("multicast no tx");
                      //Multicast - more than one node is interested
                      asyncSendRequest(new MessageRequest(condition, ref.getMessage(), queueNameNodeIdMap));
                   }                                 
@@ -587,56 +586,11 @@
       }
    }
    
-   public void addToQueue(String queueName, List messages) throws Exception
-   {
-      lock.readLock().acquire();      
-            
-      try
-      {
-         Binding binding = this.getBindingForQueueName(queueName);
-         
-         if (binding == null)
-         {
-            throw new IllegalStateException("Cannot find binding for queue name " + queueName);
-         }
-         
-         LocalClusteredQueue queue = (LocalClusteredQueue)binding.getQueue();
-         
-         Iterator iter = messages.iterator();
-         
-         while (iter.hasNext())
-         {
-            MessageReference ref = null;
-            
-            try
-            {
-               org.jboss.messaging.core.Message msg = (org.jboss.messaging.core.Message)iter.next();
-               
-               ref = ms.reference(msg);
-               
-               queue.handleFromCluster(ref);
-            }
-            finally
-            {
-               if (ref != null)
-               {
-                  ref.releaseMemoryReference();
-               }
-            }
-         }    
-      }
-      finally
-      {
-         
-         lock.readLock().release();
-      }
-   }
-   
    public void routeFromCluster(org.jboss.messaging.core.Message message, String routingKey,
                                 Map queueNameNodeIdMap) throws Exception
    {
-      log.info(this.nodeId + " received route from cluster, ref = " + message.getMessageID() + " routing key " +
-               routingKey + " map " + queueNameNodeIdMap);
+    //  log.info(this.nodeId + " received route from cluster, ref = " + message.getMessageID() + " routing key " +
+    //           routingKey + " map " + queueNameNodeIdMap);
       
       lock.readLock().acquire();  
       
@@ -649,12 +603,8 @@
          // We route on the condition
          DefaultClusteredBindings cb = (DefaultClusteredBindings)conditionMap.get(routingKey);
          
-        // log.info("cb is " + cb);
-         
          if (cb != null)
          {                      
-          //  log.info("cb size is " + cb.getAllBindings().size());
-            
             Collection bindings = cb.getAllBindings();
             
             Iterator iter = bindings.iterator();
@@ -667,7 +617,6 @@
                {  
                   boolean handle = true;
                   
-                  //log.info("Queue map is: " + queueNameNodeIdMap);
                   if (queueNameNodeIdMap != null)
                   {                     
                      String desiredNodeId = (String)queueNameNodeIdMap.get(binding.getQueue().getName());
@@ -687,22 +636,9 @@
                      
                      LocalClusteredQueue queue = (LocalClusteredQueue)binding.getQueue();
                      
-                     log.info(queue.getName() + " is handling it on node " + queue.getNodeId());
-                     
                      Delivery del = queue.handleFromCluster(ref);         
-                     
-                     //log.info("Handled it: " + del);
-                     //log.info("accepted: " +del.isSelectorAccepted());
                   }
-                  else
-                  {
-                     log.info(this.nodeId + " not handling it");
-                  }
                }
-               else
-               {
-                  //log.info("wrong node");
-               }
             }                          
          }
       }
@@ -816,9 +752,9 @@
       }
    }
    
-   public void sendStats() throws Exception
+   public void sendQueueStats() throws Exception
    {
-      lock.writeLock().acquire();
+      lock.readLock().acquire();
       
       List statsList = null;      
       
@@ -839,7 +775,9 @@
                if (q.isActive())
                {                                                      
                   QueueStats stats = q.getStats();
-                                     
+                  
+                  //log.info(q.getNodeId() + " queue " + stats.getQueueName() + " count " + stats.getMessageCount());
+                                                       
                   //We don't bother sending the stats if there's no significant change in the values
                   
                   if (q.changedSignificantly())
@@ -851,13 +789,17 @@
 
                      statsList.add(stats);
                   } 
+                  else
+                  {
+                     //log.info("Not changed significantly");
+                  }
                }
             }
          }
       }
       finally
       {
-         lock.writeLock().release();
+         lock.readLock().release();
       }
       
       if (statsList != null)
@@ -870,7 +812,7 @@
    
    public void updateQueueStats(String nodeId, List statsList) throws Exception
    {
-      lock.writeLock().acquire();
+      lock.readLock().acquire();
       
       try
       {      
@@ -916,7 +858,7 @@
                   
                   if (toQueue != null)
                   {
-                     localQueue.setPullQueue(toQueue);
+                     localQueue.setPullInfo(toQueue, pullSize);
                      
                      //We now trigger delivery - this may cause a pull event
                      localQueue.deliver(false);
@@ -927,7 +869,7 @@
       }
       finally
       {
-         lock.writeLock().release();      
+         lock.readLock().release();      
       }
    }      
    
@@ -951,12 +893,35 @@
       
       return dels;
    }
-         
-   public void pullMessages(ClusteredQueue localQueue, ClusteredQueue remoteQueue) throws Throwable
+   
+   public Address getAddressForNodeId(String nodeId) throws Exception
    {
-      pullMessages(localQueue, remoteQueue, pullSize);
+      lock.readLock().acquire();
+      
+      try
+      {
+         return (Address)nodeIdAddressMap.get(nodeId);
+      }
+      finally
+      {
+         lock.readLock().release();      
+      }
    }
-              
+   
+   /*
+    * Unicast a sync request
+    */
+   public Object syncSendRequest(ClusterRequest request, Address address) throws Exception
+   {              
+      byte[] bytes = writeRequest(request);
+            
+      Message message = new Message(address, null, bytes);      
+      
+      Object result = controlMessageDispatcher.sendMessage(message, GroupRequest.GET_FIRST, castTimeout);
+       
+      return result;
+   }
+               
    // Public ------------------------------------------------------------------------------------------
       
    // Protected ---------------------------------------------------------------------------------------
@@ -1070,7 +1035,7 @@
          QueuedExecutor executor = (QueuedExecutor)pool.get();
          
          queue = new LocalClusteredQueue(this, nodeId, queueName, channelId, ms, pm, true,
-                                         durable, executor, filter);
+                                         durable, executor, filter, tr);
       }
       else
       {
@@ -1086,105 +1051,7 @@
    
    // Private ------------------------------------------------------------------------------------------
            
-   /**
-    * TODO This can probably be moved into LocalClusteredQueue
-    * 
-    * Pull messages from a remote queue to a local queue.
-    * If any of the messages are reliable then this needs to be done reliable (i.e. without loss or redelivery)
-    * Normally this would require 2PC which would make performance suck.
-    * However since we know both queues share the same DB then we can do the persistence locally in the same
-    * tx thus avoiding 2PC and maintaining reliability:)
-    * We do the following:
-    * 
-    * 1. A tx is started locally
-    * 2. Create deliveries for message(s) on the remote node - bring messages back to the local node
-    * We send a message to the remote node to retrieve a set of deliveries from the queue - it gets a max of num
-    * deliveries.
-    * The unreliable ones can be acknowledged immediately, the reliable ones are not acknowledged and a holding transaction
-    * is placed in the holding area on the remote node, which contains knowledge of the deliveries.
-    * The messages corresponding to the deliveries are returned to the local node
-    * 3. The retrieved messages are added to the local queue in the tx
-    * 4. Deliveries corresponding to the messages retrieved are acknowledged LOCALLY for the remote queue.
-    * 5. The local tx is committed.
-    * 6. Send "commit" message to remote node
-    * 7. "Commit" message is received and deliveries in the holding transaction are acknowledged IN MEMORY only.
-    * On failure, commit or rollback will be called on the holding transaction causing the deliveries to be acked or cancelled
-    * depending on whether they exist in the database
-    */
-   private void pullMessages(ClusteredQueue localQueue, ClusteredQueue remoteQueue, int num) throws Throwable
-   { 
-      Address fromAddress = (Address)nodeIdAddressMap.get(remoteQueue.getNodeId());
-      
-      if (fromAddress == null)
-      {
-         //This is ok - the node might have left the group
-         return;
-      }
    
-      Transaction tx = tr.createTransaction();
-         
-      ClusterRequest req = new PullMessagesRequest(this.nodeId, tx.getId(), remoteQueue.getChannelID(),
-                                                   localQueue.getName(), num);
-      
-      byte[] bytes = (byte[])syncSendRequest(req, fromAddress);
-      
-      PullMessagesResponse response = new PullMessagesResponse();
-      
-      StreamUtils.fromBytes(response, bytes);
-
-      List msgs = response.getMessages();
-      
-      log.info("I have " + msgs.size() + " messages");
-      
-      Iterator iter = msgs.iterator();
-      
-      while (iter.hasNext())
-      {
-         org.jboss.messaging.core.Message msg = (org.jboss.messaging.core.Message)iter.next();
-         
-         MessageReference ref = null;
-         
-         try
-         {
-            ref = ms.reference(msg);
-            
-            Delivery delRet = localQueue.handle(null, ref, tx);
-            
-            if (delRet == null || !delRet.isSelectorAccepted())
-            {
-               //This should never happen
-               throw new IllegalStateException("Aaarrgg queue did not accept reference");
-            }
-         }
-         finally
-         {
-            if (ref != null)
-            {
-               ref.releaseMemoryReference();
-            }
-         }
-         
-         Delivery del = new SimpleDelivery(localQueue, ref);
-         
-         del.acknowledge(tx);
-      }
-      
-      tx.commit();
-      
-      //TODO what if commit throws an exception - this means the commit message doesn't hit the 
-      //remote node so the holding transaction stays in the holding area 
-      //Need to catch the exception and throw a check message
-      //What we need to do is catch any exceptions at the top of the call, i.e. just after the interface
-      //and send a checkrequest
-      //This applies to a normal message and messages requests too
-            
-      if (!msgs.isEmpty())
-      {         
-         req = new PullMessagesRequest(this.nodeId, tx.getId());
-         
-         asyncSendRequest(req, fromAddress);
-      }
-   }
    
    
    /*
@@ -1199,19 +1066,7 @@
       controlMessageDispatcher.castMessage(null, message, GroupRequest.GET_ALL, castTimeout);
    }
    
-   /*
-    * Unicast a sync request
-    */
-   private Object syncSendRequest(ClusterRequest request, Address address) throws Exception
-   {              
-      byte[] bytes = writeRequest(request);
-            
-      Message message = new Message(address, null, bytes);      
-      
-      Object result = controlMessageDispatcher.sendMessage(message, GroupRequest.GET_FIRST, castTimeout);
-       
-      return result;
-   }
+
     
    private void removeBindingsForAddress(Address address) throws Exception
    {
@@ -1404,7 +1259,7 @@
       
       public void receive(Message message)
       {         
-         log.info("Received message on control channel: " + message);
+         //log.info("Received message on control channel: " + message);
       }
       
       public void setState(byte[] bytes)

Modified: trunk/src/main/org/jboss/messaging/core/plugin/postoffice/cluster/DefaultMessagePullPolicy.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/plugin/postoffice/cluster/DefaultMessagePullPolicy.java	2006-09-23 01:37:20 UTC (rev 1359)
+++ trunk/src/main/org/jboss/messaging/core/plugin/postoffice/cluster/DefaultMessagePullPolicy.java	2006-09-23 14:21:10 UTC (rev 1360)
@@ -24,6 +24,8 @@
 import java.util.Iterator;
 import java.util.List;
 
+import org.jboss.logging.Logger;
+
 /**
  * A DefaultMessagePullPolicy
  * 
@@ -37,7 +39,9 @@
  */
 public class DefaultMessagePullPolicy implements MessagePullPolicy
 {
-
+   private static final Logger log = Logger.getLogger(DefaultMessagePullPolicy.class);
+   
+   
    public ClusteredQueue chooseQueue(List queues)
    {
       Iterator iter = queues.iterator();

Modified: trunk/src/main/org/jboss/messaging/core/plugin/postoffice/cluster/DefaultRouter.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/plugin/postoffice/cluster/DefaultRouter.java	2006-09-23 01:37:20 UTC (rev 1359)
+++ trunk/src/main/org/jboss/messaging/core/plugin/postoffice/cluster/DefaultRouter.java	2006-09-23 14:21:10 UTC (rev 1360)
@@ -147,8 +147,6 @@
            
       if (localQueue != null)
       {
-         log.info("There is a local queue");
-         
          //The only time the local queue won't accept is if the selector doesn't
          //match - in which case it won't match at any other nodes too so no point
          //in trying them
@@ -159,7 +157,6 @@
       }
       else
       {
-         log.info("No local queue!");
          //There is no local shared queue
          
          //We round robin among the rest

Modified: trunk/src/main/org/jboss/messaging/core/plugin/postoffice/cluster/LocalClusteredQueue.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/plugin/postoffice/cluster/LocalClusteredQueue.java	2006-09-23 01:37:20 UTC (rev 1359)
+++ trunk/src/main/org/jboss/messaging/core/plugin/postoffice/cluster/LocalClusteredQueue.java	2006-09-23 14:21:10 UTC (rev 1360)
@@ -23,9 +23,12 @@
 
 import java.util.ArrayList;
 import java.util.Collections;
+import java.util.Iterator;
 import java.util.List;
 
+import org.jboss.logging.Logger;
 import org.jboss.messaging.core.Delivery;
+import org.jboss.messaging.core.DeliveryObserver;
 import org.jboss.messaging.core.Filter;
 import org.jboss.messaging.core.MessageReference;
 import org.jboss.messaging.core.SimpleDelivery;
@@ -33,7 +36,11 @@
 import org.jboss.messaging.core.plugin.contract.MessageStore;
 import org.jboss.messaging.core.plugin.contract.PersistenceManager;
 import org.jboss.messaging.core.plugin.contract.PostOffice;
+import org.jboss.messaging.core.tx.Transaction;
+import org.jboss.messaging.core.tx.TransactionRepository;
 import org.jboss.messaging.util.Future;
+import org.jboss.messaging.util.StreamUtils;
+import org.jgroups.Address;
 
 import EDU.oswego.cs.dl.util.concurrent.QueuedExecutor;
 
@@ -49,6 +56,8 @@
  */
 public class LocalClusteredQueue extends PagingFilteredQueue implements ClusteredQueue
 {
+   private static final Logger log = Logger.getLogger(LocalClusteredQueue.class);
+      
    private PostOfficeInternal office;
    
    private volatile int lastCount;
@@ -58,36 +67,52 @@
    private RemoteQueueStub pullQueue;
    
    private String nodeId;
+   
+   //TODO Make configurable
+   private int pullSize;
+   
+   private TransactionRepository tr;
+   
+   private Object pullLock = new Object();
  
    //TODO - we shouldn't have to specify office AND nodeId
    public LocalClusteredQueue(PostOffice office, String nodeId, String name, long id, MessageStore ms, PersistenceManager pm,             
                               boolean acceptReliableMessages, boolean recoverable, QueuedExecutor executor,
-                              Filter filter,
+                              Filter filter, TransactionRepository tr,
                               int fullSize, int pageSize, int downCacheSize)
    {
       super(name, id, ms, pm, acceptReliableMessages, recoverable, executor, filter, fullSize, pageSize, downCacheSize);
      
       this.nodeId = nodeId;
       
+      this.tr = tr;
+      
       //FIXME - this cast is a hack
       this.office = (PostOfficeInternal)office;
    }
    
    public LocalClusteredQueue(PostOffice office, String nodeId, String name, long id, MessageStore ms, PersistenceManager pm,             
                               boolean acceptReliableMessages, boolean recoverable, QueuedExecutor executor,
-                              Filter filter)
+                              Filter filter, TransactionRepository tr)
    {
       super(name, id, ms, pm, acceptReliableMessages, recoverable, executor, filter);
       
       this.nodeId = nodeId;
       
+      this.tr = tr;
+      
       //FIXME - this cast is a hack
       this.office = (PostOfficeInternal)office;
    }
    
-   public void setPullQueue(RemoteQueueStub queue)
+   public void setPullInfo(RemoteQueueStub queue, int pullSize)
    {
-      this.pullQueue = queue;
+      synchronized (pullLock)
+      {
+         this.pullQueue = queue;
+         
+         this.pullSize = pullSize;
+      }
    }
    
    public QueueStats getStats()
@@ -100,6 +125,10 @@
          
          lastCount = cnt;
       }
+      else
+      {
+         changedSignificantly = false;
+      }
       
       return new QueueStats(name, cnt);
    }      
@@ -127,6 +156,8 @@
    {
       List dels = new ArrayList();
       
+      log.info("getting " + number + " deliveries, there are " + messageRefs.size() + " available");
+      
       synchronized (refLock)
       {
          synchronized (deliveryLock)
@@ -134,21 +165,26 @@
             //We only get the refs if receiversReady = false so as not to steal messages that
             //might be consumed by local receivers            
             if (!receiversReady)
-            {
+            {               
+               int count = 0;
+               
                MessageReference ref;
                
-               while ((ref = removeFirstInMemory()) != null)
+               while (count < number && (ref = removeFirstInMemory()) != null)
                {
                   SimpleDelivery del = new SimpleDelivery(this, ref);
                   
                   deliveries.add(del);
                   
-                  dels.add(del);               
+                  dels.add(del);       
+                  
+                  count++;
                }           
                return dels;
             }
             else
             {
+               log.info("Returning an empty list since receivers are ready");
                return Collections.EMPTY_LIST;
             }
          }
@@ -185,26 +221,33 @@
    
    public void acknowledgeFromCluster(Delivery d) throws Throwable
    {
-      acknowledgeInternal(d, false);      
+      acknowledgeInternal(d, null, false, false);      
    }
    
    protected void deliverInternal(boolean handle) throws Throwable
    {
-      super.deliverInternal(handle);
+      int beforeSize = -1;
       
       if (!handle)
       {
-         if (receiversReady)
+         beforeSize  = messageRefs.size();
+      }      
+      
+      super.deliverInternal(handle);
+
+      if (!handle)
+      {
+         int afterSize = messageRefs.size();
+         
+         if (receiversReady && beforeSize == 0 && afterSize == 0)
          {
             //Delivery has been prompted (not from handle call)
             //and has run, and there are consumers that are still interested in receiving more
             //refs but there are none available in the channel (either the channel is empty
             //or there are only refs that don't match any selectors)
             //then we should perhaps pull some messages from a remote queue
-            if (pullQueue != null)
-            {
-               office.pullMessages(this, pullQueue);                        
-            }
+            log.info("pulling messages");
+            pullMessages();
          }
       }
    }
@@ -213,4 +256,124 @@
    {
       return true;
    }
+   
+   /**
+    * Pull messages from a remote queue to this queue.
+    * If any of the messages are reliable then this needs to be done reliable (i.e. without loss or redelivery)
+    * Normally this would require 2PC which would make performance suck.
+    * However since we know both queues share the same DB then we can do the persistence locally in the same
+    * tx thus avoiding 2PC and maintaining reliability:)
+    * We do the following:
+    * 
+    * 1. A tx is started locally
+    * 2. Create deliveries for message(s) on the remote node - bring messages back to the local node
+    * We send a message to the remote node to retrieve a set of deliveries from the queue - it gets a max of num
+    * deliveries.
+    * The unreliable ones can be acknowledged immediately, the reliable ones are not acknowledged and a holding transaction
+    * is placed in the holding area on the remote node, which contains knowledge of the deliveries.
+    * The messages corresponding to the deliveries are returned to the local node
+    * 3. The retrieved messages are added to the local queue in the tx
+    * 4. Deliveries corresponding to the messages retrieved are acknowledged LOCALLY for the remote queue.
+    * 5. The local tx is committed.
+    * 6. Send "commit" message to remote node
+    * 7. "Commit" message is received and deliveries in the holding transaction are acknowledged IN MEMORY only.
+    * On failure, commit or rollback will be called on the holding transaction causing the deliveries to be acked or cancelled
+    * depending on whether they exist in the database
+    * 
+    * This method will always be executed on the channel's event queue (via the deliver method)
+    * so no need to do any handles or acks inside another event message
+    */
+   private void pullMessages() throws Throwable
+   {       
+      RemoteQueueStub theQueue;
+      int thePullSize;
+      
+      synchronized (pullLock)
+      {
+         if (pullQueue == null)
+         {
+            return;
+         }
+         theQueue = pullQueue;
+         thePullSize = pullSize;
+      }
+       
+      Address fromAddress = office.getAddressForNodeId(theQueue.getNodeId());
+      
+      if (fromAddress == null)
+      {
+         //This is ok - the node might have left the group
+         return;
+      }
+                  
+      Transaction tx = tr.createTransaction();
+         
+      ClusterRequest req = new PullMessagesRequest(this.nodeId, tx.getId(), theQueue.getChannelID(),
+                                                   name, thePullSize);
+      
+      log.info(System.identityHashCode(this) + " Executing pull messages request for queue " + name +
+               " pulling from node " + theQueue.getNodeId() + " to node " + this.nodeId);
+      
+      byte[] bytes = (byte[])office.syncSendRequest(req, fromAddress);
+      
+      log.info( System.identityHashCode(this) +" Executed pull messages request");
+      
+      PullMessagesResponse response = new PullMessagesResponse();
+      
+      StreamUtils.fromBytes(response, bytes);
+
+      List msgs = response.getMessages();
+      
+      log.info(System.identityHashCode(this) + " I retrieved " + msgs.size() + " messages");
+      
+      Iterator iter = msgs.iterator();
+      
+      while (iter.hasNext())
+      {
+         org.jboss.messaging.core.Message msg = (org.jboss.messaging.core.Message)iter.next();
+         
+         MessageReference ref = null;
+         
+         try
+         {
+            ref = ms.reference(msg);
+            
+            Delivery delRet = handleInternal(null, ref, tx, true, true);
+            
+            if (delRet == null || !delRet.isSelectorAccepted())
+            {
+               //This should never happen
+               throw new IllegalStateException("Aaarrgg queue did not accept reference");
+            }
+         }
+         finally
+         {
+            if (ref != null)
+            {
+               ref.releaseMemoryReference();
+            }
+         }
+         
+         Delivery del = new SimpleDelivery(this, ref);
+         
+         acknowledgeInternal(del, tx, true, true);
+      }
+      
+      tx.commit();
+      
+      //TODO what if commit throws an exception - this means the commit message doesn't hit the 
+      //remote node so the holding transaction stays in the holding area 
+      //Need to catch the exception and throw a check message
+      //What we need to do is catch any exceptions at the top of the call, i.e. just after the interface
+      //and send a checkrequest
+      //This applies to a normal message and messages requests too
+            
+      if (!msgs.isEmpty())
+      {         
+         req = new PullMessagesRequest(this.nodeId, tx.getId());
+         
+         office.asyncSendRequest(req, fromAddress);
+      }
+      
+   }
 }

Modified: trunk/src/main/org/jboss/messaging/core/plugin/postoffice/cluster/PostOfficeInternal.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/plugin/postoffice/cluster/PostOfficeInternal.java	2006-09-23 01:37:20 UTC (rev 1359)
+++ trunk/src/main/org/jboss/messaging/core/plugin/postoffice/cluster/PostOfficeInternal.java	2006-09-23 14:21:10 UTC (rev 1360)
@@ -55,12 +55,14 @@
    
    void routeFromCluster(Message message, String routingKey, Map queueNameNodeIdMap) throws Exception;
    
-   void addToQueue(String queueName, List messages) throws Exception;
+   //void addToQueue(String queueName, List messages) throws Exception;
    
    void asyncSendRequest(ClusterRequest request) throws Exception;
    
    void asyncSendRequest(ClusterRequest request, Address address) throws Exception;
    
+   Object syncSendRequest(ClusterRequest request, Address address) throws Exception;
+   
    void holdTransaction(TransactionId id, ClusterTransaction tx) throws Throwable;
    
    void commitTransaction(TransactionId id) throws Throwable;
@@ -69,12 +71,11 @@
    
    void updateQueueStats(String nodeId, List stats) throws Exception;
    
-   void sendStats() throws Exception;
+   void sendQueueStats() throws Exception;
    
    boolean referenceExistsInStorage(long channelID, long messageID) throws Exception;
    
-   List getDeliveries(String queueName, int numMessages) throws Exception;
+   List getDeliveries(String queueName, int numMessages) throws Exception; 
    
-   void pullMessages(ClusteredQueue localQueue, ClusteredQueue remoteQueue) throws Throwable;
-
+   Address getAddressForNodeId(String nodeId) throws Exception;
 }

Modified: trunk/src/main/org/jboss/messaging/core/plugin/postoffice/cluster/PullMessagesRequest.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/plugin/postoffice/cluster/PullMessagesRequest.java	2006-09-23 01:37:20 UTC (rev 1359)
+++ trunk/src/main/org/jboss/messaging/core/plugin/postoffice/cluster/PullMessagesRequest.java	2006-09-23 14:21:10 UTC (rev 1360)
@@ -71,17 +71,13 @@
    }
 
    Object execute(PostOfficeInternal office) throws Throwable
-   {
-      log.info("********* executign pull messages requiest");
-      
+   { 
       TransactionId id = new TransactionId(nodeId, txId);
       
       if (hold)
       {         
          List dels = office.getDeliveries(queueName, numMessages);
          
-         log.info("Got a list of " + dels.size() + " deliveries");
-         
          PullMessagesResponse response = new PullMessagesResponse(dels.size());
          
          if (!dels.isEmpty())
@@ -114,9 +110,7 @@
             //Add this to the holding area
             office.holdTransaction(id, this);
          }
-         
-         log.info("returning response:" + response);
-         
+          
          //Convert to bytes since the response isn't serializable (nor do we want it to be)
          byte[] bytes = StreamUtils.toBytes(response);
          

Modified: trunk/src/main/org/jboss/messaging/core/plugin/postoffice/cluster/RemoteQueueStub.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/plugin/postoffice/cluster/RemoteQueueStub.java	2006-09-23 01:37:20 UTC (rev 1359)
+++ trunk/src/main/org/jboss/messaging/core/plugin/postoffice/cluster/RemoteQueueStub.java	2006-09-23 14:21:10 UTC (rev 1360)
@@ -115,9 +115,7 @@
          {
             //If the message is persistent and we are recoverable then we persist here, *before*
             //the message is sent across the network
-            
-            log.info("Adding ref: " + reference + " in channel " + id);
-            
+
             pm.addReference(id, reference, tx);
          }
          catch (Exception e)

Modified: trunk/src/main/org/jboss/messaging/core/plugin/postoffice/cluster/StatsSender.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/plugin/postoffice/cluster/StatsSender.java	2006-09-23 01:37:20 UTC (rev 1359)
+++ trunk/src/main/org/jboss/messaging/core/plugin/postoffice/cluster/StatsSender.java	2006-09-23 14:21:10 UTC (rev 1360)
@@ -83,25 +83,20 @@
       timer.cancel();
       
       timer = null;
-   }
+   }      
    
-   
-   
    class SendStatsTimerTask extends TimerTask
    {
-
       public void run()
       {
          try
          {
-            office.sendStats();
+            office.sendQueueStats();
          }
          catch (Exception e)
          {
             log.error("Failed to send statistics", e);
          }
-      }
-      
+      }      
    }
-
 }

Modified: trunk/tests/src/org/jboss/test/messaging/core/plugin/base/ClusteringTestBase.java
===================================================================
--- trunk/tests/src/org/jboss/test/messaging/core/plugin/base/ClusteringTestBase.java	2006-09-23 01:37:20 UTC (rev 1359)
+++ trunk/tests/src/org/jboss/test/messaging/core/plugin/base/ClusteringTestBase.java	2006-09-23 14:21:10 UTC (rev 1360)
@@ -250,13 +250,15 @@
       } 
    }
    
+   private static long msgCount;
+   
    protected List sendMessages(String condition, boolean persistent, PostOffice office, int num, Transaction tx) throws Exception
    {
       List list = new ArrayList();
       
       for (int i = 0; i < num; i++)
       {         
-         Message msg = CoreMessageFactory.createCoreMessage(i + 1, persistent, null);      
+         Message msg = CoreMessageFactory.createCoreMessage(msgCount++, persistent, null);      
          
          MessageReference ref = ms.reference(msg);         
          

Modified: trunk/tests/src/org/jboss/test/messaging/core/plugin/postoffice/cluster/DefaultClusteredPostOfficeTest.java
===================================================================
--- trunk/tests/src/org/jboss/test/messaging/core/plugin/postoffice/cluster/DefaultClusteredPostOfficeTest.java	2006-09-23 01:37:20 UTC (rev 1359)
+++ trunk/tests/src/org/jboss/test/messaging/core/plugin/postoffice/cluster/DefaultClusteredPostOfficeTest.java	2006-09-23 14:21:10 UTC (rev 1360)
@@ -103,11 +103,11 @@
          
          //Add a couple of bindings
          
-         LocalClusteredQueue queue1 = new LocalClusteredQueue(office1, "node1", "sub1", im.getId(), ms, pm, true, false, (QueuedExecutor)pool.get(), null);
+         LocalClusteredQueue queue1 = new LocalClusteredQueue(office1, "node1", "sub1", im.getId(), ms, pm, true, false, (QueuedExecutor)pool.get(), null, tr);
          
          Binding binding1 =
             office1.bindClusteredQueue("topic1", queue1);
-         LocalClusteredQueue queue2 = new LocalClusteredQueue(office1, "node1", "sub2", im.getId(), ms, pm, true, false, (QueuedExecutor)pool.get(), null);
+         LocalClusteredQueue queue2 = new LocalClusteredQueue(office1, "node1", "sub2", im.getId(), ms, pm, true, false, (QueuedExecutor)pool.get(), null, tr);
 
          Binding binding2 =
             office1.bindClusteredQueue("topic1", queue2);
@@ -126,7 +126,7 @@
          
          //Add another binding on node 2
          
-         LocalClusteredQueue queue3 = new LocalClusteredQueue(office2, "node2", "sub3", im.getId(), ms, pm, true, false, (QueuedExecutor)pool.get(), null);
+         LocalClusteredQueue queue3 = new LocalClusteredQueue(office2, "node2", "sub3", im.getId(), ms, pm, true, false, (QueuedExecutor)pool.get(), null, tr);
 
          Binding binding3 =
             office2.bindClusteredQueue("topic1", queue3);
@@ -153,7 +153,7 @@
 
          //Add another binding on node 1
          
-         LocalClusteredQueue queue4 = new LocalClusteredQueue(office2, "node2", "sub4", im.getId(), ms, pm, true, false, (QueuedExecutor)pool.get(), null);
+         LocalClusteredQueue queue4 = new LocalClusteredQueue(office2, "node2", "sub4", im.getId(), ms, pm, true, false, (QueuedExecutor)pool.get(), null, tr);
          Binding binding4 =
             office2.bindClusteredQueue("topic1", queue4);
          
@@ -217,7 +217,7 @@
          
          //Add another binding on node 3
                   
-         LocalClusteredQueue queue5 = new LocalClusteredQueue(office3, "node3", "sub5", im.getId(), ms, pm, true, false, (QueuedExecutor)pool.get(), null);
+         LocalClusteredQueue queue5 = new LocalClusteredQueue(office3, "node3", "sub5", im.getId(), ms, pm, true, false, (QueuedExecutor)pool.get(), null, tr);
          
          Binding binding5 =
             office3.bindClusteredQueue("topic1", queue5);
@@ -253,12 +253,12 @@
          
          //Add a durable and a non durable binding on node 1
          
-         LocalClusteredQueue queue6 = new LocalClusteredQueue(office1, "node1", "sub6", im.getId(), ms, pm, true, true, (QueuedExecutor)pool.get(), null);         
+         LocalClusteredQueue queue6 = new LocalClusteredQueue(office1, "node1", "sub6", im.getId(), ms, pm, true, true, (QueuedExecutor)pool.get(), null, tr);         
          
          Binding binding6 =
             office1.bindClusteredQueue("topic1", queue6);
          
-         LocalClusteredQueue queue7 = new LocalClusteredQueue(office1, "node1", "sub7", im.getId(), ms, pm, true, false, (QueuedExecutor)pool.get(), null);         
+         LocalClusteredQueue queue7 = new LocalClusteredQueue(office1, "node1", "sub7", im.getId(), ms, pm, true, false, (QueuedExecutor)pool.get(), null, tr);         
          
          Binding binding7 =
             office1.bindClusteredQueue("topic1", queue7);
@@ -501,15 +501,15 @@
          
          office2 = createClusteredPostOffice("node2", "testgroup");
          
-         LocalClusteredQueue queue1 = new LocalClusteredQueue(office1, "node1", "queue1", im.getId(), ms, pm, true, false, (QueuedExecutor)pool.get(), null);
+         LocalClusteredQueue queue1 = new LocalClusteredQueue(office1, "node1", "queue1", im.getId(), ms, pm, true, false, (QueuedExecutor)pool.get(), null, tr);
          
          Binding binding1 = office1.bindClusteredQueue("queue1", queue1);
          
-         LocalClusteredQueue queue2 = new LocalClusteredQueue(office2, "node2", "queue1", im.getId(), ms, pm, true, false, (QueuedExecutor)pool.get(), null);
+         LocalClusteredQueue queue2 = new LocalClusteredQueue(office2, "node2", "queue1", im.getId(), ms, pm, true, false, (QueuedExecutor)pool.get(), null, tr);
          
          Binding binding2 = office2.bindClusteredQueue("queue1", queue2);
                   
-         LocalClusteredQueue queue3 = new LocalClusteredQueue(office1, "node1", "queue1", im.getId(), ms, pm, true, false, (QueuedExecutor)pool.get(), null);
+         LocalClusteredQueue queue3 = new LocalClusteredQueue(office1, "node1", "queue1", im.getId(), ms, pm, true, false, (QueuedExecutor)pool.get(), null, tr);
          
          try
          {
@@ -520,7 +520,7 @@
          {
             //Ok
          }
-         LocalClusteredQueue queue4 = new LocalClusteredQueue(office2, "node2", "queue1", im.getId(), ms, pm, true, false, (QueuedExecutor)pool.get(), null);
+         LocalClusteredQueue queue4 = new LocalClusteredQueue(office2, "node2", "queue1", im.getId(), ms, pm, true, false, (QueuedExecutor)pool.get(), null, tr);
          
          try
          {
@@ -536,7 +536,7 @@
          
          office2.unbindClusteredQueue("queue1");
          
-         LocalClusteredQueue queue5 = new LocalClusteredQueue(office1, "node1", "queue1", im.getId(), ms, pm, true, false, (QueuedExecutor)pool.get(), null);
+         LocalClusteredQueue queue5 = new LocalClusteredQueue(office1, "node1", "queue1", im.getId(), ms, pm, true, false, (QueuedExecutor)pool.get(), null, tr);
          
          Binding binding5 = office1.bindClusteredQueue("queue1", queue5);
          
@@ -615,15 +615,15 @@
          SimpleFilter filter1 = new SimpleFilter(2);
          SimpleFilter filter2 = new SimpleFilter(3);
       
-         LocalClusteredQueue queue1 = new LocalClusteredQueue(office1, "node1", "queue1", im.getId(), ms, pm, true, false, (QueuedExecutor)pool.get(), filter1);         
+         LocalClusteredQueue queue1 = new LocalClusteredQueue(office1, "node1", "queue1", im.getId(), ms, pm, true, false, (QueuedExecutor)pool.get(), filter1, tr);         
          Binding binding1 =
             office1.bindClusteredQueue("topic1", queue1);
          
-         LocalClusteredQueue queue2 = new LocalClusteredQueue(office2, "node2", "queue2", im.getId(), ms, pm, true, false, (QueuedExecutor)pool.get(), filter2);         
+         LocalClusteredQueue queue2 = new LocalClusteredQueue(office2, "node2", "queue2", im.getId(), ms, pm, true, false, (QueuedExecutor)pool.get(), filter2, tr);         
          Binding binding2 =
             office2.bindClusteredQueue("topic1", queue2);
          
-         LocalClusteredQueue queue3 = new LocalClusteredQueue(office2, "node2", "queue3", im.getId(), ms, pm, true, false, (QueuedExecutor)pool.get(), null);         
+         LocalClusteredQueue queue3 = new LocalClusteredQueue(office2, "node2", "queue3", im.getId(), ms, pm, true, false, (QueuedExecutor)pool.get(), null, tr);         
          Binding binding3 =
             office2.bindClusteredQueue("topic1", queue3);   
          
@@ -719,52 +719,52 @@
          LocalClusteredQueue[] queues = new LocalClusteredQueue[16];
          Binding[] bindings = new Binding[16];
          
-         queues[0] = new LocalClusteredQueue(office1, "node1", "sub1", im.getId(), ms, pm, true, false, (QueuedExecutor)pool.get(), null);         
+         queues[0] = new LocalClusteredQueue(office1, "node1", "sub1", im.getId(), ms, pm, true, false, (QueuedExecutor)pool.get(), null, tr);         
          bindings[0] = office1.bindClusteredQueue("topic1", queues[0]);
          
-         queues[1] = new LocalClusteredQueue(office1, "node1", "sub2", im.getId(), ms, pm, true, false, (QueuedExecutor)pool.get(), null);         
+         queues[1] = new LocalClusteredQueue(office1, "node1", "sub2", im.getId(), ms, pm, true, false, (QueuedExecutor)pool.get(), null, tr);         
          bindings[1] = office1.bindClusteredQueue("topic1", queues[1]);
          
-         queues[2] = new LocalClusteredQueue(office2, "node2", "sub3", im.getId(), ms, pm, true, false, (QueuedExecutor)pool.get(), null);         
+         queues[2] = new LocalClusteredQueue(office2, "node2", "sub3", im.getId(), ms, pm, true, false, (QueuedExecutor)pool.get(), null, tr);         
          bindings[2] = office2.bindClusteredQueue("topic1", queues[2]);
          
-         queues[3] = new LocalClusteredQueue(office2, "node2", "sub4", im.getId(), ms, pm, true, false, (QueuedExecutor)pool.get(), null);         
+         queues[3] = new LocalClusteredQueue(office2, "node2", "sub4", im.getId(), ms, pm, true, false, (QueuedExecutor)pool.get(), null, tr);         
          bindings[3] = office2.bindClusteredQueue("topic1", queues[3]);
          
-         queues[4] = new LocalClusteredQueue(office2, "node2", "sub5", im.getId(), ms, pm, true, true, (QueuedExecutor)pool.get(), null);         
+         queues[4] = new LocalClusteredQueue(office2, "node2", "sub5", im.getId(), ms, pm, true, true, (QueuedExecutor)pool.get(), null, tr);         
          bindings[4] = office2.bindClusteredQueue("topic1", queues[4]);
          
-         queues[5] = new LocalClusteredQueue(office1, "node1", "sub6", im.getId(), ms, pm, true, false, (QueuedExecutor)pool.get(), null);         
+         queues[5] = new LocalClusteredQueue(office1, "node1", "sub6", im.getId(), ms, pm, true, false, (QueuedExecutor)pool.get(), null, tr);         
          bindings[5] = office1.bindClusteredQueue("topic1", queues[5]);
          
-         queues[6] = new LocalClusteredQueue(office1, "node1", "sub7", im.getId(), ms, pm, true, true, (QueuedExecutor)pool.get(), null);         
+         queues[6] = new LocalClusteredQueue(office1, "node1", "sub7", im.getId(), ms, pm, true, true, (QueuedExecutor)pool.get(), null, tr);         
          bindings[6] = office1.bindClusteredQueue("topic1", queues[6]);
          
-         queues[7] = new LocalClusteredQueue(office1, "node1", "sub8", im.getId(), ms, pm, true, true, (QueuedExecutor)pool.get(), null);         
+         queues[7] = new LocalClusteredQueue(office1, "node1", "sub8", im.getId(), ms, pm, true, true, (QueuedExecutor)pool.get(), null, tr);         
          bindings[7] = office1.bindClusteredQueue("topic1", queues[7]);
          
-         queues[8] = new LocalClusteredQueue(office1, "node1", "sub9", im.getId(), ms, pm, true, false, (QueuedExecutor)pool.get(), null);         
+         queues[8] = new LocalClusteredQueue(office1, "node1", "sub9", im.getId(), ms, pm, true, false, (QueuedExecutor)pool.get(), null, tr);         
          bindings[8] = office1.bindClusteredQueue("topic2", queues[8]);
          
-         queues[9] = new LocalClusteredQueue(office1, "node1", "sub10", im.getId(), ms, pm, true, false, (QueuedExecutor)pool.get(), null);         
+         queues[9] = new LocalClusteredQueue(office1, "node1", "sub10", im.getId(), ms, pm, true, false, (QueuedExecutor)pool.get(), null, tr);         
          bindings[9] = office1.bindClusteredQueue("topic2", queues[9]);
          
-         queues[10] = new LocalClusteredQueue(office2, "node2", "sub11", im.getId(), ms, pm, true, false, (QueuedExecutor)pool.get(), null);         
+         queues[10] = new LocalClusteredQueue(office2, "node2", "sub11", im.getId(), ms, pm, true, false, (QueuedExecutor)pool.get(), null, tr);         
          bindings[10] = office2.bindClusteredQueue("topic2", queues[10]);
          
-         queues[11] = new LocalClusteredQueue(office2, "node2", "sub12", im.getId(), ms, pm, true, false, (QueuedExecutor)pool.get(), null);         
+         queues[11] = new LocalClusteredQueue(office2, "node2", "sub12", im.getId(), ms, pm, true, false, (QueuedExecutor)pool.get(), null, tr);         
          bindings[11] = office2.bindClusteredQueue("topic2", queues[11]);
          
-         queues[12] = new LocalClusteredQueue(office2, "node2", "sub13", im.getId(), ms, pm, true, true, (QueuedExecutor)pool.get(), null);         
+         queues[12] = new LocalClusteredQueue(office2, "node2", "sub13", im.getId(), ms, pm, true, true, (QueuedExecutor)pool.get(), null, tr);         
          bindings[12] = office2.bindClusteredQueue("topic2", queues[12]);
          
-         queues[13] = new LocalClusteredQueue(office1, "node1", "sub14", im.getId(), ms, pm, true, false, (QueuedExecutor)pool.get(), null);         
+         queues[13] = new LocalClusteredQueue(office1, "node1", "sub14", im.getId(), ms, pm, true, false, (QueuedExecutor)pool.get(), null, tr);         
          bindings[13] = office1.bindClusteredQueue("topic2", queues[13]);
          
-         queues[14] = new LocalClusteredQueue(office1, "node1", "sub15", im.getId(), ms, pm, true, true, (QueuedExecutor)pool.get(), null);         
+         queues[14] = new LocalClusteredQueue(office1, "node1", "sub15", im.getId(), ms, pm, true, true, (QueuedExecutor)pool.get(), null, tr);         
          bindings[14] = office1.bindClusteredQueue("topic2", queues[14]);
          
-         queues[15] = new LocalClusteredQueue(office1, "node1", "sub16", im.getId(), ms, pm, true, true, (QueuedExecutor)pool.get(), null);         
+         queues[15] = new LocalClusteredQueue(office1, "node1", "sub16", im.getId(), ms, pm, true, true, (QueuedExecutor)pool.get(), null, tr);         
          bindings[15] = office1.bindClusteredQueue("topic2", queues[15]);
        
          SimpleReceiver[] receivers = new SimpleReceiver[16];
@@ -899,27 +899,27 @@
          //We deploy the queue on nodes 1, 2, 3, 4 and 5
          //We don't deploy on node 6
          
-         LocalClusteredQueue queue1 = new LocalClusteredQueue(office1, "node1", "queue1", im.getId(), ms, pm, true, false, (QueuedExecutor)pool.get(), null);
+         LocalClusteredQueue queue1 = new LocalClusteredQueue(office1, "node1", "queue1", im.getId(), ms, pm, true, false, (QueuedExecutor)pool.get(), null, tr);
          Binding binding1 = office1.bindClusteredQueue("queue1", queue1);
          SimpleReceiver receiver1 = new SimpleReceiver("blah", SimpleReceiver.ACCEPTING);
          queue1.add(receiver1);
          
-         LocalClusteredQueue queue2 = new LocalClusteredQueue(office2, "node2", "queue1", im.getId(), ms, pm, true, false, (QueuedExecutor)pool.get(), null);
+         LocalClusteredQueue queue2 = new LocalClusteredQueue(office2, "node2", "queue1", im.getId(), ms, pm, true, false, (QueuedExecutor)pool.get(), null, tr);
          Binding binding2 = office2.bindClusteredQueue("queue1", queue2); 
          SimpleReceiver receiver2 = new SimpleReceiver("blah", SimpleReceiver.ACCEPTING);
          queue2.add(receiver2);
          
-         LocalClusteredQueue queue3 = new LocalClusteredQueue(office3, "node3", "queue1", im.getId(), ms, pm, true, false, (QueuedExecutor)pool.get(), null);
+         LocalClusteredQueue queue3 = new LocalClusteredQueue(office3, "node3", "queue1", im.getId(), ms, pm, true, false, (QueuedExecutor)pool.get(), null, tr);
          Binding binding3 = office3.bindClusteredQueue("queue1", queue3);
          SimpleReceiver receiver3 = new SimpleReceiver("blah", SimpleReceiver.ACCEPTING);
          queue3.add(receiver3);
          
-         LocalClusteredQueue queue4 = new LocalClusteredQueue(office4, "node4", "queue1", im.getId(), ms, pm, true, false, (QueuedExecutor)pool.get(), null);
+         LocalClusteredQueue queue4 = new LocalClusteredQueue(office4, "node4", "queue1", im.getId(), ms, pm, true, false, (QueuedExecutor)pool.get(), null, tr);
          Binding binding4 = office4.bindClusteredQueue("queue1", queue4); 
          SimpleReceiver receiver4 = new SimpleReceiver("blah", SimpleReceiver.ACCEPTING);
          queue4.add(receiver4);
          
-         LocalClusteredQueue queue5 = new LocalClusteredQueue(office5, "node5", "queue1", im.getId(), ms, pm, true, false, (QueuedExecutor)pool.get(), null);
+         LocalClusteredQueue queue5 = new LocalClusteredQueue(office5, "node5", "queue1", im.getId(), ms, pm, true, false, (QueuedExecutor)pool.get(), null, tr);
          Binding binding5 = office5.bindClusteredQueue("queue1", queue5);
          SimpleReceiver receiver5 = new SimpleReceiver("blah", SimpleReceiver.ACCEPTING);
          queue5.add(receiver5);
@@ -1081,13 +1081,13 @@
          //======
          
          //Non durable 1 on node 2
-         LocalClusteredQueue nonDurable1 = new LocalClusteredQueue(office2, "node2", "nondurable1", im.getId(), ms, pm, true, false, (QueuedExecutor)pool.get(), null);
+         LocalClusteredQueue nonDurable1 = new LocalClusteredQueue(office2, "node2", "nondurable1", im.getId(), ms, pm, true, false, (QueuedExecutor)pool.get(), null, tr);
          Binding binding1 = office2.bindClusteredQueue("topic", nonDurable1);
          SimpleReceiver receiver1 = new SimpleReceiver("blah", SimpleReceiver.ACCEPTING);
          nonDurable1.add(receiver1);
          
          //Non durable 2 on node 2
-         LocalClusteredQueue nonDurable2 = new LocalClusteredQueue(office2, "node2", "nondurable2", im.getId(), ms, pm, true, false, (QueuedExecutor)pool.get(), null);
+         LocalClusteredQueue nonDurable2 = new LocalClusteredQueue(office2, "node2", "nondurable2", im.getId(), ms, pm, true, false, (QueuedExecutor)pool.get(), null, tr);
          Binding binding2 = office2.bindClusteredQueue("topic", nonDurable2);
          SimpleReceiver receiver2 = new SimpleReceiver("blah", SimpleReceiver.ACCEPTING);
          nonDurable2.add(receiver2);
@@ -1096,13 +1096,13 @@
          //======
          
          //Non shared durable
-         LocalClusteredQueue nonSharedDurable1 = new LocalClusteredQueue(office3, "node3", "nonshareddurable1", im.getId(), ms, pm, true, true, (QueuedExecutor)pool.get(), null);
+         LocalClusteredQueue nonSharedDurable1 = new LocalClusteredQueue(office3, "node3", "nonshareddurable1", im.getId(), ms, pm, true, true, (QueuedExecutor)pool.get(), null, tr);
          Binding binding3 = office3.bindClusteredQueue("topic", nonSharedDurable1);
          SimpleReceiver receiver3 = new SimpleReceiver("blah", SimpleReceiver.ACCEPTING);
          nonSharedDurable1.add(receiver3);
          
          //Non durable
-         LocalClusteredQueue nonDurable3 = new LocalClusteredQueue(office3, "node3", "nondurable3", im.getId(), ms, pm, true, false, (QueuedExecutor)pool.get(), null);
+         LocalClusteredQueue nonDurable3 = new LocalClusteredQueue(office3, "node3", "nondurable3", im.getId(), ms, pm, true, false, (QueuedExecutor)pool.get(), null, tr);
          Binding binding4 = office3.bindClusteredQueue("topic", nonDurable3);
          SimpleReceiver receiver4 = new SimpleReceiver("blah", SimpleReceiver.ACCEPTING);
          nonDurable3.add(receiver4);
@@ -1111,31 +1111,31 @@
          //======
          
          //Shared durable
-         LocalClusteredQueue sharedDurable1 = new LocalClusteredQueue(office4, "node4", "shareddurable1", im.getId(), ms, pm, true, true, (QueuedExecutor)pool.get(), null);
+         LocalClusteredQueue sharedDurable1 = new LocalClusteredQueue(office4, "node4", "shareddurable1", im.getId(), ms, pm, true, true, (QueuedExecutor)pool.get(), null, tr);
          Binding binding5 = office4.bindClusteredQueue("topic", sharedDurable1);
          SimpleReceiver receiver5 = new SimpleReceiver("blah", SimpleReceiver.ACCEPTING);
          sharedDurable1.add(receiver5);
          
          //Non shared durable
-         LocalClusteredQueue nonSharedDurable2 = new LocalClusteredQueue(office4, "node4", "nonshareddurable2", im.getId(), ms, pm, true, true, (QueuedExecutor)pool.get(), null);
+         LocalClusteredQueue nonSharedDurable2 = new LocalClusteredQueue(office4, "node4", "nonshareddurable2", im.getId(), ms, pm, true, true, (QueuedExecutor)pool.get(), null, tr);
          Binding binding6 = office4.bindClusteredQueue("topic", nonSharedDurable2);
          SimpleReceiver receiver6 = new SimpleReceiver("blah", SimpleReceiver.ACCEPTING);
          nonSharedDurable2.add(receiver6);
          
          //Non durable
-         LocalClusteredQueue nonDurable4 = new LocalClusteredQueue(office4, "node4", "nondurable4", im.getId(), ms, pm, true, false, (QueuedExecutor)pool.get(), null);
+         LocalClusteredQueue nonDurable4 = new LocalClusteredQueue(office4, "node4", "nondurable4", im.getId(), ms, pm, true, false, (QueuedExecutor)pool.get(), null, tr);
          Binding binding7 = office4.bindClusteredQueue("topic", nonDurable4);
          SimpleReceiver receiver7 = new SimpleReceiver("blah", SimpleReceiver.ACCEPTING);
          nonDurable4.add(receiver7);
          
          // Non durable
-         LocalClusteredQueue nonDurable5 = new LocalClusteredQueue(office4, "node4", "nondurable5", im.getId(), ms, pm, true, false, (QueuedExecutor)pool.get(), null);
+         LocalClusteredQueue nonDurable5 = new LocalClusteredQueue(office4, "node4", "nondurable5", im.getId(), ms, pm, true, false, (QueuedExecutor)pool.get(), null, tr);
          Binding binding8 = office4.bindClusteredQueue("topic", nonDurable5);
          SimpleReceiver receiver8 = new SimpleReceiver("blah", SimpleReceiver.ACCEPTING);
          nonDurable5.add(receiver8);
          
          //Non durable
-         LocalClusteredQueue nonDurable6 = new LocalClusteredQueue(office4, "node4", "nondurable6", im.getId(), ms, pm, true, false, (QueuedExecutor)pool.get(), null);
+         LocalClusteredQueue nonDurable6 = new LocalClusteredQueue(office4, "node4", "nondurable6", im.getId(), ms, pm, true, false, (QueuedExecutor)pool.get(), null, tr);
          Binding binding9 = office4.bindClusteredQueue("topic", nonDurable6);
          SimpleReceiver receiver9 = new SimpleReceiver("blah", SimpleReceiver.ACCEPTING);
          nonDurable6.add(receiver9);
@@ -1143,32 +1143,32 @@
          // Node 5
          //=======
          //Shared durable
-         LocalClusteredQueue sharedDurable2 = new LocalClusteredQueue(office5, "node5", "shareddurable1", im.getId(), ms, pm, true, true, (QueuedExecutor)pool.get(), null);
+         LocalClusteredQueue sharedDurable2 = new LocalClusteredQueue(office5, "node5", "shareddurable1", im.getId(), ms, pm, true, true, (QueuedExecutor)pool.get(), null, tr);
          Binding binding10 = office5.bindClusteredQueue("topic", sharedDurable2);
          SimpleReceiver receiver10 = new SimpleReceiver("blah", SimpleReceiver.ACCEPTING);
          sharedDurable2.add(receiver10);
          
          //Shared durable
-         LocalClusteredQueue sharedDurable3 = new LocalClusteredQueue(office5, "node5", "shareddurable2", im.getId(), ms, pm, true, true, (QueuedExecutor)pool.get(), null);
+         LocalClusteredQueue sharedDurable3 = new LocalClusteredQueue(office5, "node5", "shareddurable2", im.getId(), ms, pm, true, true, (QueuedExecutor)pool.get(), null, tr);
          Binding binding11 = office5.bindClusteredQueue("topic", sharedDurable3);
          SimpleReceiver receiver11 = new SimpleReceiver("blah", SimpleReceiver.ACCEPTING);
          sharedDurable3.add(receiver11);
          
          // Node 6
          //=========
-         LocalClusteredQueue sharedDurable4 = new LocalClusteredQueue(office6, "node6", "shareddurable2", im.getId(), ms, pm, true, true, (QueuedExecutor)pool.get(), null);
+         LocalClusteredQueue sharedDurable4 = new LocalClusteredQueue(office6, "node6", "shareddurable2", im.getId(), ms, pm, true, true, (QueuedExecutor)pool.get(), null, tr);
          Binding binding12 = office6.bindClusteredQueue("topic", sharedDurable4);
          SimpleReceiver receiver12 = new SimpleReceiver("blah", SimpleReceiver.ACCEPTING);
          sharedDurable4.add(receiver12);
          
-         LocalClusteredQueue nonDurable7 = new LocalClusteredQueue(office6, "node6", "nondurable7", im.getId(), ms, pm, true, false, (QueuedExecutor)pool.get(), null);
+         LocalClusteredQueue nonDurable7 = new LocalClusteredQueue(office6, "node6", "nondurable7", im.getId(), ms, pm, true, false, (QueuedExecutor)pool.get(), null, tr);
          Binding binding13 = office6.bindClusteredQueue("topic", nonDurable7);
          SimpleReceiver receiver13 = new SimpleReceiver("blah", SimpleReceiver.ACCEPTING);
          nonDurable7.add(receiver13);
          
          //Node 7
          //=======
-         LocalClusteredQueue sharedDurable5 = new LocalClusteredQueue(office7, "node7", "shareddurable2", im.getId(), ms, pm, true, true, (QueuedExecutor)pool.get(), null);
+         LocalClusteredQueue sharedDurable5 = new LocalClusteredQueue(office7, "node7", "shareddurable2", im.getId(), ms, pm, true, true, (QueuedExecutor)pool.get(), null, tr);
          Binding binding14 = office7.bindClusteredQueue("topic", sharedDurable5);
          SimpleReceiver receiver14 = new SimpleReceiver("blah", SimpleReceiver.ACCEPTING);
          sharedDurable5.add(receiver14);
@@ -1502,52 +1502,52 @@
          LocalClusteredQueue[] queues = new LocalClusteredQueue[16];
          Binding[] bindings = new Binding[16];
          
-         queues[0] = new LocalClusteredQueue(office1, "node1", "sub1", im.getId(), ms, pm, true, false, (QueuedExecutor)pool.get(), null);         
+         queues[0] = new LocalClusteredQueue(office1, "node1", "sub1", im.getId(), ms, pm, true, false, (QueuedExecutor)pool.get(), null, tr);         
          bindings[0] = office1.bindClusteredQueue("topic1", queues[0]);
          
-         queues[1] = new LocalClusteredQueue(office1, "node1", "sub2", im.getId(), ms, pm, true, false, (QueuedExecutor)pool.get(), null);         
+         queues[1] = new LocalClusteredQueue(office1, "node1", "sub2", im.getId(), ms, pm, true, false, (QueuedExecutor)pool.get(), null, tr);         
          bindings[1] = office1.bindClusteredQueue("topic1", queues[1]);
          
-         queues[2] = new LocalClusteredQueue(office2, "node2", "sub3", im.getId(), ms, pm, true, false, (QueuedExecutor)pool.get(), null);         
+         queues[2] = new LocalClusteredQueue(office2, "node2", "sub3", im.getId(), ms, pm, true, false, (QueuedExecutor)pool.get(), null, tr);         
          bindings[2] = office2.bindClusteredQueue("topic1", queues[2]);
          
-         queues[3] = new LocalClusteredQueue(office2, "node2", "sub4", im.getId(), ms, pm, true, false, (QueuedExecutor)pool.get(), null);         
+         queues[3] = new LocalClusteredQueue(office2, "node2", "sub4", im.getId(), ms, pm, true, false, (QueuedExecutor)pool.get(), null, tr);         
          bindings[3] = office2.bindClusteredQueue("topic1", queues[3]);
          
-         queues[4] = new LocalClusteredQueue(office2, "node2", "sub5", im.getId(), ms, pm, true, true, (QueuedExecutor)pool.get(), null);         
+         queues[4] = new LocalClusteredQueue(office2, "node2", "sub5", im.getId(), ms, pm, true, true, (QueuedExecutor)pool.get(), null, tr);         
          bindings[4] = office2.bindClusteredQueue("topic1", queues[4]);
          
-         queues[5] = new LocalClusteredQueue(office1, "node1", "sub6", im.getId(), ms, pm, true, false, (QueuedExecutor)pool.get(), null);         
+         queues[5] = new LocalClusteredQueue(office1, "node1", "sub6", im.getId(), ms, pm, true, false, (QueuedExecutor)pool.get(), null, tr);         
          bindings[5] = office1.bindClusteredQueue("topic1", queues[5]);
          
-         queues[6] = new LocalClusteredQueue(office1, "node1", "sub7", im.getId(), ms, pm, true, true, (QueuedExecutor)pool.get(), null);         
+         queues[6] = new LocalClusteredQueue(office1, "node1", "sub7", im.getId(), ms, pm, true, true, (QueuedExecutor)pool.get(), null, tr);         
          bindings[6] = office1.bindClusteredQueue("topic1", queues[6]);
          
-         queues[7] = new LocalClusteredQueue(office1, "node1", "sub8", im.getId(), ms, pm, true, true, (QueuedExecutor)pool.get(), null);         
+         queues[7] = new LocalClusteredQueue(office1, "node1", "sub8", im.getId(), ms, pm, true, true, (QueuedExecutor)pool.get(), null, tr);         
          bindings[7] = office1.bindClusteredQueue("topic1", queues[7]);
          
-         queues[8] = new LocalClusteredQueue(office1, "node1", "sub9", im.getId(), ms, pm, true, false, (QueuedExecutor)pool.get(), null);         
+         queues[8] = new LocalClusteredQueue(office1, "node1", "sub9", im.getId(), ms, pm, true, false, (QueuedExecutor)pool.get(), null, tr);         
          bindings[8] = office1.bindClusteredQueue("topic2", queues[8]);
          
-         queues[9] = new LocalClusteredQueue(office1, "node1", "sub10", im.getId(), ms, pm, true, false, (QueuedExecutor)pool.get(), null);         
+         queues[9] = new LocalClusteredQueue(office1, "node1", "sub10", im.getId(), ms, pm, true, false, (QueuedExecutor)pool.get(), null, tr);         
          bindings[9] = office1.bindClusteredQueue("topic2", queues[9]);
          
-         queues[10] = new LocalClusteredQueue(office2, "node2", "sub11", im.getId(), ms, pm, true, false, (QueuedExecutor)pool.get(), null);         
+         queues[10] = new LocalClusteredQueue(office2, "node2", "sub11", im.getId(), ms, pm, true, false, (QueuedExecutor)pool.get(), null, tr);         
          bindings[10] = office2.bindClusteredQueue("topic2", queues[10]);
          
-         queues[11] = new LocalClusteredQueue(office2, "node2", "sub12", im.getId(), ms, pm, true, false, (QueuedExecutor)pool.get(), null);         
+         queues[11] = new LocalClusteredQueue(office2, "node2", "sub12", im.getId(), ms, pm, true, false, (QueuedExecutor)pool.get(), null, tr);         
          bindings[11] = office2.bindClusteredQueue("topic2", queues[11]);
          
-         queues[12] = new LocalClusteredQueue(office2, "node2", "sub13", im.getId(), ms, pm, true, true, (QueuedExecutor)pool.get(), null);         
+         queues[12] = new LocalClusteredQueue(office2, "node2", "sub13", im.getId(), ms, pm, true, true, (QueuedExecutor)pool.get(), null, tr);         
          bindings[12] = office2.bindClusteredQueue("topic2", queues[12]);
          
-         queues[13] = new LocalClusteredQueue(office1, "node1", "sub14", im.getId(), ms, pm, true, false, (QueuedExecutor)pool.get(), null);         
+         queues[13] = new LocalClusteredQueue(office1, "node1", "sub14", im.getId(), ms, pm, true, false, (QueuedExecutor)pool.get(), null, tr);         
          bindings[13] = office1.bindClusteredQueue("topic2", queues[13]);
          
-         queues[14] = new LocalClusteredQueue(office1, "node1", "sub15", im.getId(), ms, pm, true, true, (QueuedExecutor)pool.get(), null);         
+         queues[14] = new LocalClusteredQueue(office1, "node1", "sub15", im.getId(), ms, pm, true, true, (QueuedExecutor)pool.get(), null, tr);         
          bindings[14] = office1.bindClusteredQueue("topic2", queues[14]);
          
-         queues[15] = new LocalClusteredQueue(office1, "node1", "sub16", im.getId(), ms, pm, true, true, (QueuedExecutor)pool.get(), null);         
+         queues[15] = new LocalClusteredQueue(office1, "node1", "sub16", im.getId(), ms, pm, true, true, (QueuedExecutor)pool.get(), null, tr);         
          bindings[15] = office1.bindClusteredQueue("topic2", queues[15]);
 
          SimpleReceiver[] receivers = new SimpleReceiver[16];

Modified: trunk/tests/src/org/jboss/test/messaging/core/plugin/postoffice/cluster/DefaultRouterTest.java
===================================================================
--- trunk/tests/src/org/jboss/test/messaging/core/plugin/postoffice/cluster/DefaultRouterTest.java	2006-09-23 01:37:20 UTC (rev 1359)
+++ trunk/tests/src/org/jboss/test/messaging/core/plugin/postoffice/cluster/DefaultRouterTest.java	2006-09-23 14:21:10 UTC (rev 1360)
@@ -123,27 +123,27 @@
          
          office6 = createClusteredPostOffice("node6", "testgroup");
          
-         LocalClusteredQueue queue1 = new LocalClusteredQueue(office2, "node2", "queue1", im.getId(), ms, pm, true, false, (QueuedExecutor)pool.get(), null);         
+         LocalClusteredQueue queue1 = new LocalClusteredQueue(office2, "node2", "queue1", im.getId(), ms, pm, true, false, (QueuedExecutor)pool.get(), null, tr);         
          Binding binding1 = office2.bindClusteredQueue("topic", queue1);
          SimpleReceiver receiver1 = new SimpleReceiver("blah", SimpleReceiver.ACCEPTING);
          queue1.add(receiver1);
          
-         LocalClusteredQueue queue2 = new LocalClusteredQueue(office3, "node3", "queue1", im.getId(), ms, pm, true, false, (QueuedExecutor)pool.get(), null);         
+         LocalClusteredQueue queue2 = new LocalClusteredQueue(office3, "node3", "queue1", im.getId(), ms, pm, true, false, (QueuedExecutor)pool.get(), null, tr);         
          Binding binding2 = office3.bindClusteredQueue("topic", queue2);
          SimpleReceiver receiver2 = new SimpleReceiver("blah", SimpleReceiver.ACCEPTING);
          queue2.add(receiver2);
          
-         LocalClusteredQueue queue3 = new LocalClusteredQueue(office4, "node4", "queue1", im.getId(), ms, pm, true, false, (QueuedExecutor)pool.get(), null);         
+         LocalClusteredQueue queue3 = new LocalClusteredQueue(office4, "node4", "queue1", im.getId(), ms, pm, true, false, (QueuedExecutor)pool.get(), null, tr);         
          Binding binding3 = office4.bindClusteredQueue("topic", queue3);
          SimpleReceiver receiver3 = new SimpleReceiver("blah", SimpleReceiver.ACCEPTING);
          queue3.add(receiver3);
          
-         LocalClusteredQueue queue4 = new LocalClusteredQueue(office5, "node5", "queue1", im.getId(), ms, pm, true, false, (QueuedExecutor)pool.get(), null);         
+         LocalClusteredQueue queue4 = new LocalClusteredQueue(office5, "node5", "queue1", im.getId(), ms, pm, true, false, (QueuedExecutor)pool.get(), null, tr);         
          Binding binding4 = office5.bindClusteredQueue("topic", queue4);
          SimpleReceiver receiver4 = new SimpleReceiver("blah", SimpleReceiver.ACCEPTING);
          queue4.add(receiver4);
          
-         LocalClusteredQueue queue5 = new LocalClusteredQueue(office6, "node6", "queue1", im.getId(), ms, pm, true, false, (QueuedExecutor)pool.get(), null);         
+         LocalClusteredQueue queue5 = new LocalClusteredQueue(office6, "node6", "queue1", im.getId(), ms, pm, true, false, (QueuedExecutor)pool.get(), null, tr);         
          Binding binding5 = office6.bindClusteredQueue("topic", queue5);
          SimpleReceiver receiver5 = new SimpleReceiver("blah", SimpleReceiver.ACCEPTING);
          queue5.add(receiver5);
@@ -262,27 +262,27 @@
          
          office6 = createClusteredPostOffice("node6", "testgroup");
          
-         LocalClusteredQueue queue1 = new LocalClusteredQueue(office2, "node2", "queue1", im.getId(), ms, pm, true, false, (QueuedExecutor)pool.get(), null);         
+         LocalClusteredQueue queue1 = new LocalClusteredQueue(office2, "node2", "queue1", im.getId(), ms, pm, true, false, (QueuedExecutor)pool.get(), null, tr);         
          Binding binding1 = office2.bindClusteredQueue("topic", queue1);
          SimpleReceiver receiver1 = new SimpleReceiver("blah", SimpleReceiver.ACCEPTING);
          queue1.add(receiver1);
          
-         LocalClusteredQueue queue2 = new LocalClusteredQueue(office3, "node3", "queue1", im.getId(), ms, pm, true, false, (QueuedExecutor)pool.get(), null);         
+         LocalClusteredQueue queue2 = new LocalClusteredQueue(office3, "node3", "queue1", im.getId(), ms, pm, true, false, (QueuedExecutor)pool.get(), null, tr);         
          Binding binding2 = office3.bindClusteredQueue("topic", queue2);
          SimpleReceiver receiver2 = new SimpleReceiver("blah", SimpleReceiver.ACCEPTING);
          queue2.add(receiver2);
          
-         LocalClusteredQueue queue3 = new LocalClusteredQueue(office4, "node4", "queue1", im.getId(), ms, pm, true, false, (QueuedExecutor)pool.get(), null);         
+         LocalClusteredQueue queue3 = new LocalClusteredQueue(office4, "node4", "queue1", im.getId(), ms, pm, true, false, (QueuedExecutor)pool.get(), null, tr);         
          Binding binding3 = office4.bindClusteredQueue("topic", queue3);
          SimpleReceiver receiver3 = new SimpleReceiver("blah", SimpleReceiver.ACCEPTING);
          queue3.add(receiver3);
          
-         LocalClusteredQueue queue4 = new LocalClusteredQueue(office5, "node5", "queue1", im.getId(), ms, pm, true, false, (QueuedExecutor)pool.get(), null);         
+         LocalClusteredQueue queue4 = new LocalClusteredQueue(office5, "node5", "queue1", im.getId(), ms, pm, true, false, (QueuedExecutor)pool.get(), null, tr);         
          Binding binding4 = office5.bindClusteredQueue("topic", queue4);
          SimpleReceiver receiver4 = new SimpleReceiver("blah", SimpleReceiver.ACCEPTING);
          queue4.add(receiver4);
          
-         LocalClusteredQueue queue5 = new LocalClusteredQueue(office6, "node6", "queue1", im.getId(), ms, pm, true, false, (QueuedExecutor)pool.get(), null);         
+         LocalClusteredQueue queue5 = new LocalClusteredQueue(office6, "node6", "queue1", im.getId(), ms, pm, true, false, (QueuedExecutor)pool.get(), null, tr);         
          Binding binding5 = office6.bindClusteredQueue("topic", queue5);
          SimpleReceiver receiver5 = new SimpleReceiver("blah", SimpleReceiver.ACCEPTING);
          queue5.add(receiver5);

Modified: trunk/tests/src/org/jboss/test/messaging/core/plugin/postoffice/cluster/JGroupsUtil.java
===================================================================
--- trunk/tests/src/org/jboss/test/messaging/core/plugin/postoffice/cluster/JGroupsUtil.java	2006-09-23 01:37:20 UTC (rev 1359)
+++ trunk/tests/src/org/jboss/test/messaging/core/plugin/postoffice/cluster/JGroupsUtil.java	2006-09-23 14:21:10 UTC (rev 1360)
@@ -46,15 +46,15 @@
       }
 
       return
-      "UDP(mcast_addr=228.8.8.8;mcast_port=45566;ip_ttl=32;bind_addr=" + host + "):" +
-      "PING(timeout=2000;num_initial_members=3):"+
-      "FD(timeout=3000):"+
-      "VERIFY_SUSPECT(timeout=1500):"+
-      "pbcast.NAKACK(gc_lag=10;retransmit_timeout=600,1200,2400,4800):"+
-      "UNICAST(timeout=600,1200,2400,4800):"+
-      "pbcast.STABLE(desired_avg_gossip=10000):"+
-      "FRAG:"+
-      "pbcast.GMS(join_timeout=5000;join_retry_timeout=2000;shun=true;print_local_addr=true)";
+      "UDP(mcast_addr=228.8.8.8;mcast_port=45566;ip_ttl=32;bind_addr=" + host + ";up_thread=false;down_thread=false):" +
+      "PING(timeout=2000;num_initial_members=3;up_thread=false;down_thread=false):"+
+      "FD(timeout=3000;up_thread=false;down_thread=false):"+
+      "VERIFY_SUSPECT(timeout=1500;up_thread=false;down_thread=false):"+
+      "pbcast.NAKACK(gc_lag=10;retransmit_timeout=600,1200,2400,4800;up_thread=false;down_thread=false):"+
+      "UNICAST(timeout=600,1200,2400,4800;up_thread=false;down_thread=false):"+
+      "pbcast.STABLE(desired_avg_gossip=10000;up_thread=false;down_thread=false):"+
+      "FRAG(up_thread=false;down_thread=false):"+
+      "pbcast.GMS(join_timeout=5000;join_retry_timeout=2000;shun=true;print_local_addr=true;up_thread=false;down_thread=false)";
 
    }
    
@@ -73,15 +73,15 @@
       }
 
       return
-         "UDP(mcast_addr=228.8.8.8;mcast_port=45568;ip_ttl=32;bind_addr=" + host + "):" +
-         "PING(timeout=2000;num_initial_members=3):"+
-         "FD(timeout=3000):"+
-         "VERIFY_SUSPECT(timeout=1500):"+
-         "pbcast.NAKACK(gc_lag=10;retransmit_timeout=600,1200,2400,4800):"+
-         "pbcast.STABLE(desired_avg_gossip=10000):"+
-         "FRAG:"+
-         "pbcast.GMS(join_timeout=5000;join_retry_timeout=2000;shun=true;print_local_addr=true):" +
-         "pbcast.STATE_TRANSFER";     
+         "UDP(mcast_addr=228.8.8.8;mcast_port=45568;ip_ttl=32;bind_addr=" + host + ";up_thread=false;down_thread=false):" +
+         "PING(timeout=2000;num_initial_members=3;up_thread=false;down_thread=false):"+
+         "FD(timeout=3000;up_thread=false;down_thread=false):"+
+         "VERIFY_SUSPECT(timeout=1500;up_thread=false;down_thread=false):"+
+         "pbcast.NAKACK(gc_lag=10;retransmit_timeout=600,1200,2400,4800;up_thread=false;down_thread=false):"+
+         "pbcast.STABLE(desired_avg_gossip=10000;up_thread=false;down_thread=false):"+
+         "FRAG(up_thread=false;down_thread=false):"+
+         "pbcast.GMS(join_timeout=5000;join_retry_timeout=2000;shun=true;print_local_addr=true;up_thread=false;down_thread=false):" +
+         "pbcast.STATE_TRANSFER(up_thread=false;down_thread=false)";     
    }
 
    // Attributes ----------------------------------------------------

Modified: trunk/tests/src/org/jboss/test/messaging/core/plugin/postoffice/cluster/RedistributionTest.java
===================================================================
--- trunk/tests/src/org/jboss/test/messaging/core/plugin/postoffice/cluster/RedistributionTest.java	2006-09-23 01:37:20 UTC (rev 1359)
+++ trunk/tests/src/org/jboss/test/messaging/core/plugin/postoffice/cluster/RedistributionTest.java	2006-09-23 14:21:10 UTC (rev 1360)
@@ -105,46 +105,112 @@
          
          office5 = createClusteredPostOffice("node5", "testgroup");
          
-         LocalClusteredQueue queue1 = new LocalClusteredQueue(office1, "node1", "queue1", im.getId(), ms, pm, true, false, (QueuedExecutor)pool.get(), null);         
+         log.info("Started offices");
+         
+         LocalClusteredQueue queue1 = new LocalClusteredQueue(office1, "node1", "queue1", im.getId(), ms, pm, true, false, (QueuedExecutor)pool.get(), null, tr);         
          Binding binding1 = office1.bindClusteredQueue("queue1", queue1);
+                  
+         LocalClusteredQueue queue2 = new LocalClusteredQueue(office2, "node2", "queue1", im.getId(), ms, pm, true, false, (QueuedExecutor)pool.get(), null, tr);         
+         Binding binding2 = office2.bindClusteredQueue("queue1", queue2);
+                  
+         LocalClusteredQueue queue3 = new LocalClusteredQueue(office3, "node3", "queue1", im.getId(), ms, pm, true, false, (QueuedExecutor)pool.get(), null, tr);         
+         Binding binding3 = office3.bindClusteredQueue("queue1", queue3);         
+         
+         LocalClusteredQueue queue4 = new LocalClusteredQueue(office4, "node4", "queue1", im.getId(), ms, pm, true, false, (QueuedExecutor)pool.get(), null, tr);         
+         Binding binding4 = office4.bindClusteredQueue("queue1", queue4);
+                  
+         LocalClusteredQueue queue5 = new LocalClusteredQueue(office5, "node5", "queue1", im.getId(), ms, pm, true, false, (QueuedExecutor)pool.get(), null, tr);         
+         Binding binding5 = office5.bindClusteredQueue("queue1", queue5);
+                  
+         log.info("bound queues");
+         
+         //Send 30 messages to each queue
+         this.sendMessages("queue1", persistent, office1, 30, null);
+         this.sendMessages("queue1", persistent, office2, 30, null);
+         this.sendMessages("queue1", persistent, office3, 30, null);
+         this.sendMessages("queue1", persistent, office4, 30, null);
+         this.sendMessages("queue1", persistent, office5, 30, null);
+                 
+         log.info("sent messages");
+         
+         Thread.sleep(1000);
+         
+         //Check the sizes
+          
+         List msgs = queue1.browse();
+         assertEquals(30, msgs.size());
+         
+         msgs = queue2.browse();
+         assertEquals(30, msgs.size());
+           
+         msgs = queue3.browse();
+         assertEquals(30, msgs.size());
+         
+         msgs = queue4.browse();
+         assertEquals(30, msgs.size());
+         
+         msgs = queue5.browse();
+         assertEquals(30, msgs.size());
+         
+         //Now we add the receivers
+         //Note that we did not do this before the send.
+         //If we had done so then it's likely that the automatic redistribution
+         //would have moved some around and there wouldn't be 30 in each queue
+         
          PullingReceiver receiver1 = new PullingReceiver();
          queue1.add(receiver1);
          
-         LocalClusteredQueue queue2 = new LocalClusteredQueue(office2, "node2", "queue1", im.getId(), ms, pm, true, false, (QueuedExecutor)pool.get(), null);         
-         Binding binding2 = office2.bindClusteredQueue("queue1", queue2);
          PullingReceiver receiver2 = new PullingReceiver();
          queue2.add(receiver2);
          
-         LocalClusteredQueue queue3 = new LocalClusteredQueue(office3, "node3", "queue1", im.getId(), ms, pm, true, false, (QueuedExecutor)pool.get(), null);         
-         Binding binding3 = office3.bindClusteredQueue("queue1", queue3);
          PullingReceiver receiver3 = new PullingReceiver();
          queue3.add(receiver3);
          
-         LocalClusteredQueue queue4 = new LocalClusteredQueue(office4, "node4", "queue1", im.getId(), ms, pm, true, false, (QueuedExecutor)pool.get(), null);         
-         Binding binding4 = office4.bindClusteredQueue("queue1", queue4);
          PullingReceiver receiver4 = new PullingReceiver();
          queue4.add(receiver4);
          
-         LocalClusteredQueue queue5 = new LocalClusteredQueue(office5, "node5", "queue1", im.getId(), ms, pm, true, false, (QueuedExecutor)pool.get(), null);         
-         Binding binding5 = office5.bindClusteredQueue("queue1", queue5);
          PullingReceiver receiver5 = new PullingReceiver();
          queue5.add(receiver5);
+                 
+         log.info("Added receivers");
          
-         //Send 30 messages to each queue
-         this.sendMessages("queue1", persistent, office1, 30, null);
-         this.sendMessages("queue1", persistent, office2, 30, null);
-         this.sendMessages("queue1", persistent, office3, 30, null);
-         this.sendMessages("queue1", persistent, office4, 30, null);
-         this.sendMessages("queue1", persistent, office5, 30, null);
+         //Prompt delivery so a message pops into each receiver
+         queue1.deliver(true);
+         queue2.deliver(true);
+         queue3.deliver(true);
+         queue4.deliver(true);
+         queue5.deliver(true);
          
-         Thread.sleep(500);
+         Thread.sleep(1000);
          
-         List msgs = queue1.browse();
+         //Now we check the sizes again in case automatic balancing has erroneously
+         //kicked in
+         
+         msgs = queue1.browse();
          assertEquals(30, msgs.size());
          
          msgs = queue2.browse();
          assertEquals(30, msgs.size());
+           
+         msgs = queue3.browse();
+         assertEquals(30, msgs.size());
          
+         msgs = queue4.browse();
+         assertEquals(30, msgs.size());
+         
+         msgs = queue5.browse();
+         assertEquals(30, msgs.size());
+         
+         Thread.sleep(5000);
+         
+         //And again - should still be no redistribution
+         
+         msgs = queue1.browse();
+         assertEquals(30, msgs.size());
+         
+         msgs = queue2.browse();
+         assertEquals(30, msgs.size());
+           
          msgs = queue3.browse();
          assertEquals(30, msgs.size());
          
@@ -154,24 +220,103 @@
          msgs = queue5.browse();
          assertEquals(30, msgs.size());
          
-         //Consume all the messages from queue 3
-         for (int i = 0; i < 30; i++)
-         {
+         //Try and consumer them all via one receiver
+         
+         log.info("trying to consume");
+         
+         //So we have 150 messages in total - 30 on each node.
+         
+         //If redistribution works ok, we should be able to do something like the following:
+         
+         //Consume 10 on node 1
+         
+         //Consume 50 on node 2
+         
+         //Consume 75 on node 3
+         
+         //Consume 10 on node 4
+         
+         //Consume 5 on node 5
+                  
+         log.info("consuming queue1");
+         for (int i = 0; i < 10; i++)
+         {       
+            queue1.deliver(true);
+            Delivery del = receiver1.getDelivery();
+            log.info("Got delivery: " + del.getReference().getMessageID());
+            del.acknowledge(null);  
+         }
+         log.info("consumed queue1");
+         
+         log.info("consuming queue2");
+         for (int i = 0; i < 50; i++)
+         {       
+            queue2.deliver(true);
+            Delivery del = receiver2.getDelivery();
+            log.info("Got delivery: " + del.getReference().getMessageID());
+            del.acknowledge(null);  
+         }
+         
+         log.info("consuming queue3");
+         for (int i = 0; i < 75; i++)
+         {       
+            queue3.deliver(true);
             Delivery del = receiver3.getDelivery();
             log.info("Got delivery: " + del.getReference().getMessageID());
-            del.acknowledge(null);
-            queue3.deliver(false);
+            del.acknowledge(null);  
          }
          
+         log.info("consuming queue4");
+         for (int i = 0; i < 10; i++)
+         {       
+            queue4.deliver(true);
+            Delivery del = receiver4.getDelivery();
+            log.info("Got delivery: " + del.getReference().getMessageID());
+            del.acknowledge(null);  
+         }
+         
+         Thread.sleep(2000);
+         
+         log.info("Here are the sizes:");
+         
+         msgs = queue1.browse();
+         log.info("queue1: " + msgs.size());
+         
+         msgs = queue2.browse();
+         log.info("queue2: " + msgs.size());
+           
          msgs = queue3.browse();
+         log.info("queue3: " + msgs.size());
+         
+         msgs = queue4.browse();
+         log.info("queue4: " + msgs.size());
+         
+         msgs = queue5.browse();
+         log.info("queue5: " + msgs.size());
+         
+         log.info("consuming queue5");
+         for (int i = 0; i < 5; i++)
+         {       
+            queue5.deliver(true);
+            Delivery del = receiver5.getDelivery();
+            log.info("Got delivery: " + del.getReference().getMessageID());
+            del.acknowledge(null);  
+         }
+         
+         msgs = queue1.browse();
          assertEquals(0, msgs.size());
          
-         queue3.deliver(false);
+         msgs = queue2.browse();
+         assertEquals(0, msgs.size());
+           
+         msgs = queue3.browse();
+         assertEquals(0, msgs.size());
          
-         Delivery del = receiver3.getDelivery();
+         msgs = queue4.browse();
+         assertEquals(0, msgs.size());
          
-         log.info("delivery is " + del);
-         
+         msgs = queue5.browse();
+         assertEquals(0, msgs.size());
       }
       finally
       { 




More information about the jboss-cvs-commits mailing list