[jboss-cvs] JBoss Messaging SVN: r1474 - in trunk: src/main/org/jboss/jms/server/endpoint src/main/org/jboss/jms/tx src/main/org/jboss/messaging/core src/main/org/jboss/messaging/core/plugin src/main/org/jboss/messaging/core/plugin/contract src/main/org/jboss/messaging/core/plugin/postoffice src/main/org/jboss/messaging/core/plugin/postoffice/cluster src/main/org/jboss/messaging/core/tx tests tests/src/org/jboss/test/messaging/core/paging/base tests/src/org/jboss/test/messaging/core/plugin/postoffice/cluster tests/src/org/jboss/test/messaging/jms tests/src/org/jboss/test/messaging/jms/crash tests/src/org/jboss/test/messaging/jms/server/destination

jboss-cvs-commits at lists.jboss.org jboss-cvs-commits at lists.jboss.org
Mon Oct 16 18:03:23 EDT 2006


Author: timfox
Date: 2006-10-16 18:03:03 -0400 (Mon, 16 Oct 2006)
New Revision: 1474

Added:
   trunk/src/main/org/jboss/messaging/core/plugin/postoffice/cluster/PullMessagesResultRequest.java
   trunk/tests/src/org/jboss/test/messaging/core/plugin/postoffice/cluster/RedistributionTest.java
Removed:
   trunk/src/main/org/jboss/messaging/core/plugin/postoffice/cluster/PullMessagesResponse.java
Modified:
   trunk/src/main/org/jboss/jms/server/endpoint/ServerConsumerEndpoint.java
   trunk/src/main/org/jboss/jms/tx/AckInfo.java
   trunk/src/main/org/jboss/jms/tx/ResourceManagerFactory.java
   trunk/src/main/org/jboss/messaging/core/ChannelSupport.java
   trunk/src/main/org/jboss/messaging/core/plugin/LockMap.java
   trunk/src/main/org/jboss/messaging/core/plugin/contract/ClusteredPostOffice.java
   trunk/src/main/org/jboss/messaging/core/plugin/postoffice/DefaultPostOffice.java
   trunk/src/main/org/jboss/messaging/core/plugin/postoffice/cluster/ClusterRequest.java
   trunk/src/main/org/jboss/messaging/core/plugin/postoffice/cluster/DefaultClusteredPostOffice.java
   trunk/src/main/org/jboss/messaging/core/plugin/postoffice/cluster/DefaultRouter.java
   trunk/src/main/org/jboss/messaging/core/plugin/postoffice/cluster/LocalClusteredQueue.java
   trunk/src/main/org/jboss/messaging/core/plugin/postoffice/cluster/PostOfficeInternal.java
   trunk/src/main/org/jboss/messaging/core/plugin/postoffice/cluster/PullMessagesRequest.java
   trunk/src/main/org/jboss/messaging/core/plugin/postoffice/cluster/TransactionId.java
   trunk/src/main/org/jboss/messaging/core/tx/Transaction.java
   trunk/src/main/org/jboss/messaging/core/tx/TransactionRepository.java
   trunk/tests/build.xml
   trunk/tests/src/org/jboss/test/messaging/core/paging/base/PagingStateTestBase.java
   trunk/tests/src/org/jboss/test/messaging/core/plugin/postoffice/cluster/DefaultClusteredPostOfficeTest.java
   trunk/tests/src/org/jboss/test/messaging/core/plugin/postoffice/cluster/DefaultRouterTest.java
   trunk/tests/src/org/jboss/test/messaging/jms/MessageConsumerTest.java
   trunk/tests/src/org/jboss/test/messaging/jms/crash/ManualCrashTest.java
   trunk/tests/src/org/jboss/test/messaging/jms/server/destination/QueueManagementTest.java
Log:
Interim commit for http://jira.jboss.com/jira/browse/JBMESSAGING-575



Modified: trunk/src/main/org/jboss/jms/server/endpoint/ServerConsumerEndpoint.java
===================================================================
--- trunk/src/main/org/jboss/jms/server/endpoint/ServerConsumerEndpoint.java	2006-10-13 12:09:35 UTC (rev 1473)
+++ trunk/src/main/org/jboss/jms/server/endpoint/ServerConsumerEndpoint.java	2006-10-16 22:03:03 UTC (rev 1474)
@@ -499,7 +499,7 @@
       }
       else
       {
-         throw new IllegalStateException("Failed to acknowledge delivery " + d);
+         throw new IllegalStateException("Could not find delivery to acknowledge");
       }             
    }      
    
@@ -799,7 +799,7 @@
     * QueueExecutor might be share by other consumers and we don't want to wait for their
     * tasks to complete
     */
-   private class Waiter implements Runnable
+   private static class Waiter implements Runnable
    {
       Future result;
       

Modified: trunk/src/main/org/jboss/jms/tx/AckInfo.java
===================================================================
--- trunk/src/main/org/jboss/jms/tx/AckInfo.java	2006-10-13 12:09:35 UTC (rev 1473)
+++ trunk/src/main/org/jboss/jms/tx/AckInfo.java	2006-10-16 22:03:03 UTC (rev 1474)
@@ -48,8 +48,7 @@
    // One of Session.AUTO_ACKNOWLEDGE, Session.CLIENT_ACKNOWLEDGE, etc.
    private int ackMode;
    
-   // The actual proxy must not get serialized
-   protected transient MessageProxy msg;
+   protected MessageProxy msg;
    
    // Static --------------------------------------------------------
    

Modified: trunk/src/main/org/jboss/jms/tx/ResourceManagerFactory.java
===================================================================
--- trunk/src/main/org/jboss/jms/tx/ResourceManagerFactory.java	2006-10-13 12:09:35 UTC (rev 1473)
+++ trunk/src/main/org/jboss/jms/tx/ResourceManagerFactory.java	2006-10-16 22:03:03 UTC (rev 1474)
@@ -41,7 +41,7 @@
  */
 public class ResourceManagerFactory
 {      
-   public static ResourceManagerFactory instance = new ResourceManagerFactory();
+   public static final ResourceManagerFactory instance = new ResourceManagerFactory();
    
    private Map holders;
    
@@ -97,7 +97,7 @@
       }      
    }
    
-   private class Holder
+   private static class Holder
    {
       ResourceManager rm = new ResourceManager();
       

Modified: trunk/src/main/org/jboss/messaging/core/ChannelSupport.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/ChannelSupport.java	2006-10-13 12:09:35 UTC (rev 1473)
+++ trunk/src/main/org/jboss/messaging/core/ChannelSupport.java	2006-10-16 22:03:03 UTC (rev 1474)
@@ -168,7 +168,7 @@
       }
       else
       {
-         return handleInternal(sender, ref, tx, true, false, true);
+         return handleInternal(sender, ref, tx, true, false);
       }
    }
 
@@ -521,34 +521,6 @@
 
    // Protected -----------------------------------------------------
    
-   protected MessageReference nextReference(ListIterator iter, boolean handle) throws Throwable
-   {
-      MessageReference ref;
-      
-      if (iter == null)
-      {
-         //We just get the next ref from the head of the queue
-         ref = (MessageReference) messageRefs.peekFirst();
-      }
-      else
-      {
-         // TODO This will not work with paged refs - see http://jira.jboss.com/jira/browse/JBMESSAGING-275
-         // We need to extend it to work with refs from the db
-         
-         //We have an iterator - this means we are iterating through the queue to find a ref that matches
-         if (iter.hasNext())
-         {                        
-            ref = (MessageReference)iter.next();
-         } 
-         else
-         {
-            ref = null;
-         }
-      }
-      
-      return ref;
-   }     
-   
    /*
     * This methods delivers as many messages as possible to the router until no
     * more deliveries are returned. This method should never be called at the
@@ -556,7 +528,7 @@
     *
     * @see org.jboss.messaging.core.Channel#deliver()
     */
-   protected void deliverInternal(boolean handle) throws Throwable
+   protected void deliverInternal() throws Throwable
    {
       try
       {
@@ -570,7 +542,7 @@
          {           
             synchronized (deliveryLock)
             {
-               ref = nextReference(iter, handle);               
+               ref = nextReference(iter);               
             }
             if (ref != null)
             {
@@ -587,6 +559,8 @@
                   ref.incrementDeliveryCount();
 
                   Delivery del = router.handle(this, ref, null);
+
+                  receiversReady = del != null;
                   
                   if (del == null)
                   {
@@ -597,8 +571,6 @@
 
                      ref.decrementDeliveryCount();
                      
-                     receiversReady = false;
-                     
                      break;
                   }
                   else if (!del.isSelectorAccepted())
@@ -619,7 +591,7 @@
                      if (trace) { log.trace(this + ": " + del + " returned for message:" + ref); }
                      
                      // Receiver accepted the reference
-
+                     
                      // We must synchronize here to cope with a race condition where message
                      // is cancelled/acked in flight while the following few actions are being
                      // performed. e.g. delivery could be cancelled acked after being removed from
@@ -666,8 +638,7 @@
    }
 
    protected Delivery handleInternal(DeliveryObserver sender, MessageReference ref,
-                                     Transaction tx, boolean persist, boolean synchronous,
-                                     boolean deliver)
+                                     Transaction tx, boolean persist, boolean synchronous)
    {
       if (ref == null)
       {
@@ -721,10 +692,10 @@
             
             // We only do delivery if there are receivers that haven't said they don't want
             // any more references.
-            if (receiversReady && deliver)
+            if (receiversReady)
             {
                // Prompt delivery
-               deliverInternal(true);
+               deliverInternal();
             }
          }
          else
@@ -744,7 +715,7 @@
             else
             {
                // add to post commit callback
-               getCallback(tx, synchronous, deliver).addRef(ref);
+               getCallback(tx, synchronous).addRef(ref);
                
                if (trace) { log.trace(this + " added transactionally " + ref + " in memory"); }
             }
@@ -791,7 +762,7 @@
       }
       else
       {
-         this.getCallback(tx, synchronous, false).addDelivery(d);
+         this.getCallback(tx, synchronous).addDelivery(d);
    
          if (trace) { log.trace(this + " added " + d + " to memory on transaction " + tx); }
    
@@ -819,13 +790,13 @@
       return removed;
    }     
 
-   protected InMemoryCallback getCallback(Transaction tx, boolean synchronous, boolean deliver)
+   protected InMemoryCallback getCallback(Transaction tx, boolean synchronous)
    {
       InMemoryCallback callback = (InMemoryCallback) tx.getCallback(this);            
 
       if (callback == null)
       {
-         callback = new InMemoryCallback(synchronous, deliver);
+         callback = new InMemoryCallback(synchronous);
 
          tx.addCallback(callback, this);
       }
@@ -836,10 +807,6 @@
          {
             throw new IllegalStateException("Callback synchronousness status doesn't match");
          }
-         if (callback.isDeliver() != deliver)
-         {
-            throw new IllegalStateException("Callback deliver status doesn't match");
-         }
       }
 
       return callback;
@@ -928,6 +895,34 @@
          }
       }
    }
+   
+   private MessageReference nextReference(ListIterator iter) throws Throwable
+   {
+      MessageReference ref;
+      
+      if (iter == null)
+      {
+         //We just get the next ref from the head of the queue
+         ref = (MessageReference) messageRefs.peekFirst();
+      }
+      else
+      {
+         // TODO This will not work with paged refs - see http://jira.jboss.com/jira/browse/JBMESSAGING-275
+         // We need to extend it to work with refs from the db
+         
+         //We have an iterator - this means we are iterating through the queue to find a ref that matches
+         if (iter.hasNext())
+         {                        
+            ref = (MessageReference)iter.next();
+         } 
+         else
+         {
+            ref = null;
+         }
+      }
+      
+      return ref;
+   } 
 
    // Inner classes -------------------------------------------------
 
@@ -938,33 +933,24 @@
       private List deliveriesToRemove;
       
       private boolean synchronous;
-      
-      private boolean deliver;
-      
+          
       private boolean committing;
 
       private Future result;
 
-      private InMemoryCallback(boolean synchronous, boolean deliver)
+      private InMemoryCallback(boolean synchronous)
       {
          refsToAdd = new ArrayList();
 
          deliveriesToRemove = new ArrayList();
          
          this.synchronous = synchronous;
-         
-         this.deliver = deliver;
       }
       
       private boolean isSynchronous()
       {
          return synchronous;
       }
-      
-      private boolean isDeliver()
-      {
-         return deliver;
-      }
 
       private void addRef(MessageReference ref)
       {
@@ -1103,7 +1089,7 @@
       private void doAfterCommit() throws Throwable
       {
          // We add the references to the state
-
+         
          Iterator iter = refsToAdd.iterator();
 
          while (iter.hasNext())
@@ -1126,7 +1112,7 @@
          }
 
          // Remove deliveries
-
+         
          iter = this.deliveriesToRemove.iterator();
 
          while (iter.hasNext())
@@ -1151,9 +1137,9 @@
          }
          
          //prompt delivery
-         if (deliver && receiversReady)
+         if (receiversReady)
          {
-            deliverInternal(true);
+            deliverInternal();
          }
       }
 
@@ -1210,9 +1196,7 @@
          {
             if (router.numberOfReceivers() > 0)
             {               
-               receiversReady = true;
-               
-               deliverInternal(false);                     
+               deliverInternal();                     
             }
             if (result != null)
             {
@@ -1250,7 +1234,7 @@
 
       public void run()
       {
-         Delivery d = handleInternal(sender, ref, null, persist, false, true);
+         Delivery d = handleInternal(sender, ref, null, persist, false);
          result.setResult(d);
       }
    }   

Modified: trunk/src/main/org/jboss/messaging/core/plugin/LockMap.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/plugin/LockMap.java	2006-10-13 12:09:35 UTC (rev 1473)
+++ trunk/src/main/org/jboss/messaging/core/plugin/LockMap.java	2006-10-16 22:03:03 UTC (rev 1474)
@@ -49,7 +49,7 @@
       int refCount;   
    }
    
-   public static LockMap instance = new LockMap();
+   public static final LockMap instance = new LockMap();
    
    private LockMap()
    {

Modified: trunk/src/main/org/jboss/messaging/core/plugin/contract/ClusteredPostOffice.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/plugin/contract/ClusteredPostOffice.java	2006-10-13 12:09:35 UTC (rev 1473)
+++ trunk/src/main/org/jboss/messaging/core/plugin/contract/ClusteredPostOffice.java	2006-10-16 22:03:03 UTC (rev 1474)
@@ -21,6 +21,8 @@
  */
 package org.jboss.messaging.core.plugin.contract;
 
+import java.util.Collection;
+
 import org.jboss.messaging.core.plugin.postoffice.Binding;
 import org.jboss.messaging.core.plugin.postoffice.cluster.LocalClusteredQueue;
 
@@ -53,4 +55,6 @@
     * @throws Throwable
     */
    Binding unbindClusteredQueue(String queueName) throws Throwable;         
+   
+   Collection listAllBindingsForCondition(String condition) throws Exception;
 }

Modified: trunk/src/main/org/jboss/messaging/core/plugin/postoffice/DefaultPostOffice.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/plugin/postoffice/DefaultPostOffice.java	2006-10-13 12:09:35 UTC (rev 1473)
+++ trunk/src/main/org/jboss/messaging/core/plugin/postoffice/DefaultPostOffice.java	2006-10-16 22:03:03 UTC (rev 1474)
@@ -213,7 +213,7 @@
       }
       
       lock.writeLock().acquire();
-      
+
       try
       {         
          Binding binding = removeBinding(this.nodeId, queueName);
@@ -237,49 +237,11 @@
    
    public Collection listBindingsForCondition(String condition) throws Exception
    {
-      if (condition == null)
-      {
-         throw new IllegalArgumentException("Condition is null");
-      }
-      
-      lock.readLock().acquire();
-      
-      try
-      {
-         //We should only list the bindings for the local node
+      return listBindingsForConditionInternal(condition, true);
+   }  
+   
+   
          
-         Bindings cb = (Bindings)conditionMap.get(condition);                  
-                  
-         if (cb == null)
-         {
-            return Collections.EMPTY_LIST;
-         }
-         else
-         {
-            List list = new ArrayList();
-            
-            Collection bindings = cb.getAllBindings();
-            
-            Iterator iter = bindings.iterator();
-            
-            while (iter.hasNext())
-            {
-               Binding binding = (Binding)iter.next();
-               
-               if (binding.getNodeId() == this.nodeId)
-               {
-                  list.add(binding);
-               }
-            }
-            
-            return list;
-         }
-      }
-      finally
-      {
-         lock.readLock().release();
-      }
-   }
    
    public Binding getBindingForQueueName(String queueName) throws Exception
    {    
@@ -289,7 +251,7 @@
       }
       
       lock.readLock().acquire();
-      
+
       try
       {
          Map nameMap = (Map)nameMaps.get(new Integer(this.nodeId));
@@ -402,11 +364,57 @@
    }
      
    // Protected -----------------------------------------------------
+   
+   protected Collection listBindingsForConditionInternal(String condition, boolean localOnly) throws Exception
+   {
+      if (condition == null)
+      {
+         throw new IllegalArgumentException("Condition is null");
+      }
+      
+      lock.readLock().acquire();
+      
+      try
+      {
+         //When localOnly is true we list only the bindings for the local node, otherwise bindings for all nodes
+         
+         Bindings cb = (Bindings)conditionMap.get(condition);                  
+                  
+         if (cb == null)
+         {
+            return Collections.EMPTY_LIST;
+         }
+         else
+         {
+            List list = new ArrayList();
+            
+            Collection bindings = cb.getAllBindings();
+            
+            Iterator iter = bindings.iterator();
+            
+            while (iter.hasNext())
+            {
+               Binding binding = (Binding)iter.next();
+               
+               if (!localOnly || (binding.getNodeId() == this.nodeId))
+               {
+                  list.add(binding);
+               }
+            }
+            
+            return list;
+         }
+      }
+      finally
+      {
+         lock.readLock().release();
+      }
+   }
     
    protected void loadBindings() throws Exception
    {
       lock.writeLock().acquire();
-      
+
       Connection conn = null;
       PreparedStatement ps  = null;
       ResultSet rs = null;
@@ -448,7 +456,7 @@
       finally
       {
          lock.writeLock().release();
-         
+
          if (rs != null)
          {
             rs.close();
@@ -593,7 +601,7 @@
          nameMaps.put(new Integer(binding.getNodeId()), nameMap);
       }
       
-      nameMap.put(binding.getQueue().getName(), binding);
+      nameMap.put(binding.getQueue().getName(), binding);      
    }
    
    protected void addToConditionMap(Binding binding)
@@ -667,7 +675,7 @@
          conditionMap.remove(binding.getCondition());
       }        
    }         
-   
+
    protected Map getDefaultDMLStatements()
    {                
       Map map = new LinkedHashMap();

Modified: trunk/src/main/org/jboss/messaging/core/plugin/postoffice/cluster/ClusterRequest.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/plugin/postoffice/cluster/ClusterRequest.java	2006-10-13 12:09:35 UTC (rev 1473)
+++ trunk/src/main/org/jboss/messaging/core/plugin/postoffice/cluster/ClusterRequest.java	2006-10-16 22:03:03 UTC (rev 1474)
@@ -54,6 +54,11 @@
             request =  new BindRequest();
             break;
          }
+         case PullMessagesResultRequest.TYPE:
+         {
+            request = new PullMessagesResultRequest();
+            break;
+         }
          case MessageRequest.TYPE:
          {
             request = new MessageRequest();

Modified: trunk/src/main/org/jboss/messaging/core/plugin/postoffice/cluster/DefaultClusteredPostOffice.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/plugin/postoffice/cluster/DefaultClusteredPostOffice.java	2006-10-13 12:09:35 UTC (rev 1473)
+++ trunk/src/main/org/jboss/messaging/core/plugin/postoffice/cluster/DefaultClusteredPostOffice.java	2006-10-16 22:03:03 UTC (rev 1474)
@@ -247,6 +247,11 @@
    
    public synchronized void start() throws Exception
    {    
+      if (started)
+      {
+         log.warn("Attempt to start() but " + this + " is already started");
+      }
+      
       if (syncChannelConfigE != null)
       {        
          this.syncChannel = new JChannel(syncChannelConfigE);
@@ -301,6 +306,11 @@
 
    public synchronized void stop() throws Exception
    {
+      if (!started)
+      {
+         log.warn("Attempt to stop() but " + this + " is not started");
+      }
+      
       super.stop();
       
       statsSender.stop();
@@ -322,7 +332,7 @@
       {
          log.trace(this.nodeId + " binding clustered queue: " + queue + " with condition: " + condition);
       }
-            
+        
       if (queue.getNodeId() != this.nodeId)
       {
          throw new IllegalArgumentException("Queue node id does not match office node id");
@@ -345,7 +355,7 @@
       {
          log.trace(this.nodeId + " unbind clustered queue: " + queueName);
       }
-      
+        
       Binding binding = (Binding)super.unbindQueue(queueName);
       
       UnbindRequest request = new UnbindRequest(nodeId, queueName);
@@ -375,7 +385,7 @@
       boolean routed = false;
       
       lock.readLock().acquire();
-         
+   
       try
       {      
          ClusteredBindings cb = (ClusteredBindings)conditionMap.get(condition);
@@ -533,6 +543,11 @@
       return false;
    }
    
+   public Collection listAllBindingsForCondition(String condition) throws Exception
+   {
+      return listBindingsForConditionInternal(condition, false);
+   }
+   
    // PostOfficeInternal implementation ------------------------------------------------------------------
    
    /*
@@ -543,7 +558,7 @@
       throws Exception
    {
       lock.writeLock().acquire();
-      
+    
       if (trace)
       {
          log.trace(this.nodeId + " adding binding from node: " + nodeId + " queue: " + queueName + " with condition: " + condition);        
@@ -614,7 +629,7 @@
    public void handleAddressNodeMapping(NodeAddressInfo info, int nodeId) throws Exception
    {
       lock.writeLock().acquire();
-      
+
       if (trace)
       {
          log.trace(this.nodeId + " Adding address node mapping for " + info.getSyncChannelAddress() +
@@ -641,7 +656,7 @@
       }
             
       lock.readLock().acquire();  
-      
+
       // Need to reference the message
       MessageReference ref = null;
       try
@@ -747,40 +762,6 @@
    }
    
    /*
-    * Unicast a sync request
-    */
-   public Object syncSendRequest(ClusterRequest request, int nodeId, boolean ignoreNoAddress) throws Exception
-   {              
-      if (trace) { log.trace(this.nodeId + " sending synch request to single node, request: " + request + " node " + nodeId); }
-            
-      Address address = this.getAddressForNodeId(nodeId, true);
-      
-      if (trace) { log.trace(this.nodeId + " sending to address " + address); }      
-      
-      if (address == null)
-      {
-         if (ignoreNoAddress)
-         {
-            return null;
-         }
-         else
-         {
-            throw new IllegalArgumentException("Cannot find address for node " + nodeId);
-         }
-      }
-      
-      byte[] bytes = writeRequest(request);
-            
-      Message message = new Message(address, null, bytes);      
-      
-      Object result = controlMessageDispatcher.sendMessage(message, GroupRequest.GET_FIRST, castTimeout);
-      
-      if (trace) { log.trace(this.nodeId + " received response: " + result); }
-                 
-      return result;
-   }
-   
-   /*
     * We put the transaction in the holding area
     */
    public void holdTransaction(TransactionId id, ClusterTransaction tx) throws Exception
@@ -798,10 +779,10 @@
       if (trace) { log.trace(this.nodeId + " committing transaction " + id ); }
       
       ClusterTransaction tx = null;
-      
+        
       synchronized (holdingArea)
       {
-         tx = (ClusterTransaction)holdingArea.remove(id);
+         tx = (ClusterTransaction)holdingArea.remove(id);                
       }
       
       if (tx == null)
@@ -872,7 +853,7 @@
       if (trace) { log.trace(this.nodeId + " check complete"); }
    }
    
-   public synchronized void sendQueueStats() throws Exception
+   public void sendQueueStats() throws Exception
    {
       if (!started)
       {
@@ -880,7 +861,7 @@
       }
       
       lock.readLock().acquire();
-      
+ 
       List statsList = null;      
       
       try
@@ -934,7 +915,7 @@
    public void updateQueueStats(int nodeId, List statsList) throws Exception
    {
       lock.readLock().acquire();
-      
+
       if (trace) { log.trace(this.nodeId + " updating queue stats from node " + nodeId + " stats size: " + statsList.size()); }
       
       try
@@ -983,6 +964,7 @@
                   
                   if (localQueue != null)
                   {               
+                     //TODO - the call to getQueues is too slow since it creates a new list and adds the local queue!!!
                      RemoteQueueStub toQueue = (RemoteQueueStub)messagePullPolicy.chooseQueue(router.getQueues());
                      
                      if (trace) { log.trace(this.nodeId + " recalculated pull queue for queue " + st.getQueueName() + " to be " + toQueue); }
@@ -1013,31 +995,59 @@
    {
       return pm.referenceExists(channelID, messageID);
    } 
+  
    
-   public List getDeliveries(String queueName, int numMessages) throws Exception
+   public void handleMessagePullResult(int remoteNodeId, long holdingTxId, String queueName, List messages) throws Throwable
    {
-      if (trace) { log.trace(this.nodeId + " getting max " + numMessages + " deliveries for " + queueName); }
-      
+      if (trace) { log.trace(this.nodeId + " handling pull result " + messages + " for " + queueName); }
+               
       Binding binding = getBindingForQueueName(queueName);
       
       if (binding == null)
       {
-         throw new IllegalArgumentException("Cannot find binding for queue " + queueName);
+         //This might happen if the queue is unbound
+         return;
       }
+            
+      LocalClusteredQueue localQueue = (LocalClusteredQueue)binding.getQueue();
+           
+      RemoteQueueStub remoteQueue = localQueue.getPullQueue();
       
-      LocalClusteredQueue queue = (LocalClusteredQueue)binding.getQueue();
+      if (remoteNodeId != remoteQueue.getNodeId())
+      {
+         //It might have changed since the request was sent
+         Map bindings = (Map)nameMaps.get(new Integer(remoteNodeId));
+         
+         if (bindings != null)
+         {
+            binding = (Binding)bindings.get(queueName);
+            
+            if (binding != null)
+            {                     
+              remoteQueue = (RemoteQueueStub)binding.getQueue();                              
+            }
+         }
+      }
       
-      List dels = queue.getDeliveries(numMessages);
-      
-      if (trace) { log.trace(this.nodeId + " retrieved " + dels.size() + " deliveries from " + queueName); }
-      
-      return dels;
+      if (remoteQueue != null)
+      {
+         localQueue.handlePullMessagesResult(remoteQueue, messages, holdingTxId);
+      }     
+      else
+      {
+         //TODO need to send a rollback to the remote queue, otherwise the held transaction will
+         //leak in the remote node's holding area
+      }
    }
    
-                     
+   
+   public int getNodeId()
+   {
+      return nodeId;
+   }
+                        
    // Public ------------------------------------------------------------------------------------------
-   
-   
+      
    //Used for testing only
    public void setFail(boolean beforeCommit, boolean afterCommit)
    {
@@ -1081,7 +1091,7 @@
          bindings.addRouter(queueName, router);
       }
       
-      router.add(binding.getQueue());                  
+      router.add(binding.getQueue());                
    }
 
    protected void removeFromConditionMap(Binding binding)
@@ -1233,7 +1243,7 @@
    private void removeBindingsForAddress(Integer nodeId) throws Exception
    {
       lock.writeLock().acquire();
-      
+
       try
       {          
          Map nameMap = (Map)nameMaps.get(nodeId);
@@ -1379,9 +1389,9 @@
    }
    
    private Address getAddressForNodeId(int nodeId, boolean sync) throws Exception
-   {
+   {  
       lock.readLock().acquire();
-      
+
       try
       {
          NodeAddressInfo info = (NodeAddressInfo)nodeIdAddressesMap.get(new Integer(nodeId));
@@ -1447,7 +1457,7 @@
       }
       
       public void setState(byte[] bytes)
-      {
+      { 
          if (bytes != null)
          {
             

Modified: trunk/src/main/org/jboss/messaging/core/plugin/postoffice/cluster/DefaultRouter.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/plugin/postoffice/cluster/DefaultRouter.java	2006-10-13 12:09:35 UTC (rev 1473)
+++ trunk/src/main/org/jboss/messaging/core/plugin/postoffice/cluster/DefaultRouter.java	2006-10-16 22:03:03 UTC (rev 1474)
@@ -204,12 +204,21 @@
    
    public List getQueues()
    {
-      return nonLocalQueues;
+      List queues = new ArrayList();
+      
+      if (localQueue != null)
+      {
+         queues.add(localQueue);
+      }
+      
+      queues.addAll(nonLocalQueues);
+      
+      return queues;
    }
 
    public int numberOfReceivers()
    {
-      return nonLocalQueues.size();
+      return nonLocalQueues.size() + (localQueue != null ? 1 : 0);
    }
 }
 

Modified: trunk/src/main/org/jboss/messaging/core/plugin/postoffice/cluster/LocalClusteredQueue.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/plugin/postoffice/cluster/LocalClusteredQueue.java	2006-10-13 12:09:35 UTC (rev 1473)
+++ trunk/src/main/org/jboss/messaging/core/plugin/postoffice/cluster/LocalClusteredQueue.java	2006-10-16 22:03:03 UTC (rev 1474)
@@ -25,7 +25,6 @@
 import java.util.Collections;
 import java.util.Iterator;
 import java.util.List;
-import java.util.ListIterator;
 
 import org.jboss.logging.Logger;
 import org.jboss.messaging.core.Delivery;
@@ -37,11 +36,8 @@
 import org.jboss.messaging.core.plugin.contract.PersistenceManager;
 import org.jboss.messaging.core.plugin.contract.PostOffice;
 import org.jboss.messaging.core.tx.Transaction;
-import org.jboss.messaging.core.tx.TransactionException;
 import org.jboss.messaging.core.tx.TransactionRepository;
-import org.jboss.messaging.core.tx.TxCallback;
 import org.jboss.messaging.util.Future;
-import org.jboss.messaging.util.StreamUtils;
 
 import EDU.oswego.cs.dl.util.concurrent.QueuedExecutor;
 
@@ -105,6 +101,11 @@
    {
       this.pullQueue = queue;
    }
+   
+   public RemoteQueueStub getPullQueue()
+   {
+      return pullQueue;
+   }
       
    public QueueStats getStats()
    {      
@@ -141,18 +142,6 @@
    }
    
    /*
-    * Used when pulling messages from a remote queue
-    */
-   public List getDeliveries(int number) throws Exception
-   {
-      Future result = new Future();
-      
-      this.executor.execute(new GetDeliveriesRunnable(result, 1));
-            
-      return (List)result.getResult();
-   }
-   
-   /*
     * This is the same as the normal handle() method on the Channel except it doesn't
     * persist the message even if it is persistent - this is because persistent messages
     * are always persisted on the sending node before sending.
@@ -189,169 +178,30 @@
       acknowledgeInternal(d, null, false, false);      
    }
    
+   public void handlePullMessagesResult(RemoteQueueStub remoteQueue, List messages, long holdingTxId) throws Exception
+   { 
+      //This needs to be run on a different thread to the one used by JGroups to deliver the message
+      //to avoid deadlock
+      Runnable runnable = new MessagePullResultRunnable(remoteQueue, messages, holdingTxId);
+      
+      executor.execute(runnable);      
+   }
    
-   protected MessageReference nextReference(ListIterator iter, boolean handle) throws Throwable
+   //TODO it's not ideal that we need to pass in a PullMessagesRequest
+   public void handleGetDeliveriesRequest(int returnNodeId, int number, TransactionId txId, PullMessagesRequest tx) throws Exception
    {
-      MessageReference ref = super.nextReference(iter, handle);
+      //This needs to be run on a different thread to the one used by JGroups to deliver the message
+      //to avoid deadlock
+      Runnable runnable = new MessagePullRequestRunnable(returnNodeId, number, txId, tx);
       
-      if (ref == null)
-      {
-         //There are no available refs in the local queue
-         //Maybe we need to pull one (some) from a remote queue?
-         
-         if (pullMessages())
-         {
-            ref = super.nextReference(iter, handle);
-         }
-      }
-      
-      return ref;
+      executor.execute(runnable);
    }
-
+       
    public boolean isClustered()
    {
       return true;
    }
-   
-   /**
-    * Pull messages from a remote queue to this queue.
-    * If any of the messages are reliable then this needs to be done reliable (i.e. without loss or redelivery)
-    * Normally this would require 2PC which would make performance suck.
-    * However since we know both queues share the same DB then we can do the persistence locally in the same
-    * tx thus avoiding 2PC and maintaining reliability:)
-    * We do the following:
-    * 
-    * 1. A tx is started locally
-    * 2. Create deliveries for message(s) on the remote node - bring messages back to the local node
-    * We send a message to the remote node to retrieve a set of deliveries from the queue - it gets a max of num
-    * deliveries.
-    * The unreliable ones can be acknowledged immediately, the reliable ones are not acknowledged and a holding transaction
-    * is placed in the holding area on the remote node, which contains knowledge of the deliveries.
-    * The messages corresponding to the deliveries are returned to the local node
-    * 3. The retrieved messages are added to the local queue in the tx
-    * 4. Deliveries corresponding to the messages retrieved are acknowledged LOCALLY for the remote queue.
-    * 5. The local tx is committed.
-    * 6. Send "commit" message to remote node
-    * 7. "Commit" message is received and deliveries in the holding transaction are acknowledged IN MEMORY only.
-    * On failure, commit or rollback will be called on the holding transaction causing the deliveries to be acked or cancelled
-    * depending on whether they exist in the database
-    * 
-    * Recovery is handled in the same way as CastMessagesCallback
-    * 
-    * This method will always be executed on the channel's event queue (via the deliver method)
-    * so no need to do any handles or acks inside another event message
-    */
-   private boolean pullMessages() throws Throwable
-   {      
-      if (pullQueue == null)
-      {
-         return false;
-      }
-      
-      //TODO we can optimise this for the case when only one message is pulled
-      //and when only non persistent messages are pulled - i.e. we don't need
-      //to create a transaction.
-      
-      RemoteQueueStub theQueue = pullQueue;
-         
-      Transaction tx = tr.createTransaction();
-         
-      ClusterRequest req = new PullMessagesRequest(this.nodeId, tx.getId(), theQueue.getChannelID(),
-                                                   name, 1);
-      
-      if (trace)
-      {
-         log.trace(System.identityHashCode(this) + " Executing pull messages request for queue " + name +
-                   " pulling from node " + theQueue.getNodeId() + " to node " + this.nodeId);
-      }
-      
-      log.info("==================== Executing pull messages request");
-      byte[] bytes = (byte[])office.syncSendRequest(req, theQueue.getNodeId(), true);
-      log.info("==================== Executed pull messages request");
-      
-      if (bytes == null)
-      {
-         //Ok - node might have left the group
-         return false;
-      }
-      
-      PullMessagesResponse response = new PullMessagesResponse();
-      
-      StreamUtils.fromBytes(response, bytes);
-
-      List msgs = response.getMessages();
-      
-      if (trace) { log.trace(System.identityHashCode(this) + " I retrieved " + msgs.size() + " messages from pull"); }
-      
-      Iterator iter = msgs.iterator();
-      
-      boolean containsReliable = false;
-      
-      while (iter.hasNext())
-      {
-         org.jboss.messaging.core.Message msg = (org.jboss.messaging.core.Message)iter.next();
-         
-         if (msg.isReliable())
-         {
-            //It will already have been persisted on the other node
-            msg.setPersisted(true);
-            
-            containsReliable = true;
-         }
-               
-         MessageReference ref = null;
-         
-         try
-         {
-            ref = ms.reference(msg);
-            
-            //It's ok to call this directly since this method is only ever called by the delivery thread
-            //We call it with the deliver parameter set to false - this prevents delivery being done
-            //after the ref is added - if delivery was done we would end up in recursion.
-            Delivery delRet = handleInternal(null, ref, tx, true, true, false);
-            
-            if (delRet == null || !delRet.isSelectorAccepted())
-            {
-               //This should never happen
-               throw new IllegalStateException("Queue did not accept reference!");
-            }
-            
-         }
-         finally
-         {
-            if (ref != null)
-            {
-               ref.releaseMemoryReference();
-            }
-         }
-                 
-         //Acknowledge on the remote queue stub
-         Delivery del = new SimpleDelivery(theQueue, ref);
-         
-         del.acknowledge(tx);        
-      }
-          
-      tx.commit();
-      
-      //TODO what if commit throws an exception - this means the commit message doesn't hit the 
-      //remote node so the holding transaction stays in the holding area 
-      //Need to catch the exception and throw a check message
-      //What we need to do is catch any exceptions at the top of the call, i.e. just after the interface
-      //and send a checkrequest
-      //This applies to a normal message and messages requests too
-            
-      //We only need to send a commit message if there were reliable messages since otherwise
-      //the transaction wouldn't have been added in the holding area
-      if (containsReliable && isRecoverable())
-      {         
-         req = new PullMessagesRequest(this.nodeId, tx.getId());
-         
-         office.asyncSendRequest(req, theQueue.getNodeId());
-      }      
-      
-      return !msgs.isEmpty();
-   }
-   
+        
    public int getRefCount()
    {
       //We are only interested in getting the reference count when delivery is not in progress
@@ -372,6 +222,46 @@
       return ((Integer)result.getResult()).intValue();
    }
    
+   protected void deliverInternal() throws Throwable
+   {      
+      super.deliverInternal();
+       
+      //If the receivers are still ready to accept more refs then we might pull messages
+      //from a remote queue
+          
+      if (receiversReady && pullQueue != null)
+      {
+         //We send a message to the remote queue to pull a message - the remote queue will then send back
+         //another message asynchronously with the result.
+         //We don't do this synchronously with a message dispatcher since that can lead to distributed
+         //deadlock
+          
+         sendPullMessage();
+      }
+   }
+   
+   private void sendPullMessage() throws Exception
+   {
+      if (pullQueue == null)
+      {
+         //Nothing to do
+         return;
+      }
+      
+      //Avoid synchronization
+      RemoteQueueStub theQueue = pullQueue;
+            
+      if (theQueue == null)
+      {
+         return;
+      }
+      
+      executor.execute(new SendPullRequestRunnable(theQueue));          
+   }
+   
+   /*
+    * Get the ref count - executed on event queue
+    */
    private class GetRefCountRunnable implements Runnable
    {
       Future result;
@@ -389,24 +279,94 @@
       }
    }  
    
-   private class GetDeliveriesRunnable implements Runnable
+   /*
+    * Send a message pull request.
+    * 
+    * TODO - do we really need this class?
+    * Why can't we just execute on the same thread?
+    */
+   private class SendPullRequestRunnable implements Runnable
    {
-      Future result;
+      private RemoteQueueStub theQueue;
       
+      private SendPullRequestRunnable(RemoteQueueStub theQueue)
+      {
+         this.theQueue = theQueue;
+      }
+
+      public void run()
+      {
+         try
+         {
+            //TODO
+            //We create a tx just so we get the id - we could just get the id directly from the id
+            //manager
+            Transaction tx = tr.createTransaction();
+                             
+            ClusterRequest req = new PullMessagesRequest(nodeId, tx.getId(), theQueue.getChannelID(),
+                                                         name, 1);
+            
+            office.asyncSendRequest(req, theQueue.getNodeId()); 
+         }
+         catch (Exception e)
+         {
+            log.error("Failed to pull message", e);
+         }
+      }
+      
+   }
+   
+   /**
+    * This is how we "pull" messages from one node to another
+    * If any of the messages are reliable then this needs to be done reliably (i.e. without loss or redelivery)
+    * Normally this would require 2PC which would make performance suck.
+    * However since we know both queues share the same DB then we can do the persistence locally in the same
+    * tx thus avoiding 2PC and maintaining reliability :)
+    * We do the following:
+    * 
+    * 1. Send a PullMessagesRequest to the remote node, on receipt it will create deliveries for message(s), and 
+    * possibly add a holding tx (if there are any persistent messages), the messages will then be returned in
+    * a PullMessagesResultRequest which is sent asynchronously from the remote node back to here to avoid
+    * distributed deadlock.
+    * 2. When the result is returned it hits this method.
+    * 3. The retrieved messages are added to the local queue in the tx
+    * 4. Deliveries corresponding to the messages retrieved are acknowledged LOCALLY for the remote queue.
+    * 5. The local tx is committed.
+    * 6. Send "commit" message to remote node
+    * 7. "Commit" message is received and deliveries in the holding transaction are acknowledged IN MEMORY only.
+    * On failure, commit or rollback will be called on the holding transaction causing the deliveries to be acked or cancelled
+    * depending on whether they exist in the database
+    * 
+    * Recovery is handled in the same way as CastMessagesCallback
+    * 
+    */   
+   
+   private class MessagePullRequestRunnable implements Runnable
+   { 
+      int returnNodeId;
+      
       int number;
       
-      public GetDeliveriesRunnable(Future result, int number)
-      {
-         this.result = result;
+      TransactionId txId;
+      
+      PullMessagesRequest tx;
+      
+      public MessagePullRequestRunnable(int returnNodeId, int number, TransactionId txId, PullMessagesRequest tx)
+      { 
+         this.returnNodeId = returnNodeId;
          
          this.number = number;
+         
+         this.txId = txId;
+         
+         this.tx = tx;
       }
       
       public void run()
       {
          try
          {
-            List list = null;
+            List dels = null;
             
             //We only get the refs if receiversReady = false so as not to steal messages that
             //might be consumed by local receivers            
@@ -416,7 +376,7 @@
                
                MessageReference ref;
                
-               list = new ArrayList();
+               dels = new ArrayList();
                
                synchronized (refLock)
                {
@@ -428,7 +388,7 @@
                         
                         deliveries.add(del);
                         
-                        list.add(del);       
+                        dels.add(del);       
                         
                         count++;
                      }  
@@ -437,75 +397,159 @@
             }
             else
             {
-               list = Collections.EMPTY_LIST;
+               dels = Collections.EMPTY_LIST;
             }
             
-            result.setResult(list);
+            if (trace) { log.trace("PullMessagesRunnable got " + dels.size() + " deliveries"); }
+            
+            PullMessagesResultRequest response = new PullMessagesResultRequest(LocalClusteredQueue.this.nodeId, txId.getTxId(), name, dels.size());
+            
+            List reliableDels = null;
+            
+            if (!dels.isEmpty())
+            {
+               Iterator iter = dels.iterator();
+               
+               Delivery del = (Delivery)iter.next();
+               
+               if (del.getReference().isReliable())
+               {
+                  //Add it to internal list
+                  if (reliableDels == null)
+                  {
+                     reliableDels = new ArrayList();                                    
+                  }
+                  
+                  reliableDels.add(del);
+               }
+               else
+               {
+                  //We can ack it now
+                  del.acknowledge(null);
+               }
+               
+               response.addMessage(del.getReference().getMessage());
+            }
+                 
+            if (reliableDels != null)
+            {
+               //Add this to the holding area
+               tx.setReliableDels(reliableDels);
+               office.holdTransaction(txId, tx);
+            }
+             
+            //We send the messages asynchronously to avoid a deadlock situation which can occur
+            //if we were using MessageDispatcher to get the messages.
+            
+            office.asyncSendRequest(response, returnNodeId);   
          }
-         catch (Exception e)
+         catch (Throwable e)
          {
-            result.setException(e);
+            log.error("Failed to get deliveries", e);
          }                     
       }
    } 
    
-   private class AddReferencesCallback implements TxCallback
+   private class MessagePullResultRunnable implements Runnable
    {
-      private List references;
+      private RemoteQueueStub remoteQueue;
       
-      private AddReferencesCallback(List references)
+      private List messages;
+      
+      private long holdingTxId;
+            
+      private MessagePullResultRunnable(RemoteQueueStub remoteQueue,
+                                               List messages, long holdingTxId)
       {
-         this.references = references;
+         this.remoteQueue = remoteQueue;
+         
+         this.messages = messages;
+         
+         this.holdingTxId = holdingTxId;
       }
 
-      public void afterCommit(boolean onePhase) throws Exception
+      public void run()
       {
-         Iterator iter = references.iterator();
-
-         while (iter.hasNext())
+         try
          {
-            MessageReference ref = (MessageReference) iter.next();
-
-            if (trace) { log.trace(this + ": adding " + ref + " to non-recoverable state"); }
-
-            try
+            // TODO we should optimise for the case when only one message is pulled which is basically all
+            //we support now anyway
+            //Also we should optimise for the case when only non persistent messages are pulled
+            //in this case we don't need to create a tx.
+            
+            Transaction tx = tr.createTransaction();
+            
+            Iterator iter = messages.iterator();
+            
+            boolean containsReliable = false;
+            
+            while (iter.hasNext())
             {
-               synchronized (refLock)
+               org.jboss.messaging.core.Message msg = (org.jboss.messaging.core.Message)iter.next();
+               
+               if (msg.isReliable())
                {
-                  addReferenceInMemory(ref);
+                  //It will already have been persisted on the other node
+                  //so we need to set the persisted flag here
+                  msg.setPersisted(true);
+                  
+                  containsReliable = true;
                }
+                     
+               MessageReference ref = null;
+               
+               try
+               {
+                  ref = ms.reference(msg);
+                  
+                  //Should be executed synchronously since we are already in the event queue
+                  Delivery delRet = handleInternal(null, ref, tx, true, true);
+                  
+                  if (delRet == null || !delRet.isSelectorAccepted())
+                  {
+                     //This should never happen
+                     throw new IllegalStateException("Queue did not accept reference!");
+                  }            
+               }
+               finally
+               {
+                  if (ref != null)
+                  {
+                     ref.releaseMemoryReference();
+                  }
+               }
+                       
+               //Acknowledge on the remote queue stub
+               Delivery del = new SimpleDelivery(remoteQueue, ref);
+               
+               del.acknowledge(tx);        
             }
-            catch (Throwable t)
-            {
-               throw new TransactionException("Failed to add reference", t);
-            }
+               
+            tx.commit();
+          
+            //TODO what if commit throws an exception - this means the commit message doesn't hit the 
+            //remote node so the holding transaction stays in the holding area 
+            //Need to catch the exception and throw a check message
+            //What we need to do is catch any exceptions at the top of the call, i.e. just after the interface
+            //and send a checkrequest
+            //This applies to a normal message and messages requests too
+                  
+            //We only need to send a commit message if there were reliable messages since otherwise
+            //the transaction wouldn't have been added in the holding area
+            if (containsReliable && isRecoverable())
+            {         
+               ClusterRequest req = new PullMessagesRequest(nodeId, holdingTxId);
+               
+               office.asyncSendRequest(req, remoteQueue.getNodeId());
+            }  
+         }      
+         catch (Throwable e)
+         {
+            log.error("Failed to handle pulled message", e);
          }
       }
-
-      public void afterPrepare() throws Exception
-      {
-         //NOOP
-      }
-
-      public void afterRollback(boolean onePhase) throws Exception
-      {
-         //NOOP
-      }
-
-      public void beforeCommit(boolean onePhase) throws Exception
-      {
-         //NOOP
-      }
-
-      public void beforePrepare() throws Exception
-      {
-         //NOOP
-      }
-
-      public void beforeRollback(boolean onePhase) throws Exception
-      {
-         //NOOP
-      }
       
    }
+       
+   
 }

Modified: trunk/src/main/org/jboss/messaging/core/plugin/postoffice/cluster/PostOfficeInternal.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/plugin/postoffice/cluster/PostOfficeInternal.java	2006-10-13 12:09:35 UTC (rev 1473)
+++ trunk/src/main/org/jboss/messaging/core/plugin/postoffice/cluster/PostOfficeInternal.java	2006-10-16 22:03:03 UTC (rev 1474)
@@ -58,8 +58,6 @@
    
    void asyncSendRequest(ClusterRequest request, int nodeId) throws Exception;
    
-   Object syncSendRequest(ClusterRequest request, int nodeId, boolean ignoreNoAddress) throws Exception;
-   
    void holdTransaction(TransactionId id, ClusterTransaction tx) throws Throwable;
    
    void commitTransaction(TransactionId id) throws Throwable;
@@ -69,4 +67,6 @@
    void sendQueueStats() throws Exception;
    
    boolean referenceExistsInStorage(long channelID, long messageID) throws Exception;
+   
+   void handleMessagePullResult(int remoteNodeId, long holdingTxId, String queueName, List messages) throws Throwable;
 }

Modified: trunk/src/main/org/jboss/messaging/core/plugin/postoffice/cluster/PullMessagesRequest.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/plugin/postoffice/cluster/PullMessagesRequest.java	2006-10-13 12:09:35 UTC (rev 1473)
+++ trunk/src/main/org/jboss/messaging/core/plugin/postoffice/cluster/PullMessagesRequest.java	2006-10-16 22:03:03 UTC (rev 1474)
@@ -23,14 +23,12 @@
 
 import java.io.DataInputStream;
 import java.io.DataOutputStream;
-import java.util.ArrayList;
 import java.util.Iterator;
 import java.util.List;
 
 import org.jboss.logging.Logger;
 import org.jboss.messaging.core.Delivery;
 import org.jboss.messaging.core.plugin.postoffice.Binding;
-import org.jboss.messaging.util.StreamUtils;
 
 /**
  * A PullMessagesRequest
@@ -90,47 +88,9 @@
          
          LocalClusteredQueue queue = (LocalClusteredQueue)binding.getQueue();
          
-         List dels = queue.getDeliveries(numMessages);
-         
-         if (trace) { log.trace("PullMessagesRequest got " + dels.size() + " deliveries"); }
-         
-         PullMessagesResponse response = new PullMessagesResponse(dels.size());
-         
-         if (!dels.isEmpty())
-         {
-            Iterator iter = dels.iterator();
-            
-            Delivery del = (Delivery)iter.next();
-            
-            if (del.getReference().isReliable())
-            {
-               //Add it to internal list
-               if (reliableDels == null)
-               {
-                  reliableDels = new ArrayList();                                    
-               }
-               
-               reliableDels.add(del);
-            }
-            else
-            {
-               //We can ack it now
-               del.acknowledge(null);
-            }
-            
-            response.addMessage(del.getReference().getMessage());
-         }
-              
-         if (reliableDels != null)
-         {
-            //Add this to the holding area
-            office.holdTransaction(id, this);
-         }
+         queue.handleGetDeliveriesRequest(nodeId, numMessages, id, this);
           
-         //Convert to bytes since the response isn't serializable (nor do we want it to be)
-         byte[] bytes = StreamUtils.toBytes(response);
-         
-         return bytes;
+         return null;
       }
       else
       {
@@ -139,6 +99,12 @@
          return null;
       }
    }
+   
+   //TODO this is a bit messy - must be a nicer way of setting this
+   void setReliableDels(List reliableDels)
+   {
+      this.reliableDels = reliableDels;
+   }
 
    byte getType()
    {

Deleted: trunk/src/main/org/jboss/messaging/core/plugin/postoffice/cluster/PullMessagesResponse.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/plugin/postoffice/cluster/PullMessagesResponse.java	2006-10-13 12:09:35 UTC (rev 1473)
+++ trunk/src/main/org/jboss/messaging/core/plugin/postoffice/cluster/PullMessagesResponse.java	2006-10-16 22:03:03 UTC (rev 1474)
@@ -1,99 +0,0 @@
-/*
- * JBoss, Home of Professional Open Source
- * Copyright 2005, JBoss Inc., and individual contributors as indicated
- * by the @authors tag. See the copyright.txt in the distribution for a
- * full listing of individual contributors.
- *
- * This is free software; you can redistribute it and/or modify it
- * under the terms of the GNU Lesser General Public License as
- * published by the Free Software Foundation; either version 2.1 of
- * the License, or (at your option) any later version.
- *
- * This software is distributed in the hope that it will be useful,
- * but WITHOUT ANY WARRANTY; without even the implied warranty of
- * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
- * Lesser General Public License for more details.
- *
- * You should have received a copy of the GNU Lesser General Public
- * License along with this software; if not, write to the Free
- * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
- * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
- */
-package org.jboss.messaging.core.plugin.postoffice.cluster;
-
-import java.io.DataInputStream;
-import java.io.DataOutputStream;
-import java.util.ArrayList;
-import java.util.Iterator;
-import java.util.List;
-
-import org.jboss.messaging.core.Message;
-import org.jboss.messaging.core.message.MessageFactory;
-import org.jboss.messaging.util.Streamable;
-
-/**
- * A PullMessagesResponse
- *
- * @author <a href="mailto:tim.fox at jboss.com">Tim Fox</a>
- * @version <tt>$Revision: 1.1 $</tt>
- *
- * $Id$
- *
- */
-public class PullMessagesResponse implements Streamable
-{
-   private List messages;
-   
-   PullMessagesResponse()
-   {
-   }
-   
-   PullMessagesResponse(int size)
-   {
-      messages = new ArrayList(size);
-   }
-   
-   void addMessage(Message msg)
-   {
-      messages.add(msg);
-   }
-   
-   List getMessages()
-   {
-      return messages;
-   }
-
-   public void read(DataInputStream in) throws Exception
-   {
-      int num = in.readInt();
-      
-      messages = new ArrayList(num);
-      
-      for (int i = 0; i < num; i++)
-      {
-         byte type = in.readByte();
-         
-         Message msg = MessageFactory.createMessage(type);
-         
-         msg.read(in);
-         
-         messages.add(msg);
-      }
-   }
-
-   public void write(DataOutputStream out) throws Exception
-   {
-      out.writeInt(messages.size());
-      
-      Iterator iter = messages.iterator();
-      
-      while (iter.hasNext())
-      {
-         Message msg = (Message)iter.next();
-         
-         out.writeByte(msg.getType());
-         
-         msg.write(out);
-      }   
-   }
-}

Copied: trunk/src/main/org/jboss/messaging/core/plugin/postoffice/cluster/PullMessagesResultRequest.java (from rev 1473, trunk/src/main/org/jboss/messaging/core/plugin/postoffice/cluster/PullMessagesResponse.java)
===================================================================
--- trunk/src/main/org/jboss/messaging/core/plugin/postoffice/cluster/PullMessagesResponse.java	2006-10-13 12:09:35 UTC (rev 1473)
+++ trunk/src/main/org/jboss/messaging/core/plugin/postoffice/cluster/PullMessagesResultRequest.java	2006-10-16 22:03:03 UTC (rev 1474)
@@ -0,0 +1,137 @@
+/*
+ * JBoss, Home of Professional Open Source
+ * Copyright 2005, JBoss Inc., and individual contributors as indicated
+ * by the @authors tag. See the copyright.txt in the distribution for a
+ * full listing of individual contributors.
+ *
+ * This is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU Lesser General Public License as
+ * published by the Free Software Foundation; either version 2.1 of
+ * the License, or (at your option) any later version.
+ *
+ * This software is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this software; if not, write to the Free
+ * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
+ * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
+ */
+package org.jboss.messaging.core.plugin.postoffice.cluster;
+
+import java.io.DataInputStream;
+import java.io.DataOutputStream;
+import java.util.ArrayList;
+import java.util.Iterator;
+import java.util.List;
+
+import org.jboss.messaging.core.Message;
+import org.jboss.messaging.core.message.MessageFactory;
+
+/**
+ * 
+ * A PullMessagesResultRequest
+ *
+ * @author <a href="mailto:tim.fox at jboss.com">Tim Fox</a>
+ * @version <tt>$Revision: 1.1 $</tt>
+ *
+ * $Id$
+ *
+ */
+public class PullMessagesResultRequest extends ClusterRequest
+{
+   public static final int TYPE = 2;
+   
+   private long holdingTxId;
+   
+   private String queueName;
+   
+   private List messages;
+   
+   private int remoteNodeId;
+   
+   PullMessagesResultRequest()
+   {
+   }
+   
+   PullMessagesResultRequest(int remoteNodeId, long holdingTxId, String queueName, int size)
+   {
+      this.remoteNodeId = remoteNodeId;
+      
+      this.holdingTxId = holdingTxId;
+      
+      this.queueName = queueName;
+      
+      messages = new ArrayList(size);
+   }
+   
+   void addMessage(Message msg)
+   {
+      messages.add(msg);
+   }
+   
+   List getMessages()
+   {
+      return messages;
+   }
+   
+   public void read(DataInputStream in) throws Exception
+   {
+      remoteNodeId = in.readInt();
+      
+      holdingTxId = in.readLong();
+      
+      queueName = in.readUTF();
+      
+      int num = in.readInt();
+      
+      messages = new ArrayList(num);
+      
+      for (int i = 0; i < num; i++)
+      {
+         byte type = in.readByte();
+         
+         Message msg = MessageFactory.createMessage(type);
+         
+         msg.read(in);
+         
+         messages.add(msg);
+      }
+   }
+
+   public void write(DataOutputStream out) throws Exception
+   {
+      out.writeInt(remoteNodeId);
+      
+      out.writeLong(holdingTxId);
+      
+      out.writeUTF(queueName);
+      
+      out.writeInt(messages.size());
+      
+      Iterator iter = messages.iterator();
+      
+      while (iter.hasNext())
+      {
+         Message msg = (Message)iter.next();
+         
+         out.writeByte(msg.getType());
+         
+         msg.write(out);
+      }   
+   }
+
+   Object execute(PostOfficeInternal office) throws Throwable
+   {
+      office.handleMessagePullResult(remoteNodeId, holdingTxId, queueName, messages);
+      
+      return null;
+   }
+
+   byte getType()
+   {
+      return TYPE;
+   }
+}

Modified: trunk/src/main/org/jboss/messaging/core/plugin/postoffice/cluster/TransactionId.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/plugin/postoffice/cluster/TransactionId.java	2006-10-13 12:09:35 UTC (rev 1473)
+++ trunk/src/main/org/jboss/messaging/core/plugin/postoffice/cluster/TransactionId.java	2006-10-16 22:03:03 UTC (rev 1474)
@@ -87,7 +87,7 @@
       
       TransactionId tother = (TransactionId)other;
       
-      return tother.txId == this.txId && tother.nodeId == this.nodeId;
+      return (tother.txId == this.txId) && (tother.nodeId == this.nodeId);
    }
    
    public String toString()

Modified: trunk/src/main/org/jboss/messaging/core/tx/Transaction.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/tx/Transaction.java	2006-10-13 12:09:35 UTC (rev 1473)
+++ trunk/src/main/org/jboss/messaging/core/tx/Transaction.java	2006-10-16 22:03:03 UTC (rev 1474)
@@ -208,7 +208,7 @@
       if (trace) { log.trace("executing after commit hooks " + this); }
       
       if (firstCallback != null)
-      {
+      {         
          firstCallback.afterCommit(onePhase);
       }
       

Modified: trunk/src/main/org/jboss/messaging/core/tx/TransactionRepository.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/tx/TransactionRepository.java	2006-10-13 12:09:35 UTC (rev 1473)
+++ trunk/src/main/org/jboss/messaging/core/tx/TransactionRepository.java	2006-10-16 22:03:03 UTC (rev 1474)
@@ -179,7 +179,7 @@
    public Transaction createTransaction() throws Exception
    {
       Transaction tx = new Transaction(idManager.getId());
-
+      
       if (trace) { log.trace("created transaction " + tx); }
 
       return tx;

Modified: trunk/tests/build.xml
===================================================================
--- trunk/tests/build.xml	2006-10-13 12:09:35 UTC (rev 1473)
+++ trunk/tests/build.xml	2006-10-16 22:03:03 UTC (rev 1474)
@@ -91,7 +91,7 @@
    <property name="junit.haltonfailure" value="false"/>
    <property name="junit.fork" value="true"/>
    <property name="junit.includeantruntime" value="true"/>
-   <property name="junit.timeout" value="300000"/>
+   <property name="junit.timeout" value="1200000"/>
    <property name="stress.timeout" value="4800000"/>
 
    <property name="junit.showoutput" value="true"/>

Modified: trunk/tests/src/org/jboss/test/messaging/core/paging/base/PagingStateTestBase.java
===================================================================
--- trunk/tests/src/org/jboss/test/messaging/core/paging/base/PagingStateTestBase.java	2006-10-13 12:09:35 UTC (rev 1473)
+++ trunk/tests/src/org/jboss/test/messaging/core/paging/base/PagingStateTestBase.java	2006-10-16 22:03:03 UTC (rev 1474)
@@ -476,9 +476,7 @@
       {
          long msgId = rs.getLong(1);     
          long pageOrd = rs.getLong(3);
-         
-         //log.info("Exists " + msgId + " with page ord " + pageOrd);
-         
+          
          msgIds.add(new Long(msgId));
       }
       rs.close();

Modified: trunk/tests/src/org/jboss/test/messaging/core/plugin/postoffice/cluster/DefaultClusteredPostOfficeTest.java
===================================================================
--- trunk/tests/src/org/jboss/test/messaging/core/plugin/postoffice/cluster/DefaultClusteredPostOfficeTest.java	2006-10-13 12:09:35 UTC (rev 1473)
+++ trunk/tests/src/org/jboss/test/messaging/core/plugin/postoffice/cluster/DefaultClusteredPostOfficeTest.java	2006-10-16 22:03:03 UTC (rev 1474)
@@ -99,12 +99,11 @@
          
          //Add a couple of bindings
          
-         LocalClusteredQueue queue1 = new LocalClusteredQueue(office1, 1, "sub1", channelIdManager.getId(), ms, pm, true, false, (QueuedExecutor)pool.get(), null, tr);
-         
+         LocalClusteredQueue queue1 = new LocalClusteredQueue(office1, 1, "sub1", channelIdManager.getId(), ms, pm, true, false, (QueuedExecutor)pool.get(), null, tr);        
          Binding binding1 =
             office1.bindClusteredQueue("topic1", queue1);
+         
          LocalClusteredQueue queue2 = new LocalClusteredQueue(office1, 1, "sub2", channelIdManager.getId(), ms, pm, true, false, (QueuedExecutor)pool.get(), null, tr);
-
          Binding binding2 =
             office1.bindClusteredQueue("topic1", queue2);
          
@@ -112,7 +111,7 @@
          
          office2 = createClusteredPostOffice(2, "testgroup");
          
-         Collection bindings = office2.listBindingsForCondition("topic1");
+         Collection bindings = office2.listAllBindingsForCondition("topic1");
          assertNotNull(bindings);
          assertEquals(2, bindings.size());
          
@@ -129,7 +128,7 @@
   
          //Make sure both nodes pick it up
          
-         bindings = office1.listBindingsForCondition("topic1");
+         bindings = office1.listAllBindingsForCondition("topic1");
          assertNotNull(bindings);
          assertEquals(3, bindings.size());
          
@@ -138,7 +137,7 @@
          assertEquivalent(binding2, (Binding)iter.next());
          assertEquivalent(binding3, (Binding)iter.next());
 
-         bindings = office2.listBindingsForCondition("topic1");
+         bindings = office2.listAllBindingsForCondition("topic1");
          assertNotNull(bindings);
          assertEquals(3, bindings.size());
          
@@ -155,7 +154,7 @@
          
          // Make sure both nodes pick it up
          
-         bindings = office1.listBindingsForCondition("topic1");
+         bindings = office1.listAllBindingsForCondition("topic1");
          assertNotNull(bindings);
          assertEquals(4, bindings.size());
          
@@ -165,7 +164,7 @@
          assertEquivalent(binding3, (Binding)iter.next());
          assertEquivalent(binding4, (Binding)iter.next());
          
-         bindings = office2.listBindingsForCondition("topic1");
+         bindings = office2.listAllBindingsForCondition("topic1");
          assertNotNull(bindings);
          assertEquals(4, bindings.size());
          
@@ -181,7 +180,7 @@
          
          //Make sure bindings are not longer available on either node
          
-         bindings = office1.listBindingsForCondition("topic1");
+         bindings = office1.listAllBindingsForCondition("topic1");
          assertNotNull(bindings);
          assertEquals(2, bindings.size());
          
@@ -189,7 +188,7 @@
          assertEquivalent(binding3, (Binding)iter.next());
          assertEquivalent(binding4, (Binding)iter.next());
    
-         bindings = office2.listBindingsForCondition("topic1");
+         bindings = office2.listAllBindingsForCondition("topic1");
          assertNotNull(bindings);
          assertEquals(2, bindings.size());
          
@@ -203,7 +202,7 @@
          
          //Maks sure it picks up the bindings
          
-         bindings = office3.listBindingsForCondition("topic1");
+         bindings = office3.listAllBindingsForCondition("topic1");
          assertNotNull(bindings);
          assertEquals(2, bindings.size());
          
@@ -220,7 +219,7 @@
          
          // Make sure all nodes pick it up
          
-         bindings = office1.listBindingsForCondition("topic1");
+         bindings = office1.listAllBindingsForCondition("topic1");
          assertNotNull(bindings);
          assertEquals(3, bindings.size());
          
@@ -229,7 +228,7 @@
          assertEquivalent(binding4, (Binding)iter.next());
          assertEquivalent(binding5, (Binding)iter.next());
          
-         bindings = office2.listBindingsForCondition("topic1");
+         bindings = office2.listAllBindingsForCondition("topic1");
          assertNotNull(bindings);
          assertEquals(3, bindings.size());
          
@@ -238,7 +237,7 @@
          assertEquivalent(binding4, (Binding)iter.next());
          assertEquivalent(binding5, (Binding)iter.next());
          
-         bindings = office3.listBindingsForCondition("topic1");
+         bindings = office3.listAllBindingsForCondition("topic1");
          assertNotNull(bindings);
          assertEquals(3, bindings.size());
          
@@ -261,7 +260,7 @@
          
          // Make sure all nodes pick them up
          
-         bindings = office1.listBindingsForCondition("topic1");
+         bindings = office1.listAllBindingsForCondition("topic1");
          assertNotNull(bindings);
          assertEquals(5, bindings.size());
          
@@ -272,7 +271,7 @@
          assertEquivalent(binding6, (Binding)iter.next());
          assertEquivalent(binding7, (Binding)iter.next());
          
-         bindings = office2.listBindingsForCondition("topic1");
+         bindings = office2.listAllBindingsForCondition("topic1");
          assertNotNull(bindings);
          assertEquals(5, bindings.size());
          
@@ -283,7 +282,7 @@
          assertEquivalent(binding6, (Binding)iter.next());
          assertEquivalent(binding7, (Binding)iter.next());
          
-         bindings = office3.listBindingsForCondition("topic1");
+         bindings = office3.listAllBindingsForCondition("topic1");
          assertNotNull(bindings);
          assertEquals(5, bindings.size());
          
@@ -305,7 +304,7 @@
          //All it's non durable bindings should be removed from the other nodes
          //Durable bindings should remain
          
-         bindings = office2.listBindingsForCondition("topic1");
+         bindings = office2.listAllBindingsForCondition("topic1");
          assertNotNull(bindings);
          assertEquals(4, bindings.size());
          
@@ -315,7 +314,7 @@
          assertEquivalent(binding5, (Binding)iter.next());
          assertEquivalent(binding6, (Binding)iter.next());
          
-         bindings = office3.listBindingsForCondition("topic1");
+         bindings = office3.listAllBindingsForCondition("topic1");
          assertNotNull(bindings);
          assertEquals(4, bindings.size());
          
@@ -328,7 +327,7 @@
          //Stop office 2
          office2.stop();
          
-         bindings = office3.listBindingsForCondition("topic1");
+         bindings = office3.listAllBindingsForCondition("topic1");
          assertNotNull(bindings);
          assertEquals(2, bindings.size());
          
@@ -341,7 +340,7 @@
          
          office2 = createClusteredPostOffice(2, "testgroup");
          
-         bindings = office1.listBindingsForCondition("topic1");
+         bindings = office1.listAllBindingsForCondition("topic1");
          assertNotNull(bindings);
          assertEquals(2, bindings.size());
          
@@ -349,7 +348,7 @@
          assertEquivalent(binding5, (Binding)iter.next());
          assertEquivalent(binding6, (Binding)iter.next());
          
-         bindings = office2.listBindingsForCondition("topic1");
+         bindings = office2.listAllBindingsForCondition("topic1");
          assertNotNull(bindings);
          assertEquals(2, bindings.size());
          
@@ -357,7 +356,7 @@
          assertEquivalent(binding5, (Binding)iter.next());
          assertEquivalent(binding6, (Binding)iter.next());
          
-         bindings = office3.listBindingsForCondition("topic1");
+         bindings = office3.listAllBindingsForCondition("topic1");
          assertNotNull(bindings);
          assertEquals(2, bindings.size());
          
@@ -378,21 +377,21 @@
          
          //Only the durable queue should survive
          
-         bindings = office1.listBindingsForCondition("topic1");
+         bindings = office1.listAllBindingsForCondition("topic1");
          assertNotNull(bindings);
          assertEquals(1, bindings.size());
          
          iter = bindings.iterator();
          assertEquivalent(binding6, (Binding)iter.next());
          
-         bindings = office2.listBindingsForCondition("topic1");
+         bindings = office2.listAllBindingsForCondition("topic1");
          assertNotNull(bindings);
          assertEquals(1, bindings.size());
          
          iter = bindings.iterator();
          assertEquivalent(binding6, (Binding)iter.next());
          
-         bindings = office3.listBindingsForCondition("topic1");
+         bindings = office3.listAllBindingsForCondition("topic1");
          assertNotNull(bindings);
          assertEquals(1, bindings.size());
          
@@ -540,11 +539,11 @@
          LocalClusteredQueue queue1 = new LocalClusteredQueue(office1, 1, "queue1", channelIdManager.getId(), ms, pm, true, false, (QueuedExecutor)pool.get(), null, tr);
          
          Binding binding1 = office1.bindClusteredQueue("queue1", queue1);
-         
+
          LocalClusteredQueue queue2 = new LocalClusteredQueue(office2, 2, "queue1", channelIdManager.getId(), ms, pm, true, false, (QueuedExecutor)pool.get(), null, tr);
-         
+
          Binding binding2 = office2.bindClusteredQueue("queue1", queue2);
-                  
+
          LocalClusteredQueue queue3 = new LocalClusteredQueue(office1, 1, "queue1", channelIdManager.getId(), ms, pm, true, false, (QueuedExecutor)pool.get(), null, tr);
          
          try
@@ -568,10 +567,10 @@
             //Ok
          }
          
-         office1.unbindClusteredQueue("queue1");
-         
          office2.unbindClusteredQueue("queue1");
-         
+
+         office1.unbindClusteredQueue("queue1");
+
          LocalClusteredQueue queue5 = new LocalClusteredQueue(office1, 1, "queue1", channelIdManager.getId(), ms, pm, true, false, (QueuedExecutor)pool.get(), null, tr);
          
          Binding binding5 = office1.bindClusteredQueue("queue1", queue5);
@@ -1319,7 +1318,7 @@
          checkContainsAndAcknowledge(msgs, receiver13, nonDurable7);
          
          //n7
-         checkEmpty(receiver12);
+         checkEmpty(receiver14);
          
          
          //Send 1 message at node2
@@ -1351,7 +1350,7 @@
          checkContainsAndAcknowledge(msgs, receiver13, nonDurable7);
          
          //n7
-         checkEmpty(receiver12);
+         checkEmpty(receiver14);
          
          //Send 1 message at node3
          //========================
@@ -1382,12 +1381,12 @@
          checkContainsAndAcknowledge(msgs, receiver13, nonDurable7);
          
          //n7
-         checkEmpty(receiver12);     
+         checkEmpty(receiver14);     
          
          //Send 1 message at node4
          //========================
-             
-         msgs = sendMessages("topic", persistent, office4, 1, null);
+         
+         msgs = sendMessages("topic", persistent, office4, 1, null);         
                
          //n2
          checkContainsAndAcknowledge(msgs, receiver1, nonDurable1);
@@ -1398,22 +1397,22 @@
          checkContainsAndAcknowledge(msgs, receiver4, nonDurable3);
          
          //n4
-         checkContainsAndAcknowledge(msgs, receiver5, sharedDurable1);
+         checkContainsAndAcknowledge(msgs, receiver5, sharedDurable1); // shared durable 1
          checkContainsAndAcknowledge(msgs, receiver6, nonSharedDurable2);
          checkContainsAndAcknowledge(msgs, receiver7, nonDurable4);
          checkContainsAndAcknowledge(msgs, receiver8, nonDurable5);
          checkContainsAndAcknowledge(msgs, receiver9, nonDurable6);
          
          //n5
-         checkEmpty(receiver10);         
-         checkContainsAndAcknowledge(msgs, receiver11, sharedDurable3);
+         checkEmpty(receiver10);       //shared durable 1
+         checkContainsAndAcknowledge(msgs, receiver11, sharedDurable3);     //shared durable 2    
          
          //n6
-         checkEmpty(receiver12);
-         checkContainsAndAcknowledge(msgs, receiver13, nonDurable7);
+         checkEmpty(receiver12); // shared durable 2
+         checkContainsAndAcknowledge(msgs, receiver13, nonDurable7); 
          
          //n7
-         checkEmpty(receiver12);
+         checkEmpty(receiver14);
          
          //Send 1 message at node5
          //========================

Modified: trunk/tests/src/org/jboss/test/messaging/core/plugin/postoffice/cluster/DefaultRouterTest.java
===================================================================
--- trunk/tests/src/org/jboss/test/messaging/core/plugin/postoffice/cluster/DefaultRouterTest.java	2006-10-13 12:09:35 UTC (rev 1473)
+++ trunk/tests/src/org/jboss/test/messaging/core/plugin/postoffice/cluster/DefaultRouterTest.java	2006-10-16 22:03:03 UTC (rev 1474)
@@ -87,6 +87,45 @@
       super.tearDown();
    }
    
+   //Checks that numberOfReceivers() and getQueues() track every add() and remove() on a DefaultRouter.
+   public void testSize() throws Exception
+   {
+      DefaultRouter dr = new DefaultRouter();
+      //Each add() below is expected to grow both counts by one
+      ClusteredQueue queue1 = new SimpleQueue(true);
+      dr.add(queue1);
+      
+      assertEquals(1, dr.numberOfReceivers());
+      assertEquals(1, dr.getQueues().size());
+      //NOTE(review): the meaning of SimpleQueue's boolean is not visible here; presumably local vs. remote - confirm
+      ClusteredQueue queue2 = new SimpleQueue(false);
+      dr.add(queue2);
+      
+      assertEquals(2, dr.numberOfReceivers());
+      assertEquals(2, dr.getQueues().size());
+      
+      ClusteredQueue queue3 = new SimpleQueue(false);
+      dr.add(queue3);
+      
+      assertEquals(3, dr.numberOfReceivers());
+      assertEquals(3, dr.getQueues().size());
+      //Queues are removed in reverse order of addition; both counts are expected to fall back to zero
+      dr.remove(queue3);
+      
+      assertEquals(2, dr.numberOfReceivers());
+      assertEquals(2, dr.getQueues().size());
+      
+      dr.remove(queue2);
+      
+      assertEquals(1, dr.numberOfReceivers());
+      assertEquals(1, dr.getQueues().size());
+      
+      dr.remove(queue1);
+      
+      assertEquals(0, dr.numberOfReceivers());
+      assertTrue(dr.getQueues().isEmpty());
+   }
+   
    // The router only has a local queue
    public void testRouterOnlyLocal() throws Exception
    {

Added: trunk/tests/src/org/jboss/test/messaging/core/plugin/postoffice/cluster/RedistributionTest.java
===================================================================
--- trunk/tests/src/org/jboss/test/messaging/core/plugin/postoffice/cluster/RedistributionTest.java	2006-10-13 12:09:35 UTC (rev 1473)
+++ trunk/tests/src/org/jboss/test/messaging/core/plugin/postoffice/cluster/RedistributionTest.java	2006-10-16 22:03:03 UTC (rev 1474)
@@ -0,0 +1,936 @@
+/*
+  * JBoss, Home of Professional Open Source
+  * Copyright 2005, JBoss Inc., and individual contributors as indicated
+  * by the @authors tag. See the copyright.txt in the distribution for a
+  * full listing of individual contributors.
+  *
+  * This is free software; you can redistribute it and/or modify it
+  * under the terms of the GNU Lesser General Public License as
+  * published by the Free Software Foundation; either version 2.1 of
+  * the License, or (at your option) any later version.
+  *
+  * This software is distributed in the hope that it will be useful,
+  * but WITHOUT ANY WARRANTY; without even the implied warranty of
+  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+  * Lesser General Public License for more details.
+  *
+  * You should have received a copy of the GNU Lesser General Public
+  * License along with this software; if not, write to the Free
+  * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
+  * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
+  */
+package org.jboss.test.messaging.core.plugin.postoffice.cluster;
+
+import java.util.ArrayList;
+import java.util.Iterator;
+import java.util.List;
+
+import org.jboss.messaging.core.Channel;
+import org.jboss.messaging.core.Delivery;
+import org.jboss.messaging.core.DeliveryObserver;
+import org.jboss.messaging.core.FilterFactory;
+import org.jboss.messaging.core.Message;
+import org.jboss.messaging.core.MessageReference;
+import org.jboss.messaging.core.Receiver;
+import org.jboss.messaging.core.SimpleDelivery;
+import org.jboss.messaging.core.plugin.contract.ClusteredPostOffice;
+import org.jboss.messaging.core.plugin.postoffice.Binding;
+import org.jboss.messaging.core.plugin.postoffice.cluster.ClusterRouterFactory;
+import org.jboss.messaging.core.plugin.postoffice.cluster.DefaultClusteredPostOffice;
+import org.jboss.messaging.core.plugin.postoffice.cluster.DefaultMessagePullPolicy;
+import org.jboss.messaging.core.plugin.postoffice.cluster.DefaultRouterFactory;
+import org.jboss.messaging.core.plugin.postoffice.cluster.LocalClusteredQueue;
+import org.jboss.messaging.core.plugin.postoffice.cluster.MessagePullPolicy;
+import org.jboss.messaging.core.tx.Transaction;
+import org.jboss.test.messaging.core.SimpleFilterFactory;
+import org.jboss.test.messaging.core.SimpleReceiver;
+import org.jboss.test.messaging.core.plugin.base.ClusteringTestBase;
+
+import EDU.oswego.cs.dl.util.concurrent.Executor;
+import EDU.oswego.cs.dl.util.concurrent.QueuedExecutor;
+
+
+public class RedistributionTest extends ClusteringTestBase
+{
+   // Constants -----------------------------------------------------
+
+   // Static --------------------------------------------------------
+   
+   // Attributes ----------------------------------------------------
+
+   // Constructors --------------------------------------------------
+
+   public RedistributionTest(String name)
+   {
+      super(name); //name is passed straight to the ClusteringTestBase constructor
+   }
+
+   // Public --------------------------------------------------------
+
+   public void setUp() throws Exception
+   {
+      super.setUp(); //no fixture of its own; only the superclass set-up runs
+   }
+
+   public void tearDown() throws Exception
+   {      
+      super.tearDown(); //no clean-up of its own; only the superclass tear-down runs
+   }
+   
+
+   public void testConsumeAllNonPersistentNonRecoverable() throws Throwable
+   {
+      consumeAll(false, false); //persistent = false, recoverable = false
+   }
+   
+   public void testConsumeAllPersistentNonRecoverable() throws Throwable
+   {
+      consumeAll(true, false); //persistent = true, recoverable = false
+   }
+   
+   public void testConsumeAllNonPersistentRecoverable() throws Throwable
+   {
+      consumeAll(false, true); //persistent = false, recoverable = true
+   }
+   
+   public void testConsumeAllPersistentRecoverable() throws Throwable
+   {
+      consumeAll(true, true); //persistent = true, recoverable = true
+   }
+   
+   
+   
+   public void testConsumeBitByBitNonPersistentNonRecoverable() throws Throwable
+   {
+      consumeBitByBit(false, false); //persistent = false, recoverable = false
+   }
+   
+   public void testConsumeBitByBitPersistentNonRecoverable() throws Throwable
+   {
+      consumeBitByBit(true, false); //persistent = true, recoverable = false
+   }
+   
+   public void testConsumeBitByBitNonPersistentRecoverable() throws Throwable
+   {
+      consumeBitByBit(false, true); //persistent = false, recoverable = true
+   }
+   
+   public void testConsumeBitByBitPersistentRecoverable() throws Throwable
+   {
+      consumeBitByBit(true, true); //persistent = true, recoverable = true
+   }
+   
+   
+   
+   
+//   
+//   public void testConsumeConcurrentlyNonPersistentNonRecoverable() throws Throwable
+//   {
+//      consumeConcurrently(false, false);
+//   }
+//   
+//   public void testConsumeConsumeConcurrentlyPersistentNonRecoverable() throws Throwable
+//   {
+//      consumeConcurrently(true, false);
+//   }
+//   
+//   public void testConsumeConsumeConcurrentlyNonPersistentRecoverable() throws Throwable
+//   {
+//      consumeConcurrently(false, true);
+//   }
+//   
+//   public void testConsumeConsumeConcurrentlyPersistentRecoverable() throws Throwable
+//   {
+//      consumeConcurrently(true, true);
+//   }
+   
+   //Sends NUM_MESSAGES to "queue1" through each of five post offices, then adds one accepting receiver
+   //to queue1 (bound on office1) and asserts that all NUM_MESSAGES * 5 messages are delivered there,
+   //that the other four queues are empty and that no office still has holding transactions.
+   //persistent is passed to sendMessages(); recoverable to each LocalClusteredQueue constructor.
+   protected void consumeAll(boolean persistent, boolean recoverable) throws Throwable
+   {
+      DefaultClusteredPostOffice office1 = null;
+      DefaultClusteredPostOffice office2 = null;
+      DefaultClusteredPostOffice office3 = null;
+      DefaultClusteredPostOffice office4 = null;
+      DefaultClusteredPostOffice office5 = null;
+          
+      try
+      {   
+         office1 = (DefaultClusteredPostOffice)createClusteredPostOffice(1, "testgroup");
+         
+         office2 = (DefaultClusteredPostOffice)createClusteredPostOffice(2, "testgroup");
+         
+         office3 = (DefaultClusteredPostOffice)createClusteredPostOffice(3, "testgroup");
+         
+         office4 = (DefaultClusteredPostOffice)createClusteredPostOffice(4, "testgroup");
+         
+         office5 = (DefaultClusteredPostOffice)createClusteredPostOffice(5, "testgroup");
+         //One queue per office, all bound under the same name "queue1"; the returned bindings are not used
+         LocalClusteredQueue queue1 = new LocalClusteredQueue(office1, 1, "queue1", channelIdManager.getId(), ms, pm, true, recoverable, (QueuedExecutor)pool.get(), null, tr);         
+         Binding binding1 = office1.bindClusteredQueue("queue1", queue1);
+                  
+         LocalClusteredQueue queue2 = new LocalClusteredQueue(office2, 2, "queue1", channelIdManager.getId(), ms, pm, true, recoverable, (QueuedExecutor)pool.get(), null, tr);         
+         Binding binding2 = office2.bindClusteredQueue("queue1", queue2);
+                  
+         LocalClusteredQueue queue3 = new LocalClusteredQueue(office3, 3, "queue1", channelIdManager.getId(), ms, pm, true, recoverable, (QueuedExecutor)pool.get(), null, tr);         
+         Binding binding3 = office3.bindClusteredQueue("queue1", queue3);         
+         
+         LocalClusteredQueue queue4 = new LocalClusteredQueue(office4, 4, "queue1", channelIdManager.getId(), ms, pm, true, recoverable, (QueuedExecutor)pool.get(), null, tr);         
+         Binding binding4 = office4.bindClusteredQueue("queue1", queue4);
+                  
+         LocalClusteredQueue queue5 = new LocalClusteredQueue(office5, 5, "queue1", channelIdManager.getId(), ms, pm, true, recoverable, (QueuedExecutor)pool.get(), null, tr);         
+         Binding binding5 = office5.bindClusteredQueue("queue1", queue5);
+                   
+         final int NUM_MESSAGES = 100;
+         //Send the same number of messages through every office
+         this.sendMessages("queue1", persistent, office1, NUM_MESSAGES, null);
+         this.sendMessages("queue1", persistent, office2, NUM_MESSAGES, null);
+         this.sendMessages("queue1", persistent, office3, NUM_MESSAGES, null);
+         this.sendMessages("queue1", persistent, office4, NUM_MESSAGES, null);
+         this.sendMessages("queue1", persistent, office5, NUM_MESSAGES, null);
+         //NOTE(review): fixed 2s pause, presumably to let the sends settle - confirm it is enough on slow machines
+         Thread.sleep(2000);
+         
+         //Check the sizes
+         
+         log.info("Here are the sizes:");         
+         log.info("queue1, refs:" + queue1.memoryRefCount() + " dels:" + queue1.memoryDeliveryCount());
+         log.info("queue2, refs:" + queue2.memoryRefCount() + " dels:" + queue2.memoryDeliveryCount());         
+         log.info("queue3, refs:" + queue3.memoryRefCount() + " dels:" + queue3.memoryDeliveryCount());         
+         log.info("queue4, refs:" + queue4.memoryRefCount() + " dels:" + queue4.memoryDeliveryCount());         
+         log.info("queue5, refs:" + queue5.memoryRefCount() + " dels:" + queue5.memoryDeliveryCount());
+         //With no receiver attached yet, every queue should hold NUM_MESSAGES refs and no deliveries
+         assertEquals(NUM_MESSAGES, queue1.memoryRefCount());
+         assertEquals(0, queue1.memoryDeliveryCount());
+         
+         assertEquals(NUM_MESSAGES, queue2.memoryRefCount());
+         assertEquals(0, queue2.memoryDeliveryCount());
+           
+         assertEquals(NUM_MESSAGES, queue3.memoryRefCount());
+         assertEquals(0, queue3.memoryDeliveryCount());
+         
+         assertEquals(NUM_MESSAGES, queue4.memoryRefCount());
+         assertEquals(0, queue4.memoryDeliveryCount());
+         
+         assertEquals(NUM_MESSAGES, queue5.memoryRefCount());
+         assertEquals(0, queue5.memoryDeliveryCount());
+         //Attach an accepting receiver to queue1 only, then trigger delivery on that queue
+         SimpleReceiver receiver = new SimpleReceiver("blah", SimpleReceiver.ACCEPTING);
+         
+         queue1.add(receiver);
+         
+         queue1.deliver(false);
+         //NOTE(review): fixed 7s wait before asserting; presumably time for messages to move between nodes - confirm
+         Thread.sleep(7000);
+         
+         log.info("Here are the sizes:");         
+         log.info("queue1, refs:" + queue1.memoryRefCount() + " dels:" + queue1.memoryDeliveryCount());
+         log.info("queue2, refs:" + queue2.memoryRefCount() + " dels:" + queue2.memoryDeliveryCount());         
+         log.info("queue3, refs:" + queue3.memoryRefCount() + " dels:" + queue3.memoryDeliveryCount());         
+         log.info("queue4, refs:" + queue4.memoryRefCount() + " dels:" + queue4.memoryDeliveryCount());         
+         log.info("queue5, refs:" + queue5.memoryRefCount() + " dels:" + queue5.memoryDeliveryCount());
+         //Expect all NUM_MESSAGES * 5 messages as unacknowledged deliveries on queue1 and nothing anywhere else
+         assertEquals(0, queue1.memoryRefCount());
+         assertEquals(NUM_MESSAGES * 5, queue1.memoryDeliveryCount());
+         
+         assertEquals(0, queue2.memoryRefCount());
+         assertEquals(0, queue2.memoryDeliveryCount());
+           
+         assertEquals(0, queue3.memoryRefCount());
+         assertEquals(0, queue3.memoryDeliveryCount());
+         
+         assertEquals(0, queue4.memoryRefCount());
+         assertEquals(0, queue4.memoryDeliveryCount());
+         
+         assertEquals(0, queue5.memoryRefCount());
+         assertEquals(0, queue5.memoryDeliveryCount());
+         
+         List messages = receiver.getMessages();
+         
+         assertNotNull(messages);
+         
+         assertEquals(NUM_MESSAGES * 5, messages.size());
+         //Acknowledge every message the receiver got; queue1 should then have no deliveries left
+         Iterator iter = messages.iterator();
+         
+         while (iter.hasNext())
+         {
+            Message msg = (Message)iter.next();
+            
+            receiver.acknowledge(msg, null);
+         }
+         
+         receiver.clear();
+         
+         assertEquals(0, queue1.memoryRefCount());
+         assertEquals(0, queue1.memoryDeliveryCount());
+         //No office should still be tracking a holding transaction
+         assertTrue(office1.getHoldingTransactions().isEmpty());
+         assertTrue(office2.getHoldingTransactions().isEmpty());
+         assertTrue(office3.getHoldingTransactions().isEmpty());
+         assertTrue(office4.getHoldingTransactions().isEmpty());
+         assertTrue(office5.getHoldingTransactions().isEmpty());
+         
+         checkNoMessageData();
+      }
+      finally
+      { 
+         if (office1 != null)
+         {
+            office1.stop();
+         }
+         //NOTE(review): if any stop() call throws, the offices after it are never stopped
+         if (office2 != null)
+         {            
+            office2.stop();
+         }
+         
+         if (office3 != null)
+         {
+            office3.stop();
+         }
+         
+         if (office4 != null)
+         {            
+            office4.stop();
+         }
+         
+         if (office5 != null)
+         {
+            office5.stop();
+         }
+      }
+   }
+   
+   protected void consumeBitByBit(boolean persistent, boolean recoverable) throws Throwable
+   {
+      DefaultClusteredPostOffice office1 = null;
+      
+      DefaultClusteredPostOffice office2 = null;
+      
+      DefaultClusteredPostOffice office3 = null;
+      
+      DefaultClusteredPostOffice office4 = null;
+      
+      DefaultClusteredPostOffice office5 = null;
+          
+      try
+      {   
+         office1 = (DefaultClusteredPostOffice)createClusteredPostOffice(1, "testgroup");
+         
+         office2 = (DefaultClusteredPostOffice)createClusteredPostOffice(2, "testgroup");
+         
+         office3 = (DefaultClusteredPostOffice)createClusteredPostOffice(3, "testgroup");
+         
+         office4 = (DefaultClusteredPostOffice)createClusteredPostOffice(4, "testgroup");
+         
+         office5 = (DefaultClusteredPostOffice)createClusteredPostOffice(5, "testgroup");
+         
+         LocalClusteredQueue queue1 = new LocalClusteredQueue(office1, 1, "queue1", channelIdManager.getId(), ms, pm, true, recoverable, (QueuedExecutor)pool.get(), null, tr);         
+         Binding binding1 = office1.bindClusteredQueue("queue1", queue1);
+                  
+         LocalClusteredQueue queue2 = new LocalClusteredQueue(office2, 2, "queue1", channelIdManager.getId(), ms, pm, true, recoverable, (QueuedExecutor)pool.get(), null, tr);         
+         Binding binding2 = office2.bindClusteredQueue("queue1", queue2);
+                  
+         LocalClusteredQueue queue3 = new LocalClusteredQueue(office3, 3, "queue1", channelIdManager.getId(), ms, pm, true, recoverable, (QueuedExecutor)pool.get(), null, tr);         
+         Binding binding3 = office3.bindClusteredQueue("queue1", queue3);         
+         
+         LocalClusteredQueue queue4 = new LocalClusteredQueue(office4, 4, "queue1", channelIdManager.getId(), ms, pm, true, recoverable, (QueuedExecutor)pool.get(), null, tr);         
+         Binding binding4 = office4.bindClusteredQueue("queue1", queue4);
+                  
+         LocalClusteredQueue queue5 = new LocalClusteredQueue(office5, 5, "queue1", channelIdManager.getId(), ms, pm, true, recoverable, (QueuedExecutor)pool.get(), null, tr);         
+         Binding binding5 = office5.bindClusteredQueue("queue1", queue5);
+                  
+         final int NUM_MESSAGES = 100;
+          
+         this.sendMessages("queue1", persistent, office1, NUM_MESSAGES, null);
+         this.sendMessages("queue1", persistent, office2, NUM_MESSAGES, null);
+         this.sendMessages("queue1", persistent, office3, NUM_MESSAGES, null);
+         this.sendMessages("queue1", persistent, office4, NUM_MESSAGES, null);
+         this.sendMessages("queue1", persistent, office5, NUM_MESSAGES, null);
+                          
+         Thread.sleep(2000);
+                
+         //Check the sizes
+         
+         log.info("Here are the sizes 1:");         
+         log.info("queue1, refs:" + queue1.memoryRefCount() + " dels:" + queue1.memoryDeliveryCount());
+         log.info("queue2, refs:" + queue2.memoryRefCount() + " dels:" + queue2.memoryDeliveryCount());         
+         log.info("queue3, refs:" + queue3.memoryRefCount() + " dels:" + queue3.memoryDeliveryCount());         
+         log.info("queue4, refs:" + queue4.memoryRefCount() + " dels:" + queue4.memoryDeliveryCount());         
+         log.info("queue5, refs:" + queue5.memoryRefCount() + " dels:" + queue5.memoryDeliveryCount());
+               
+         assertEquals(NUM_MESSAGES, queue1.memoryRefCount());
+         assertEquals(0, queue1.memoryDeliveryCount());
+         
+         assertEquals(NUM_MESSAGES, queue2.memoryRefCount());
+         assertEquals(0, queue2.memoryDeliveryCount());
+           
+         assertEquals(NUM_MESSAGES, queue3.memoryRefCount());
+         assertEquals(0, queue3.memoryDeliveryCount());
+         
+         assertEquals(NUM_MESSAGES, queue4.memoryRefCount());
+         assertEquals(0, queue4.memoryDeliveryCount());
+         
+         assertEquals(NUM_MESSAGES, queue5.memoryRefCount());
+         assertEquals(0, queue5.memoryDeliveryCount());
+         
+         assertTrue(office1.getHoldingTransactions().isEmpty());
+         assertTrue(office2.getHoldingTransactions().isEmpty());
+         assertTrue(office3.getHoldingTransactions().isEmpty());
+         assertTrue(office4.getHoldingTransactions().isEmpty());
+         assertTrue(office5.getHoldingTransactions().isEmpty());
+                 
+         SimpleReceiver receiver1 = new SimpleReceiver("blah", SimpleReceiver.ACCEPTING_TO_MAX);         
+         queue1.add(receiver1);
+         SimpleReceiver receiver2 = new SimpleReceiver("blah", SimpleReceiver.ACCEPTING_TO_MAX);         
+         queue2.add(receiver2);
+         SimpleReceiver receiver3 = new SimpleReceiver("blah", SimpleReceiver.ACCEPTING_TO_MAX);         
+         queue3.add(receiver3);
+         SimpleReceiver receiver4 = new SimpleReceiver("blah", SimpleReceiver.ACCEPTING_TO_MAX);         
+         queue4.add(receiver4);
+         SimpleReceiver receiver5 = new SimpleReceiver("blah", SimpleReceiver.ACCEPTING_TO_MAX);         
+         queue5.add(receiver5);
+         
+         receiver1.setMaxRefs(5);         
+         queue1.deliver(false);         
+         Thread.sleep(1000);         
+         assertEquals(NUM_MESSAGES - 5, queue1.memoryRefCount());
+         assertEquals(5, queue1.memoryDeliveryCount());
+         
+         acknowledgeAll(receiver1);
+         assertEquals(0, queue1.memoryDeliveryCount());
+         receiver1.setMaxRefs(0);
+         
+         receiver2.setMaxRefs(10);         
+         queue2.deliver(false);         
+         Thread.sleep(1000);
+         assertEquals(NUM_MESSAGES - 10, queue2.memoryRefCount());
+         assertEquals(10, queue2.memoryDeliveryCount());
+         acknowledgeAll(receiver2);
+         receiver2.setMaxRefs(0);
+                  
+         receiver3.setMaxRefs(15);         
+         queue3.deliver(false);         
+         Thread.sleep(1000);
+         assertEquals(NUM_MESSAGES - 15, queue3.memoryRefCount());
+         assertEquals(15, queue3.memoryDeliveryCount());
+         acknowledgeAll(receiver3);
+         receiver3.setMaxRefs(0);
+         
+         receiver4.setMaxRefs(20);         
+         queue4.deliver(false);         
+         Thread.sleep(1000);
+         assertEquals(NUM_MESSAGES - 20, queue4.memoryRefCount());
+         assertEquals(20, queue4.memoryDeliveryCount());
+         acknowledgeAll(receiver4);
+         receiver4.setMaxRefs(0);
+         
+         receiver5.setMaxRefs(25);         
+         queue5.deliver(false);         
+         Thread.sleep(1000);
+         assertEquals(NUM_MESSAGES - 25, queue5.memoryRefCount());
+         assertEquals(25, queue5.memoryDeliveryCount());
+         acknowledgeAll(receiver5);
+         receiver5.setMaxRefs(0);
+         
+         Thread.sleep(1000);
+         
+         assertTrue(office1.getHoldingTransactions().isEmpty());
+         assertTrue(office2.getHoldingTransactions().isEmpty());
+         assertTrue(office3.getHoldingTransactions().isEmpty());
+         assertTrue(office4.getHoldingTransactions().isEmpty());
+         assertTrue(office5.getHoldingTransactions().isEmpty());
+         
+         log.info("Here are the sizes 2:");         
+         log.info("queue1, refs:" + queue1.memoryRefCount() + " dels:" + queue1.memoryDeliveryCount());
+         log.info("queue2, refs:" + queue2.memoryRefCount() + " dels:" + queue2.memoryDeliveryCount());         
+         log.info("queue3, refs:" + queue3.memoryRefCount() + " dels:" + queue3.memoryDeliveryCount());         
+         log.info("queue4, refs:" + queue4.memoryRefCount() + " dels:" + queue4.memoryDeliveryCount());         
+         log.info("queue5, refs:" + queue5.memoryRefCount() + " dels:" + queue5.memoryDeliveryCount());
+     
+         //Consume the rest from queue 5
+         receiver5.setMaxRefs(NUM_MESSAGES - 25);
+         queue5.deliver(false);
+         Thread.sleep(5000);         
+         
+         log.info("receiver5 msgs:" + receiver5.getMessages().size());
+         
+         log.info("Here are the sizes 3:");         
+         log.info("queue1, refs:" + queue1.memoryRefCount() + " dels:" + queue1.memoryDeliveryCount());
+         log.info("queue2, refs:" + queue2.memoryRefCount() + " dels:" + queue2.memoryDeliveryCount());         
+         log.info("queue3, refs:" + queue3.memoryRefCount() + " dels:" + queue3.memoryDeliveryCount());         
+         log.info("queue4, refs:" + queue4.memoryRefCount() + " dels:" + queue4.memoryDeliveryCount());         
+         log.info("queue5, refs:" + queue5.memoryRefCount() + " dels:" + queue5.memoryDeliveryCount());
+         
+         //This will result in an extra one being pulled from queue1 - we cannot avoid this
+         //This is because the channel does not know that the receiver is full unless it tries
+         //with a ref so it needs to retrieve one
+     
+         assertEquals(NUM_MESSAGES - 6, queue1.memoryRefCount());
+         assertEquals(0, queue1.memoryDeliveryCount());
+         
+         assertEquals(NUM_MESSAGES - 10, queue2.memoryRefCount());
+         assertEquals(0, queue2.memoryDeliveryCount());
+          
+         assertEquals(NUM_MESSAGES - 15, queue3.memoryRefCount());
+         assertEquals(0, queue3.memoryDeliveryCount());
+         
+         assertEquals(NUM_MESSAGES - 20, queue4.memoryRefCount());
+         assertEquals(0, queue4.memoryDeliveryCount());
+         
+         assertEquals(1, queue5.memoryRefCount());         
+         assertEquals(NUM_MESSAGES - 25, queue5.memoryDeliveryCount());
+         
+         acknowledgeAll(receiver5);
+         
+         assertEquals(0, queue5.memoryDeliveryCount());
+         
+         receiver5.setMaxRefs(0);
+         
+         assertTrue(office1.getHoldingTransactions().isEmpty());
+         assertTrue(office2.getHoldingTransactions().isEmpty());
+         assertTrue(office3.getHoldingTransactions().isEmpty());
+         assertTrue(office4.getHoldingTransactions().isEmpty());
+         assertTrue(office5.getHoldingTransactions().isEmpty());
+         
+         //Now consume 5 more from queue5, they should come from queue1 which has the most messages
+         
+         log.info("Consume 5 more from queue 5");
+         
+         receiver5.setMaxRefs(5);
+         queue5.deliver(false);
+         Thread.sleep(3000);
+           
+         log.info("Here are the sizes 4:");         
+         log.info("queue1, refs:" + queue1.memoryRefCount() + " dels:" + queue1.memoryDeliveryCount());
+         log.info("queue2, refs:" + queue2.memoryRefCount() + " dels:" + queue2.memoryDeliveryCount());         
+         log.info("queue3, refs:" + queue3.memoryRefCount() + " dels:" + queue3.memoryDeliveryCount());         
+         log.info("queue4, refs:" + queue4.memoryRefCount() + " dels:" + queue4.memoryDeliveryCount());         
+         log.info("queue5, refs:" + queue5.memoryRefCount() + " dels:" + queue5.memoryDeliveryCount());
+         
+         assertEquals(NUM_MESSAGES - 11, queue1.memoryRefCount());
+          
+         assertEquals(0, queue1.memoryDeliveryCount());
+         
+         assertEquals(NUM_MESSAGES - 10, queue2.memoryRefCount());
+         assertEquals(0, queue2.memoryDeliveryCount());
+          
+         assertEquals(NUM_MESSAGES - 15, queue3.memoryRefCount());
+         assertEquals(0, queue3.memoryDeliveryCount());
+         
+         assertEquals(NUM_MESSAGES - 20, queue4.memoryRefCount());
+         assertEquals(0, queue4.memoryDeliveryCount());
+         
+         assertEquals(1, queue5.memoryRefCount());          
+         assertEquals(5, queue5.memoryDeliveryCount());
+               
+         acknowledgeAll(receiver5);
+
+         assertEquals(0, queue5.memoryDeliveryCount());
+         
+         receiver1.setMaxRefs(0);
+           
+         assertTrue(office1.getHoldingTransactions().isEmpty());
+         assertTrue(office2.getHoldingTransactions().isEmpty());
+         assertTrue(office3.getHoldingTransactions().isEmpty());
+         assertTrue(office4.getHoldingTransactions().isEmpty());
+         assertTrue(office5.getHoldingTransactions().isEmpty());
+          
+         //Consume 1 more - should pull one from queue2
+         
+         receiver5.setMaxRefs(1);
+         queue5.deliver(false);
+         Thread.sleep(2000);
+          
+         log.info("Here are the sizes 5:");         
+         log.info("queue1, refs:" + queue1.memoryRefCount() + " dels:" + queue1.memoryDeliveryCount());
+         log.info("queue2, refs:" + queue2.memoryRefCount() + " dels:" + queue2.memoryDeliveryCount());         
+         log.info("queue3, refs:" + queue3.memoryRefCount() + " dels:" + queue3.memoryDeliveryCount());         
+         log.info("queue4, refs:" + queue4.memoryRefCount() + " dels:" + queue4.memoryDeliveryCount());         
+         log.info("queue5, refs:" + queue5.memoryRefCount() + " dels:" + queue5.memoryDeliveryCount());
+         
+         assertEquals(NUM_MESSAGES - 11, queue1.memoryRefCount());
+         assertEquals(0, queue1.memoryDeliveryCount());
+         
+         assertEquals(NUM_MESSAGES - 11, queue2.memoryRefCount());
+         assertEquals(0, queue2.memoryDeliveryCount());
+          
+         assertEquals(NUM_MESSAGES - 15, queue3.memoryRefCount());
+         assertEquals(0, queue3.memoryDeliveryCount());
+         
+         assertEquals(NUM_MESSAGES - 20, queue4.memoryRefCount());
+         assertEquals(0, queue4.memoryDeliveryCount());
+         
+         assertEquals(1, queue5.memoryRefCount());          
+         assertEquals(1, queue5.memoryDeliveryCount());
+                  
+         acknowledgeAll(receiver5);
+         
+         assertEquals(0, queue5.memoryDeliveryCount());
+         
+         receiver5.setMaxRefs(0);
+         
+         assertTrue(office1.getHoldingTransactions().isEmpty());
+         assertTrue(office2.getHoldingTransactions().isEmpty());
+         assertTrue(office3.getHoldingTransactions().isEmpty());
+         assertTrue(office4.getHoldingTransactions().isEmpty());
+         assertTrue(office5.getHoldingTransactions().isEmpty());
+         
+         //From queue 4 consume everything else
+         
+         receiver4.setMaxRefs(NUM_MESSAGES - 15 + NUM_MESSAGES - 20 + NUM_MESSAGES - 11 + NUM_MESSAGES - 11 + 1);
+         queue4.deliver(false);
+         Thread.sleep(7000);
+         
+         log.info("Here are the sizes 6:");         
+         log.info("queue1, refs:" + queue1.memoryRefCount() + " dels:" + queue1.memoryDeliveryCount());
+         log.info("queue2, refs:" + queue2.memoryRefCount() + " dels:" + queue2.memoryDeliveryCount());         
+         log.info("queue3, refs:" + queue3.memoryRefCount() + " dels:" + queue3.memoryDeliveryCount());         
+         log.info("queue4, refs:" + queue4.memoryRefCount() + " dels:" + queue4.memoryDeliveryCount());         
+         log.info("queue5, refs:" + queue5.memoryRefCount() + " dels:" + queue5.memoryDeliveryCount());
+         
+         assertEquals(0, queue1.memoryRefCount());
+         assertEquals(0, queue1.memoryDeliveryCount());
+         
+         assertEquals(0, queue2.memoryRefCount());
+         assertEquals(0, queue2.memoryDeliveryCount());
+          
+         assertEquals(0, queue3.memoryRefCount());
+         assertEquals(0, queue3.memoryDeliveryCount());
+         
+         assertEquals(0, queue4.memoryRefCount());
+         assertEquals(NUM_MESSAGES - 15 + NUM_MESSAGES - 20 + NUM_MESSAGES - 11 + NUM_MESSAGES - 11 + 1, queue4.memoryDeliveryCount());
+         
+         assertEquals(0, queue5.memoryRefCount());          
+         assertEquals(0, queue5.memoryDeliveryCount());
+                  
+         acknowledgeAll(receiver4);
+         
+         assertEquals(0, queue4.memoryDeliveryCount());
+         
+         assertTrue(office1.getHoldingTransactions().isEmpty());
+         assertTrue(office2.getHoldingTransactions().isEmpty());
+         assertTrue(office3.getHoldingTransactions().isEmpty());
+         assertTrue(office4.getHoldingTransactions().isEmpty());
+         assertTrue(office5.getHoldingTransactions().isEmpty());
+         
+         checkNoMessageData();
+      }
+      finally
+      { 
+         if (office1 != null)
+         {
+            office1.stop();
+         }
+         
+         if (office2 != null)
+         {            
+            office2.stop();
+         }
+         
+         if (office3 != null)
+         {
+            office3.stop();
+         }
+         
+         if (office4 != null)
+         {            
+            office4.stop();
+         }
+         
+         if (office5 != null)
+         {
+            office5.stop();
+         }
+      }
+   }
+   
+   /**
+    * Work-in-progress scenario for concurrent consumption across a five node cluster.
+    *
+    * Starts five clustered post offices in group "testgroup", binds a LocalClusteredQueue
+    * named "queue1" on each of them, sends NUM_MESSAGES messages through office1, then
+    * attaches a ThrottleReceiver (pause 0, batch size 50) to queue1 and to queue2 and lets
+    * them run for 45 seconds. The per-receiver totals are only logged - this method makes no
+    * assertion on them; the only check performed is checkNoMessageData().
+    *
+    * NOTE(review): the inline comment below talks about "no consumers on queue1", but the code
+    * does attach receiver1 to queue1 - confirm which one is intended.
+    *
+    * @param persistent passed through to sendMessages()
+    * @param recoverable passed to the LocalClusteredQueue constructor of every queue
+    * @throws Throwable if setting up, sending or checking fails
+    */
+   protected void consumeConcurrently(boolean persistent, boolean recoverable) throws Throwable
+   {
+      DefaultClusteredPostOffice office1 = null;
+      
+      DefaultClusteredPostOffice office2 = null;
+      
+      DefaultClusteredPostOffice office3 = null;
+      
+      DefaultClusteredPostOffice office4 = null;
+      
+      DefaultClusteredPostOffice office5 = null;
+          
+      try
+      {   
+         //One post office per node id (1..5), all in the same group
+         office1 = (DefaultClusteredPostOffice)createClusteredPostOffice(1, "testgroup");
+         
+         office2 = (DefaultClusteredPostOffice)createClusteredPostOffice(2, "testgroup");
+         
+         office3 = (DefaultClusteredPostOffice)createClusteredPostOffice(3, "testgroup");
+         
+         office4 = (DefaultClusteredPostOffice)createClusteredPostOffice(4, "testgroup");
+         
+         office5 = (DefaultClusteredPostOffice)createClusteredPostOffice(5, "testgroup");
+         
+         //Bind a queue with the same name "queue1" on every node
+         //(the returned bindings are not used any further in this method)
+         LocalClusteredQueue queue1 = new LocalClusteredQueue(office1, 1, "queue1", channelIdManager.getId(), ms, pm, true, recoverable, (QueuedExecutor)pool.get(), null, tr);         
+         Binding binding1 = office1.bindClusteredQueue("queue1", queue1);
+                  
+         LocalClusteredQueue queue2 = new LocalClusteredQueue(office2, 2, "queue1", channelIdManager.getId(), ms, pm, true, recoverable, (QueuedExecutor)pool.get(), null, tr);         
+         Binding binding2 = office2.bindClusteredQueue("queue1", queue2);
+                  
+         LocalClusteredQueue queue3 = new LocalClusteredQueue(office3, 3, "queue1", channelIdManager.getId(), ms, pm, true, recoverable, (QueuedExecutor)pool.get(), null, tr);         
+         Binding binding3 = office3.bindClusteredQueue("queue1", queue3);         
+         
+         LocalClusteredQueue queue4 = new LocalClusteredQueue(office4, 4, "queue1", channelIdManager.getId(), ms, pm, true, recoverable, (QueuedExecutor)pool.get(), null, tr);         
+         Binding binding4 = office4.bindClusteredQueue("queue1", queue4);
+                  
+         LocalClusteredQueue queue5 = new LocalClusteredQueue(office5, 5, "queue1", channelIdManager.getId(), ms, pm, true, recoverable, (QueuedExecutor)pool.get(), null, tr);         
+         Binding binding5 = office5.bindClusteredQueue("queue1", queue5);
+                   
+         //Test with no consumers on queue1
+         
+         //Two equal consumers on queue2 and queue3
+                           
+         //Add messages at queue 1
+         
+         final int NUM_MESSAGES = 10000;
+         
+         this.sendMessages("queue1", persistent, office1, NUM_MESSAGES, null);
+         
+         log.info("sent messages");
+         
+         //Fixed wait after sending, before any receiver is attached
+         Thread.sleep(4000);
+         
+         //Two identical throttled receivers (pause 0 ms, batches of 50), on queue1 and queue2;
+         //deliver(false) is called right after adding each one
+         ThrottleReceiver receiver1 = new ThrottleReceiver(queue1, 0, 50);
+         queue1.add(receiver1);
+         queue1.deliver(false);  
+         
+         ThrottleReceiver receiver2 = new ThrottleReceiver(queue2, 0, 50);
+         queue2.add(receiver2);
+         queue2.deliver(false);  
+         
+         //Fixed consumption window; nothing is asserted about how the messages were split
+         Thread.sleep(45000);
+         
+         log.info("receiver1: " + receiver1.getTotalCount());
+         
+         log.info("receiver2: " + receiver2.getTotalCount());
+         
+
+         //test1
+         
+         
+         //No consumer on node 1
+         //Very slow consumer on node 2
+         //
+         
+         /*
+          * Test with very fast, infinitely big consumer (i.e. is always ready) on node 1
+          * Fast consumer on node2
+          * Send messages on node 1
+          * Verify all go to node1 consumer
+          * 
+          * Test with very fast, not infinitely big consumer (i.e. is not always ready) on node 1
+          * Fast consumer on node2
+          * Send messages on node 1
+          * Verify most go to node1 consumer, some go to node 2
+          * 
+          * Test with slow consumer on node 1, Fast consumer on node 2
+          * 
+          * Test with no consumer on node 1, consumers on other nodes
+          * 
+          * Think up all the other permutations, then take a guess with error margin of
+          * how many messages should be on each node.
+          */
+         
+         
+         checkNoMessageData();
+      }
+      finally
+      { 
+         //Stop whichever offices were created; an exception from one stop() skips the remaining ones
+         if (office1 != null)
+         {
+            office1.stop();
+         }
+         
+         if (office2 != null)
+         {            
+            office2.stop();
+         }
+         
+         if (office3 != null)
+         {
+            office3.stop();
+         }
+         
+         if (office4 != null)
+         {            
+            office4.stop();
+         }
+         
+         if (office5 != null)
+         {
+            office5.stop();
+         }
+      }
+   }
+   
+   class ThrottleReceiver implements Receiver, Runnable
+   {
+      long pause;
+      
+      volatile int totalCount;
+      
+      int count;
+      
+      int maxSize;
+      
+      volatile boolean full;
+      
+      Executor executor;
+      
+      List dels;
+      
+      Channel queue;
+      
+      int getTotalCount()
+      {
+         return totalCount;
+      }
+      
+      ThrottleReceiver(Channel queue, long pause, int maxSize)
+      {
+         this.queue = queue;
+         
+         this.pause = pause;
+         
+         this.maxSize = maxSize;
+         
+         this.executor = new QueuedExecutor();
+         
+         this.dels = new ArrayList();
+      }
+
+      public Delivery handle(DeliveryObserver observer, MessageReference reference, Transaction tx)
+      {
+         if (full)
+         {
+            return null;
+         }
+         
+         //log.info(this + " got ref");
+         
+         //log.info("cnt:" + totalCount);
+         
+         SimpleDelivery del = new SimpleDelivery(observer, reference);
+         
+         dels.add(del);
+         
+         count++;
+         
+         totalCount++;
+         
+         if (count == maxSize)
+         {
+            full = true;
+            
+            count = 0;
+            
+            try
+            {
+               executor.execute(this);
+            }
+            catch (InterruptedException e)
+            {
+               //Ignore
+            }
+         }
+         
+         return del;
+          
+      }
+      
+      public void run()
+      {
+         //Simulate processing of messages
+         
+         try
+         {
+            Thread.sleep(pause);
+         }
+         catch (InterruptedException e)
+         {
+            //Ignore
+         }
+         
+         Iterator iter = dels.iterator();
+         
+         while (iter.hasNext())
+         {
+            Delivery del = (Delivery)iter.next();
+            
+            try
+            {
+               del.acknowledge(null);
+            }
+            catch (Throwable t)
+            {
+               //Ignore
+            }
+         }
+         
+         dels.clear();
+         
+         full = false;
+         
+         queue.deliver(false);
+      }
+      
+   }
+   
+   /**
+    * Acknowledges, outside of any transaction, every message returned by the receiver's
+    * getMessages() and then calls clear() on the receiver.
+    *
+    * @param receiver the receiver whose messages are acknowledged
+    * @throws Throwable whatever an acknowledge call throws
+    */
+   private void acknowledgeAll(SimpleReceiver receiver) throws Throwable
+   {
+      for (Iterator received = receiver.getMessages().iterator(); received.hasNext(); )
+      {
+         Message next = (Message)received.next();
+         
+         receiver.acknowledge(next, null);
+      }
+      
+      receiver.clear();
+   }
+   
+      
+   /**
+    * Creates a DefaultClusteredPostOffice for the given node and group, using a
+    * DefaultMessagePullPolicy, a SimpleFilterFactory and a DefaultRouterFactory together with
+    * the fixture's data source, transaction manager and JGroups stack properties, and starts it.
+    *
+    * @param nodeId the node id handed to the post office
+    * @param groupName the group name handed to the post office
+    * @return the post office, already started
+    * @throws Exception if constructing or starting the post office fails
+    */
+   protected ClusteredPostOffice createClusteredPostOffice(int nodeId, String groupName) throws Exception
+   {
+      MessagePullPolicy redistributionPolicy = new DefaultMessagePullPolicy();
+      FilterFactory filterFactory = new SimpleFilterFactory();
+      ClusterRouterFactory routerFactory = new DefaultRouterFactory();
+      
+      DefaultClusteredPostOffice office = new DefaultClusteredPostOffice(
+         sc.getDataSource(),
+         sc.getTransactionManager(),
+         null,
+         true,
+         nodeId,
+         "Clustered",
+         ms,
+         pm,
+         tr,
+         filterFactory,
+         pool,
+         groupName,
+         JGroupsUtil.getControlStackProperties(),
+         JGroupsUtil.getDataStackProperties(),
+         10000,
+         10000,
+         redistributionPolicy,
+         routerFactory,
+         1,
+         1000);
+      
+      office.start();
+      
+      return office;
+   }
+   
+   // Private -------------------------------------------------------
+
+   // Inner classes -------------------------------------------------
+   
+}
+
+
+
+

Modified: trunk/tests/src/org/jboss/test/messaging/jms/MessageConsumerTest.java
===================================================================
--- trunk/tests/src/org/jboss/test/messaging/jms/MessageConsumerTest.java	2006-10-13 12:09:35 UTC (rev 1473)
+++ trunk/tests/src/org/jboss/test/messaging/jms/MessageConsumerTest.java	2006-10-16 22:03:03 UTC (rev 1474)
@@ -1786,12 +1786,12 @@
         {
             try
             {
-                log.info("(ThreadCloser)Waiting on monitor to close thread");
+                log.trace("(ThreadCloser)Waiting on monitor to close thread");
                 synchronized (waitMonitor)
                 {
                     waitMonitor.wait();
                 }
-                log.info("(ThreadCloser)Notification received");
+                log.trace("(ThreadCloser)Notification received");
                 Thread.sleep(timeToSleep);
                 topicConsumer.close();
 
@@ -1824,12 +1824,12 @@
         {
             try
             {
-                log.info("(ThreadReceiver)Waiting on monitor to close thread");
+                log.trace("(ThreadReceiver)Waiting on monitor to close thread");
                 synchronized(waitMonitor)
                 {
                     waitMonitor.wait();
                 }
-                log.info("(ThreadReceiver)Notification received");
+                log.trace("(ThreadReceiver)Notification received");
                 t1=System.currentTimeMillis();
                 receivedObject=topicConsumer.receive(timeToWait);
                 t2=System.currentTimeMillis();
@@ -1868,7 +1868,7 @@
 
       assertNull(receiver.receivedObject);
 
-      log.info("Elapsed time was " + (receiver.t2-receiver.t1));
+      log.trace("Elapsed time was " + (receiver.t2-receiver.t1));
 
       // We need to make sure the
       assertTrue("Receive was supposed to receive a notification before 2 seconds",receiver.t2-receiver.t1<=1500);

Modified: trunk/tests/src/org/jboss/test/messaging/jms/crash/ManualCrashTest.java
===================================================================
--- trunk/tests/src/org/jboss/test/messaging/jms/crash/ManualCrashTest.java	2006-10-13 12:09:35 UTC (rev 1473)
+++ trunk/tests/src/org/jboss/test/messaging/jms/crash/ManualCrashTest.java	2006-10-16 22:03:03 UTC (rev 1474)
@@ -173,8 +173,6 @@
          
          assertEquals(count, c);
          
-         log.info("Received message:" + count);
-         
          count++;
          
          if (count == i)

Modified: trunk/tests/src/org/jboss/test/messaging/jms/server/destination/QueueManagementTest.java
===================================================================
--- trunk/tests/src/org/jboss/test/messaging/jms/server/destination/QueueManagementTest.java	2006-10-13 12:09:35 UTC (rev 1473)
+++ trunk/tests/src/org/jboss/test/messaging/jms/server/destination/QueueManagementTest.java	2006-10-16 22:03:03 UTC (rev 1474)
@@ -229,7 +229,6 @@
 
    public void testMessageCountOverFullSize() throws Exception
    {
-      log.info("** starting testMessageCountOverFullSize");
       InitialContext ic = new InitialContext(ServerManagement.getJNDIEnvironment());
       ConnectionFactory cf = (ConnectionFactory)ic.lookup("/ConnectionFactory");
       Connection conn = null;
@@ -271,13 +270,10 @@
 
          int receivedCount = 0;
 
-         log.info("Starting receiver loop...");
-
          while((cons.receive(2000)) != null)
          {
             receivedCount++;
 
-            log.info(receivedCount + " messages received");
             Thread.sleep(500);
             int mc = ((Integer)ServerManagement.
                getAttribute(destObjectName, "MessageCount")).intValue();
@@ -296,7 +292,6 @@
       }
       finally
       {
-         log.info("** leaving testMessageCountOverFullSize");
          ServerManagement.undeployQueue("QueueMessageCount2");
 
          if (conn != null)




More information about the jboss-cvs-commits mailing list