[jboss-cvs] JBoss Messaging SVN: r1487 - in trunk: docs/clustering/en/modules src/etc/server/default/deploy src/etc/xmdesc src/main/org/jboss/messaging/core/plugin src/main/org/jboss/messaging/core/plugin/postoffice/cluster tests/src/org/jboss/test/messaging/core/plugin/postoffice/cluster

jboss-cvs-commits at lists.jboss.org jboss-cvs-commits at lists.jboss.org
Tue Oct 17 13:51:54 EDT 2006


Author: timfox
Date: 2006-10-17 13:51:41 -0400 (Tue, 17 Oct 2006)
New Revision: 1487

Added:
   trunk/src/main/org/jboss/messaging/core/plugin/postoffice/cluster/PullMessageResultRequest.java
   trunk/src/main/org/jboss/messaging/core/plugin/postoffice/cluster/RollbackPullRequest.java
Removed:
   trunk/src/main/org/jboss/messaging/core/plugin/postoffice/cluster/PullMessagesResultRequest.java
Modified:
   trunk/docs/clustering/en/modules/configuration.xml
   trunk/src/etc/server/default/deploy/clustered-mysql-persistence-service.xml
   trunk/src/etc/xmdesc/ClusteredPostOffice-xmbean.xml
   trunk/src/main/org/jboss/messaging/core/plugin/ClusteredPostOfficeService.java
   trunk/src/main/org/jboss/messaging/core/plugin/postoffice/cluster/ClusterRequest.java
   trunk/src/main/org/jboss/messaging/core/plugin/postoffice/cluster/DefaultClusteredPostOffice.java
   trunk/src/main/org/jboss/messaging/core/plugin/postoffice/cluster/LocalClusteredQueue.java
   trunk/src/main/org/jboss/messaging/core/plugin/postoffice/cluster/PostOfficeInternal.java
   trunk/src/main/org/jboss/messaging/core/plugin/postoffice/cluster/PullMessagesRequest.java
   trunk/tests/src/org/jboss/test/messaging/core/plugin/postoffice/cluster/DefaultClusteredPostOfficeTest.java
   trunk/tests/src/org/jboss/test/messaging/core/plugin/postoffice/cluster/DefaultClusteredPostOfficeWithDefaultRouterTest.java
   trunk/tests/src/org/jboss/test/messaging/core/plugin/postoffice/cluster/DefaultRouterTest.java
   trunk/tests/src/org/jboss/test/messaging/core/plugin/postoffice/cluster/RecoveryTest.java
   trunk/tests/src/org/jboss/test/messaging/core/plugin/postoffice/cluster/RedistributionWithDefaultMessagePullPolicyTest.java
Log:
http://jira.jboss.com/jira/browse/JBMESSAGING-575



Modified: trunk/docs/clustering/en/modules/configuration.xml
===================================================================
--- trunk/docs/clustering/en/modules/configuration.xml	2006-10-17 16:41:04 UTC (rev 1486)
+++ trunk/docs/clustering/en/modules/configuration.xml	2006-10-17 17:51:41 UTC (rev 1487)
@@ -78,7 +78,6 @@
          <attribute name="GroupName">Topic</attribute>
          <attribute name="StateTimeout">5000</attribute>
          <attribute name="CastTimeout">5000</attribute>
-         <attribute name="PullSize">1</attribute>
          <attribute name="StatsSendPeriod">10000</attribute>
          <attribute name="MessagePullPolicy">org.jboss.messaging.core.plugin.postoffice.cluster.NullMessagePullPolicy</attribute>
          <attribute name="ClusterRouterFactory">org.jboss.messaging.core.plugin.postoffice.cluster.DefaultRouterFactory</attribute>
@@ -150,13 +149,6 @@
      </para>
    </section>
 
-   <section id="conf.pullsize">
-     <title>PullSize</title>
-     <para>
-        The maximum number of messages to pull from one node to another when the local node is starving. Defaults to 1.
-     </para>
-   </section>
-
    <section id="conf.statssend">
      <title>StatsSendPeriod</title>
      <para>

Modified: trunk/src/etc/server/default/deploy/clustered-mysql-persistence-service.xml
===================================================================
--- trunk/src/etc/server/default/deploy/clustered-mysql-persistence-service.xml	2006-10-17 16:41:04 UTC (rev 1486)
+++ trunk/src/etc/server/default/deploy/clustered-mysql-persistence-service.xml	2006-10-17 17:51:41 UTC (rev 1487)
@@ -75,7 +75,6 @@
       <attribute name="GroupName">Queue</attribute>
       <attribute name="StateTimeout">5000</attribute>
       <attribute name="CastTimeout">5000</attribute>
-      <attribute name="PullSize">1</attribute>
       <attribute name="StatsSendPeriod">10000</attribute>
       <attribute name="MessagePullPolicy">org.jboss.messaging.core.plugin.postoffice.cluster.NullMessagePullPolicy</attribute>
       <attribute name="ClusterRouterFactory">org.jboss.messaging.core.plugin.postoffice.cluster.DefaultRouterFactory</attribute>
@@ -140,7 +139,6 @@
       <attribute name="GroupName">Topic</attribute>
       <attribute name="StateTimeout">5000</attribute>
       <attribute name="CastTimeout">5000</attribute>
-      <attribute name="PullSize">1</attribute>
       <attribute name="StatsSendPeriod">10000</attribute>
       <attribute name="MessagePullPolicy">org.jboss.messaging.core.plugin.postoffice.cluster.NullMessagePullPolicy</attribute>
       <attribute name="ClusterRouterFactory">org.jboss.messaging.core.plugin.postoffice.cluster.DefaultRouterFactory</attribute>

Modified: trunk/src/etc/xmdesc/ClusteredPostOffice-xmbean.xml
===================================================================
--- trunk/src/etc/xmdesc/ClusteredPostOffice-xmbean.xml	2006-10-17 16:41:04 UTC (rev 1486)
+++ trunk/src/etc/xmdesc/ClusteredPostOffice-xmbean.xml	2006-10-17 17:51:41 UTC (rev 1487)
@@ -83,12 +83,6 @@
       <type>long</type>
    </attribute>   
    
-   <attribute access="read-write" getMethod="getPullSize" setMethod="setPullSize">
-      <description>The maximum number of message to pull in one go from a remote queue when the local queue consumers are starving</description>
-      <name>PullSize</name>
-      <type>int</type>
-   </attribute> 
-   
    <attribute access="read-write" getMethod="getStatsSendPeriod" setMethod="setStatsSendPeriod">
       <description>The period in milliseconds between a post office casting it's statistics across the cluster</description>
       <name>StatsSendPeriod</name>

Modified: trunk/src/main/org/jboss/messaging/core/plugin/ClusteredPostOfficeService.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/plugin/ClusteredPostOfficeService.java	2006-10-17 16:41:04 UTC (rev 1486)
+++ trunk/src/main/org/jboss/messaging/core/plugin/ClusteredPostOfficeService.java	2006-10-17 17:51:41 UTC (rev 1487)
@@ -69,8 +69,6 @@
      
    private String groupName;
    
-   private int pullSize = 1;   
-   
    private long statsSendPeriod = 1000;
    
    private String clusterRouterFactory;
@@ -172,16 +170,6 @@
       return groupName;
    }
    
-   public void setPullSize(int size)
-   {
-      this.pullSize = size;
-   }
-   
-   public int getPullSize()
-   {
-      return pullSize;
-   }
-   
    public void setStatsSendPeriod(long period)
    {
       this.statsSendPeriod = period;
@@ -256,7 +244,6 @@
                                                syncChannelConfig, asyncChannelConfig,
                                                stateTimeout, castTimeout,
                                                pullPolicy, rf,
-                                               pullSize,
                                                statsSendPeriod);
          
          postOffice.start();

Modified: trunk/src/main/org/jboss/messaging/core/plugin/postoffice/cluster/ClusterRequest.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/plugin/postoffice/cluster/ClusterRequest.java	2006-10-17 16:41:04 UTC (rev 1486)
+++ trunk/src/main/org/jboss/messaging/core/plugin/postoffice/cluster/ClusterRequest.java	2006-10-17 17:51:41 UTC (rev 1487)
@@ -54,9 +54,9 @@
             request =  new BindRequest();
             break;
          }
-         case PullMessagesResultRequest.TYPE:
+         case PullMessageResultRequest.TYPE:
          {
-            request = new PullMessagesResultRequest();
+            request = new PullMessageResultRequest();
             break;
          }
          case MessageRequest.TYPE:
@@ -94,6 +94,11 @@
             request = new UnbindRequest();
             break;
          }
+         case RollbackPullRequest.TYPE:
+         {
+            request = new RollbackPullRequest();
+            break;
+         }
          default:
          {
             throw new IllegalArgumentException("Invalid type: " + type);

Modified: trunk/src/main/org/jboss/messaging/core/plugin/postoffice/cluster/DefaultClusteredPostOffice.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/plugin/postoffice/cluster/DefaultClusteredPostOffice.java	2006-10-17 16:41:04 UTC (rev 1486)
+++ trunk/src/main/org/jboss/messaging/core/plugin/postoffice/cluster/DefaultClusteredPostOffice.java	2006-10-17 17:51:41 UTC (rev 1487)
@@ -84,6 +84,7 @@
    //Used for failure testing
    private boolean failBeforeCommit;
    private boolean failAfterCommit;
+   private boolean failHandleResult;
      
    private boolean trace = log.isTraceEnabled();
                         
@@ -130,8 +131,6 @@
    
    private ClusterRouterFactory routerFactory;
    
-   private int pullSize;
-   
    private Map routerMap;
    
    private StatsSender statsSender;
@@ -166,12 +165,11 @@
             long stateTimeout, long castTimeout,
             MessagePullPolicy redistributionPolicy,
             ClusterRouterFactory rf,
-            int pullSize,
             long statsSendPeriod) throws Exception
    {            
       this(ds, tm, sqlProperties, createTablesOnStartup, nodeId, officeName, ms,
            pm, tr, filterFactory, pool, groupName, stateTimeout, castTimeout, redistributionPolicy,
-           rf, pullSize, statsSendPeriod);
+           rf, statsSendPeriod);
       
       this.syncChannelConfigE = syncChannelConfig;      
       this.asyncChannelConfigE = asyncChannelConfig;     
@@ -193,12 +191,11 @@
                               long stateTimeout, long castTimeout,
                               MessagePullPolicy redistributionPolicy,                      
                               ClusterRouterFactory rf,
-                              int pullSize,
                               long statsSendPeriod) throws Exception
    {            
       this(ds, tm, sqlProperties, createTablesOnStartup, nodeId, officeName, ms,
            pm, tr, filterFactory, pool, groupName, stateTimeout, castTimeout, redistributionPolicy,
-           rf, pullSize, statsSendPeriod);
+           rf, statsSendPeriod);
 
       this.syncChannelConfigS = syncChannelConfig;      
       this.asyncChannelConfigS = asyncChannelConfig;     
@@ -215,7 +212,6 @@
                                long stateTimeout, long castTimeout,                             
                                MessagePullPolicy redistributionPolicy,                               
                                ClusterRouterFactory rf,
-                               int pullSize,
                                long statsSendPeriod)
    {
       super (ds, tm, sqlProperties, createTablesOnStartup, nodeId, officeName, ms, pm, tr, filterFactory,
@@ -233,8 +229,6 @@
       
       this.routerFactory = rf;
       
-      this.pullSize = pullSize;
-       
       routerMap = new HashMap();
       
       statsSender = new StatsSender(this, statsSendPeriod);
@@ -795,6 +789,27 @@
       if (trace) { log.trace(this.nodeId + " committed transaction " + id ); }
    }
    
+   //Removes the transaction held under the given id from the holding area and calls
+   //rollback on it; throws IllegalStateException if no transaction is held under that id
+   public void rollbackTransaction(TransactionId id) throws Throwable
+   {
+      if (trace) { log.trace(this.nodeId + " rolling back transaction " + id ); }
+      ClusterTransaction tx = null;
+        
+      synchronized (holdingArea)
+      {
+         tx = (ClusterTransaction)holdingArea.remove(id);                
+      }
+      
+      if (tx == null)
+      {
+         throw new IllegalStateException("Cannot find transaction id: " + id);
+      }
+      
+      tx.rollback(this);
+      
+      if (trace) { log.trace(this.nodeId + " rolled back transaction " + id ); }
+   }
+   
    /**
     * Check for any transactions that need to be committed or rolled back
     */
@@ -997,43 +1012,61 @@
    } 
   
    
-   public void handleMessagePullResult(int remoteNodeId, long holdingTxId, String queueName, List messages) throws Throwable
+   public void handleMessagePullResult(int remoteNodeId, long holdingTxId,
+                                       String queueName, org.jboss.messaging.core.Message message) throws Throwable
    {
-      if (trace) { log.trace(this.nodeId + " handling pull result " + messages + " for " + queueName); }
+      if (trace) { log.trace(this.nodeId + " handling pull result " + message + " for " + queueName); }
                
       Binding binding = getBindingForQueueName(queueName);
       
-      if (binding == null)
-      {
-         //This might happen if the queue is unbound
-         return;
-      }
-            
-      LocalClusteredQueue localQueue = (LocalClusteredQueue)binding.getQueue();
-           
-      RemoteQueueStub remoteQueue = localQueue.getPullQueue();
+      //The binding might be null if the queue was unbound
       
-      if (remoteNodeId != remoteQueue.getNodeId())
-      {
-         //It might have changed since the request was sent
-         Map bindings = (Map)nameMaps.get(new Integer(remoteNodeId));
+      boolean handled = false;
+      
+      if (!failHandleResult && binding != null)
+      {                     
+         LocalClusteredQueue localQueue = (LocalClusteredQueue)binding.getQueue();
+              
+         RemoteQueueStub remoteQueue = localQueue.getPullQueue();
          
-         if (bindings != null)
+         if (remoteNodeId != remoteQueue.getNodeId())
          {
-            binding = (Binding)bindings.get(queueName);
+            //It might have changed since the request was sent
+            Map bindings = (Map)nameMaps.get(new Integer(remoteNodeId));
             
-            if (binding != null)
-            {                     
-              remoteQueue = (RemoteQueueStub)binding.getQueue();                              
+            if (bindings != null)
+            {
+               binding = (Binding)bindings.get(queueName);
+               
+               if (binding != null)
+               {                     
+                 remoteQueue = (RemoteQueueStub)binding.getQueue();                              
+               }
             }
          }
+         
+         if (remoteQueue != null)
+         {
+            localQueue.handlePullMessagesResult(remoteQueue, message, holdingTxId,
+                                                failBeforeCommit, failAfterCommit);
+            
+            handled = true;
+         }     
       }
       
-      if (remoteQueue != null)
+      if (!handled)
       {
-         localQueue.handlePullMessagesResult(remoteQueue, messages, holdingTxId,
-                                             failBeforeCommit, failAfterCommit);
-      }     
+         //If we didn't handle it for whatever reason, then we might have to send a rollback
+         //message to the other node otherwise the transaction might end up in the holding
+         //area forever
+         if (message.isReliable())
+         {
+            //Only reliable messages will be in holding area
+            this.asyncSendRequest(new RollbackPullRequest(this.nodeId, holdingTxId), remoteNodeId);
+            
+            if (trace) { log.trace(this.nodeId + " send rollback pull request"); }
+         }
+      }      
    }
    
    
@@ -1045,10 +1078,11 @@
    // Public ------------------------------------------------------------------------------------------
       
    //Used for testing only
-   public void setFail(boolean beforeCommit, boolean afterCommit)
+   public void setFail(boolean beforeCommit, boolean afterCommit, boolean handleResult)
    {
       this.failBeforeCommit = beforeCommit;
       this.failAfterCommit = afterCommit;
+      this.failHandleResult = handleResult;
    }
    
    //Used for testing only
@@ -1137,7 +1171,6 @@
    {
       if (trace) { log.trace(this.nodeId + " loading bindings"); }
       
-      // TODO I need to know whether this call times out - how do I know this??
       boolean isState = syncChannel.getState(null, stateTimeout);
                               
       if (!isState)

Modified: trunk/src/main/org/jboss/messaging/core/plugin/postoffice/cluster/LocalClusteredQueue.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/plugin/postoffice/cluster/LocalClusteredQueue.java	2006-10-17 16:41:04 UTC (rev 1486)
+++ trunk/src/main/org/jboss/messaging/core/plugin/postoffice/cluster/LocalClusteredQueue.java	2006-10-17 17:51:41 UTC (rev 1487)
@@ -29,6 +29,7 @@
 import org.jboss.logging.Logger;
 import org.jboss.messaging.core.Delivery;
 import org.jboss.messaging.core.Filter;
+import org.jboss.messaging.core.Message;
 import org.jboss.messaging.core.MessageReference;
 import org.jboss.messaging.core.SimpleDelivery;
 import org.jboss.messaging.core.local.PagingFilteredQueue;
@@ -178,12 +179,12 @@
       acknowledgeInternal(d, null, false, false);      
    }
    
-   public void handlePullMessagesResult(RemoteQueueStub remoteQueue, List messages,
+   public void handlePullMessagesResult(RemoteQueueStub remoteQueue, Message message,
                                         long holdingTxId, boolean failBeforeCommit, boolean failAfterCommit) throws Exception
    { 
       //This needs to be run on a different thread to the one used by JGroups to deliver the message
       //to avoid deadlock
-      Runnable runnable = new MessagePullResultRunnable(remoteQueue, messages, holdingTxId,
+      Runnable runnable = new MessagePullResultRunnable(remoteQueue, message, holdingTxId,
                                                         failBeforeCommit, failAfterCommit);
       
       executor.execute(runnable);      
@@ -280,12 +281,6 @@
       }
    }  
    
-   /*
-    * Send a message pull request.
-    * 
-    * TODO - do we really need this class?
-    * Why can't we just execute on the same thread?
-    */
    private class SendPullRequestRunnable implements Runnable
    {
       private RemoteQueueStub theQueue;
@@ -325,18 +320,18 @@
     * tx thus avoiding 2PC and maintaining reliability :)
     * We do the following:
     * 
-    * 1. Send a PullMessagesRequest to the remote node, on receipt it will create deliveries for message(s), and 
+    * 1. Send a PullMessagesRequest to the remote node, on receipt it will create a delivery for the message, and 
     * possibly add a holding tx (if there are any persistent messages), the messages will then be returned in
     * a PullMessagesResultRequest which is sent asynchronously from the remote node back to here to avoid
     * distributed deadlock.
     * 2. When the result is returned it hits this method.
-    * 3. The retrieved messages are added to the local queue in the tx
-    * 4. Deliveries corresponding to the messages retrieved are acknowledged LOCALLY for the remote queue.
+    * 3. The retrieved message is added to the local queue in the tx
+    * 4. Delivery corresponding to the message is acknowledged LOCALLY for the remote queue.
     * 5. The local tx is committed.
     * 6. Send "commit" message to remote node
-    * 7. "Commit" message is received and deliveries in the holding transaction are acknowledged IN MEMORY only.
-    * On failure, commit or rollback will be called on the holding transaction causing the deliveries to be acked or cancelled
-    * depending on whether they exist in the database
+    * 7. "Commit" message is received and delivery in the holding transaction is acknowledged IN MEMORY only.
+    * On failure, commit or rollback will be called on the holding transaction causing the delivery to be acked or cancelled
+    * depending on whether it exists in the database
     * 
     * Recovery is handled in the same way as CastMessagesCallback
     * 
@@ -367,82 +362,57 @@
       {
          try
          {
-            List dels = null;
+            Delivery del = null;
             
-            //We only get the refs if receiversReady = false so as not to steal messages that
+            //We only get the delivery if receiversReady = false so as not to steal messages that
             //might be consumed by local receivers            
             if (!receiversReady)
-            {               
-               int count = 0;
-               
+            {                  
                MessageReference ref;
                
-               dels = new ArrayList();
-               
                synchronized (refLock)
                {
                   synchronized (deliveryLock)
                   {
-                     while (count < number && (ref = removeFirstInMemory()) != null)
+                     ref = removeFirstInMemory();
+                     
+                     if (ref != null)
                      {
-                        SimpleDelivery del = new SimpleDelivery(LocalClusteredQueue.this, ref);
+                        del = new SimpleDelivery(LocalClusteredQueue.this, ref);
                         
                         deliveries.add(del);
-                        
-                        dels.add(del);       
-                        
-                        count++;
-                     }  
+                     }                      
                   }
                }                    
             }
-            else
-            {
-               dels = Collections.EMPTY_LIST;
-            }
             
-            if (trace) { log.trace("PullMessagesRunnable got " + dels.size() + " deliveries"); }
+            if (trace) { log.trace("PullMessagesRunnable got " + del); }
             
-            PullMessagesResultRequest response = new PullMessagesResultRequest(LocalClusteredQueue.this.nodeId, txId.getTxId(), name, dels.size());
-            
-            List reliableDels = null;
-            
-            if (!dels.isEmpty())
-            {
-               Iterator iter = dels.iterator();
-               
-               Delivery del = (Delivery)iter.next();
-               
-               if (del.getReference().isReliable())
+            if (del != null)
+            {            
+               PullMessageResultRequest response =
+                  new PullMessageResultRequest(LocalClusteredQueue.this.nodeId, txId.getTxId(),
+                                               name,
+                                               del.getReference().getMessage());
+                        
+               if (!del.getReference().isReliable())
                {
-                  //Add it to internal list
-                  if (reliableDels == null)
-                  {
-                     reliableDels = new ArrayList();                                    
-                  }
-                  
-                  reliableDels.add(del);
-               }
-               else
-               {
                   //We can ack it now
                   del.acknowledge(null);
                }
+               else
+               {
+                  //Add this to the holding area
+                  tx.setReliableDelivery(del);
+                  
+                  office.holdTransaction(txId, tx);
+               }            
+                         
+               //We send the messages asynchronously to avoid a deadlock situation which can occur
+               //if we were using MessageDispatcher to get the messages.
                
-               response.addMessage(del.getReference().getMessage());
+               office.asyncSendRequest(response, returnNodeId);   
             }
-                 
-            if (reliableDels != null)
-            {
-               //Add this to the holding area
-               tx.setReliableDels(reliableDels);
-               office.holdTransaction(txId, tx);
-            }
-             
-            //We send the messages asynchronously to avoid a deadlock situation which can occur
-            //if we were using MessageDispatcher to get the messages.
-            
-            office.asyncSendRequest(response, returnNodeId);   
          }
          catch (Throwable e)
          {
@@ -455,7 +425,7 @@
    {
       private RemoteQueueStub remoteQueue;
       
-      private List messages;
+      private Message message;
       
       private long holdingTxId;
       
@@ -464,12 +434,12 @@
       private boolean failAfterCommit;
             
       private MessagePullResultRunnable(RemoteQueueStub remoteQueue,
-                                        List messages, long holdingTxId,
+                                        Message message, long holdingTxId,
                                         boolean failBeforeCommit, boolean failAfterCommit)
       {
          this.remoteQueue = remoteQueue;
          
-         this.messages = messages;
+         this.message = message;
          
          this.holdingTxId = holdingTxId;
          
@@ -481,66 +451,57 @@
       {
          try
          {
-            // TODO we should optimise for the case when only one message is pulled which is basically all
-            //we support now anyway
-            //Also we should optimise for the case when only non persistent messages are pulled
-            //in this case we don't need to create a tx.
+            Transaction tx = null;
             
-            Transaction tx = tr.createTransaction();
+            boolean handleTransactionally = message.isReliable() && isRecoverable();
             
-            Iterator iter = messages.iterator();
+            if (handleTransactionally)
+            {
+               tx = tr.createTransaction();  
+               
+               //It will already have been persisted on the other node
+               //so we need to set the persisted flag here
+               message.setPersisted(true);
+            }
             
-            boolean containsReliable = false;
-            
-            while (iter.hasNext())
+            MessageReference ref = null;
+               
+            try
             {
-               org.jboss.messaging.core.Message msg = (org.jboss.messaging.core.Message)iter.next();
+               ref = ms.reference(message);
                
-               if (msg.isReliable())
+               //Should be executed synchronously since we are already in the event queue
+               Delivery delRet = handleInternal(null, ref, tx, true, true);
+
+               if (delRet == null || !delRet.isSelectorAccepted())
                {
-                  //It will already have been persisted on the other node
-                  //so we need to set the persisted flag here
-                  msg.setPersisted(true);
-                  
-                  containsReliable = true;
-               }
-                     
-               MessageReference ref = null;
-               
-               try
+                  //This should never happen
+                  throw new IllegalStateException("Queue did not accept reference!");
+               }            
+            }
+            finally
+            {
+               if (ref != null)
                {
-                  ref = ms.reference(msg);
-                  
-                  //Should be executed synchronously since we already in the event queue
-                  Delivery delRet = handleInternal(null, ref, tx, true, true);
-
-                  if (delRet == null || !delRet.isSelectorAccepted())
-                  {
-                     //This should never happen
-                     throw new IllegalStateException("Queue did not accept reference!");
-                  }            
+                  ref.releaseMemoryReference();
                }
-               finally
-               {
-                  if (ref != null)
-                  {
-                     ref.releaseMemoryReference();
-                  }
-               }
-                       
-               //Acknowledge on the remote queue stub
-               Delivery del = new SimpleDelivery(remoteQueue, ref);
-               
-               del.acknowledge(tx);        
             }
+                       
+            //Acknowledge on the remote queue stub
+            Delivery del = new SimpleDelivery(remoteQueue, ref);
             
+            del.acknowledge(tx);        
+                        
             //For testing to simulate failures
             if (failBeforeCommit)
             {
                throw new Exception("Test failure before commit");
             }
                
-            tx.commit();
+            if (handleTransactionally)
+            {
+               tx.commit();
+            }
             
             //For testing to simulate failures
             if (failAfterCommit)
@@ -555,9 +516,7 @@
             //and send a checkrequest
             //This applies to a normal message and messages requests too
                   
-            //We only need to send a commit message if there were reliable messages since otherwise
-            //the transaction wouldn't have been added in the holding area
-            if (containsReliable && isRecoverable())
+            if (handleTransactionally)
             {         
                ClusterRequest req = new PullMessagesRequest(nodeId, holdingTxId);
                
@@ -568,9 +527,6 @@
          {
             log.error("Failed to handle pulled message", e);
          }
-      }
-      
-   }
-       
-   
+      }      
+   }         
 }

Modified: trunk/src/main/org/jboss/messaging/core/plugin/postoffice/cluster/PostOfficeInternal.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/plugin/postoffice/cluster/PostOfficeInternal.java	2006-10-17 16:41:04 UTC (rev 1486)
+++ trunk/src/main/org/jboss/messaging/core/plugin/postoffice/cluster/PostOfficeInternal.java	2006-10-17 17:51:41 UTC (rev 1487)
@@ -62,11 +62,13 @@
    
    void commitTransaction(TransactionId id) throws Throwable;
    
+   void rollbackTransaction(TransactionId id) throws Throwable;
+   
    void updateQueueStats(int nodeId, List stats) throws Exception;
    
    void sendQueueStats() throws Exception;
    
    boolean referenceExistsInStorage(long channelID, long messageID) throws Exception;
    
-   void handleMessagePullResult(int remoteNodeId, long holdingTxId, String queueName, List messages) throws Throwable;
+   void handleMessagePullResult(int remoteNodeId, long holdingTxId, String queueName, Message message) throws Throwable;
 }

Copied: trunk/src/main/org/jboss/messaging/core/plugin/postoffice/cluster/PullMessageResultRequest.java (from rev 1485, trunk/src/main/org/jboss/messaging/core/plugin/postoffice/cluster/PullMessagesResultRequest.java)
===================================================================
--- trunk/src/main/org/jboss/messaging/core/plugin/postoffice/cluster/PullMessagesResultRequest.java	2006-10-17 14:49:19 UTC (rev 1485)
+++ trunk/src/main/org/jboss/messaging/core/plugin/postoffice/cluster/PullMessageResultRequest.java	2006-10-17 17:51:41 UTC (rev 1487)
@@ -0,0 +1,114 @@
+/*
+ * JBoss, Home of Professional Open Source
+ * Copyright 2005, JBoss Inc., and individual contributors as indicated
+ * by the @authors tag. See the copyright.txt in the distribution for a
+ * full listing of individual contributors.
+ *
+ * This is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU Lesser General Public License as
+ * published by the Free Software Foundation; either version 2.1 of
+ * the License, or (at your option) any later version.
+ *
+ * This software is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this software; if not, write to the Free
+ * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
+ * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
+ */
+package org.jboss.messaging.core.plugin.postoffice.cluster;
+
+import java.io.DataInputStream;
+import java.io.DataOutputStream;
+
+import org.jboss.logging.Logger;
+import org.jboss.messaging.core.Message;
+import org.jboss.messaging.core.message.MessageFactory;
+
+/**
+ * 
+ * A PullMessageResultRequest
+ *
+ * @author <a href="mailto:tim.fox at jboss.com">Tim Fox</a>
+ * @version <tt>$Revision: 1.1 $</tt>
+ *
+ * $Id$
+ *
+ */
+public class PullMessageResultRequest extends ClusterRequest
+{
+   private static final Logger log = Logger.getLogger(PullMessageResultRequest.class);   
+   
+   public static final int TYPE = 2;
+   
+   private long holdingTxId;
+   
+   private String queueName;
+   
+   private Message message;
+   
+   private int remoteNodeId;
+   
+   PullMessageResultRequest()
+   {
+   }
+   
+   PullMessageResultRequest(int remoteNodeId, long holdingTxId, String queueName, Message message)
+   {
+      this.remoteNodeId = remoteNodeId;
+      
+      this.holdingTxId = holdingTxId;
+      
+      this.queueName = queueName;
+      
+      this.message = message;
+   }
+   
+   Message getMessage()
+   {
+      return message;
+   }
+   
+   public void read(DataInputStream in) throws Exception
+   {
+      remoteNodeId = in.readInt();
+      
+      holdingTxId = in.readLong();
+      
+      queueName = in.readUTF();
+      
+      byte type = in.readByte();
+      
+      message = MessageFactory.createMessage(type);
+      
+      message.read(in);  
+   }
+
+   public void write(DataOutputStream out) throws Exception
+   {
+      out.writeInt(remoteNodeId);
+      
+      out.writeLong(holdingTxId);
+      
+      out.writeUTF(queueName);
+      
+      out.writeByte(message.getType());
+      
+      message.write(out);     
+   }
+
+   Object execute(PostOfficeInternal office) throws Throwable
+   {
+      office.handleMessagePullResult(remoteNodeId, holdingTxId, queueName, message);
+      
+      return null;
+   }
+
+   byte getType()
+   {
+      return TYPE;
+   }
+}

Modified: trunk/src/main/org/jboss/messaging/core/plugin/postoffice/cluster/PullMessagesRequest.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/plugin/postoffice/cluster/PullMessagesRequest.java	2006-10-17 16:41:04 UTC (rev 1486)
+++ trunk/src/main/org/jboss/messaging/core/plugin/postoffice/cluster/PullMessagesRequest.java	2006-10-17 17:51:41 UTC (rev 1487)
@@ -49,7 +49,7 @@
    
    private int numMessages;
    
-   private List reliableDels;
+   private Delivery reliableDelivery;
    
    static final int TYPE = 5;
          
@@ -101,9 +101,9 @@
    }
    
    //TODO this is a bit messy - must be a nicer way of setting this
-   void setReliableDels(List reliableDels)
+   void setReliableDelivery(Delivery del)
    {
-      this.reliableDels = reliableDels;
+      this.reliableDelivery = del;
    }
 
    byte getType()
@@ -113,21 +113,15 @@
 
    public boolean check(PostOfficeInternal office) throws Exception
    {
-      // If the messages DON'T exist in the database then we should commit the transaction
-      // Since the acks have already been processed persistently
+      // If the message doesn't exist in the database then we should commit the transaction
+      // Since the ack has already been processed persistently
       
       // otherwise we should roll it back
-      
-      Iterator iter = reliableDels.iterator();
-      
-      //We only need to check one of them since they would all have been acked in a tx      
-      
-      Delivery del = (Delivery)iter.next();
-      
+
       //We store the channelID of one of the channels that the message was persisted in
       //it doesn't matter which one since they were all inserted in the same tx
       
-      if (office.referenceExistsInStorage(checkChannelID, del.getReference().getMessageID()))
+      if (office.referenceExistsInStorage(checkChannelID, reliableDelivery.getReference().getMessageID()))
       {
          //We should rollback
          return false;
@@ -141,34 +135,20 @@
 
    public void commit(PostOfficeInternal office) throws Throwable
    {
-      //We need to ack the deliveries
+      //We need to ack the delivery
       
-      Iterator iter = reliableDels.iterator();
-      
-      while (iter.hasNext())
-      {
-         Delivery del = (Delivery)iter.next();
-         
-         //We need to ack them in memory only
-         //since they would have been acked on the pulling node
-         LocalClusteredQueue queue = (LocalClusteredQueue)del.getObserver();
-              
-         queue.acknowledgeFromCluster(del);
-      }
+      //We need to ack it in memory only
+      //since it would have been acked on the pulling node
+      LocalClusteredQueue queue = (LocalClusteredQueue)reliableDelivery.getObserver();
+           
+      queue.acknowledgeFromCluster(reliableDelivery);      
    }
 
    public void rollback(PostOfficeInternal office) throws Throwable
    {
-      //We need to cancel the deliveries
+      //We need to cancel the delivery
       
-      Iterator iter = reliableDels.iterator();
-      
-      while (iter.hasNext())
-      {
-         Delivery del = (Delivery)iter.next();
-         
-         del.cancel();
-      }      
+      reliableDelivery.cancel();  
    }
    
    public void read(DataInputStream in) throws Exception

Deleted: trunk/src/main/org/jboss/messaging/core/plugin/postoffice/cluster/PullMessagesResultRequest.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/plugin/postoffice/cluster/PullMessagesResultRequest.java	2006-10-17 16:41:04 UTC (rev 1486)
+++ trunk/src/main/org/jboss/messaging/core/plugin/postoffice/cluster/PullMessagesResultRequest.java	2006-10-17 17:51:41 UTC (rev 1487)
@@ -1,140 +0,0 @@
-/*
- * JBoss, Home of Professional Open Source
- * Copyright 2005, JBoss Inc., and individual contributors as indicated
- * by the @authors tag. See the copyright.txt in the distribution for a
- * full listing of individual contributors.
- *
- * This is free software; you can redistribute it and/or modify it
- * under the terms of the GNU Lesser General Public License as
- * published by the Free Software Foundation; either version 2.1 of
- * the License, or (at your option) any later version.
- *
- * This software is distributed in the hope that it will be useful,
- * but WITHOUT ANY WARRANTY; without even the implied warranty of
- * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
- * Lesser General Public License for more details.
- *
- * You should have received a copy of the GNU Lesser General Public
- * License along with this software; if not, write to the Free
- * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
- * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
- */
-package org.jboss.messaging.core.plugin.postoffice.cluster;
-
-import java.io.DataInputStream;
-import java.io.DataOutputStream;
-import java.util.ArrayList;
-import java.util.Iterator;
-import java.util.List;
-
-import org.jboss.logging.Logger;
-import org.jboss.messaging.core.Message;
-import org.jboss.messaging.core.message.MessageFactory;
-
-/**
- * 
- * A PullMessagesResultRequest
- *
- * @author <a href="mailto:tim.fox at jboss.com">Tim Fox</a>
- * @version <tt>$Revision: 1.1 $</tt>
- *
- * $Id$
- *
- */
-public class PullMessagesResultRequest extends ClusterRequest
-{
-   private static final Logger log = Logger.getLogger(PullMessagesResultRequest.class);   
-   
-   public static final int TYPE = 2;
-   
-   private long holdingTxId;
-   
-   private String queueName;
-   
-   private List messages;
-   
-   private int remoteNodeId;
-   
-   PullMessagesResultRequest()
-   {
-   }
-   
-   PullMessagesResultRequest(int remoteNodeId, long holdingTxId, String queueName, int size)
-   {
-      this.remoteNodeId = remoteNodeId;
-      
-      this.holdingTxId = holdingTxId;
-      
-      this.queueName = queueName;
-      
-      messages = new ArrayList(size);
-   }
-   
-   void addMessage(Message msg)
-   {
-      messages.add(msg);
-   }
-   
-   List getMessages()
-   {
-      return messages;
-   }
-   
-   public void read(DataInputStream in) throws Exception
-   {
-      remoteNodeId = in.readInt();
-      
-      holdingTxId = in.readLong();
-      
-      queueName = in.readUTF();
-      
-      int num = in.readInt();
-      
-      messages = new ArrayList(num);
-      
-      for (int i = 0; i < num; i++)
-      {
-         byte type = in.readByte();
-         
-         Message msg = MessageFactory.createMessage(type);
-         
-         msg.read(in);
-         
-         messages.add(msg);
-      }
-   }
-
-   public void write(DataOutputStream out) throws Exception
-   {
-      out.writeInt(remoteNodeId);
-      
-      out.writeLong(holdingTxId);
-      
-      out.writeUTF(queueName);
-      
-      out.writeInt(messages.size());
-      
-      Iterator iter = messages.iterator();
-      
-      while (iter.hasNext())
-      {
-         Message msg = (Message)iter.next();
-         
-         out.writeByte(msg.getType());
-         
-         msg.write(out);
-      }   
-   }
-
-   Object execute(PostOfficeInternal office) throws Throwable
-   {
-      office.handleMessagePullResult(remoteNodeId, holdingTxId, queueName, messages);
-      
-      return null;
-   }
-
-   byte getType()
-   {
-      return TYPE;
-   }
-}

Added: trunk/src/main/org/jboss/messaging/core/plugin/postoffice/cluster/RollbackPullRequest.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/plugin/postoffice/cluster/RollbackPullRequest.java	2006-10-17 16:41:04 UTC (rev 1486)
+++ trunk/src/main/org/jboss/messaging/core/plugin/postoffice/cluster/RollbackPullRequest.java	2006-10-17 17:51:41 UTC (rev 1487)
@@ -0,0 +1,81 @@
+/*
+ * JBoss, Home of Professional Open Source
+ * Copyright 2005, JBoss Inc., and individual contributors as indicated
+ * by the @authors tag. See the copyright.txt in the distribution for a
+ * full listing of individual contributors.
+ *
+ * This is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU Lesser General Public License as
+ * published by the Free Software Foundation; either version 2.1 of
+ * the License, or (at your option) any later version.
+ *
+ * This software is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this software; if not, write to the Free
+ * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
+ * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
+ */
+package org.jboss.messaging.core.plugin.postoffice.cluster;
+
+import java.io.DataInputStream;
+import java.io.DataOutputStream;
+
+/**
+ * A RollbackPullRequest
+ *
+ * @author <a href="mailto:tim.fox at jboss.com">Tim Fox</a>
+ * @version <tt>$Revision: 1.1 $</tt>
+ *
+ * $Id$
+ *
+ */
+public class RollbackPullRequest extends ClusterRequest
+{
+   static final int TYPE = 10;
+   
+   private int nodeId;
+   
+   private long txId;
+   
+   public RollbackPullRequest()
+   {      
+   }
+   
+   RollbackPullRequest(int nodeId, long txId)
+   {
+      this.nodeId = nodeId;
+      
+      this.txId = txId;
+   }
+
+   Object execute(PostOfficeInternal office) throws Throwable
+   {
+      office.rollbackTransaction(new TransactionId(nodeId, txId));
+      
+      return null;
+   }
+
+   byte getType()
+   {
+      return TYPE;
+   }
+
+   public void read(DataInputStream in) throws Exception
+   {
+      nodeId = in.readInt();
+      
+      txId = in.readLong();
+   }
+
+   public void write(DataOutputStream out) throws Exception
+   {
+      out.writeInt(nodeId);
+      
+      out.writeLong(txId);
+   }
+
+}

Modified: trunk/tests/src/org/jboss/test/messaging/core/plugin/postoffice/cluster/DefaultClusteredPostOfficeTest.java
===================================================================
--- trunk/tests/src/org/jboss/test/messaging/core/plugin/postoffice/cluster/DefaultClusteredPostOfficeTest.java	2006-10-17 16:41:04 UTC (rev 1486)
+++ trunk/tests/src/org/jboss/test/messaging/core/plugin/postoffice/cluster/DefaultClusteredPostOfficeTest.java	2006-10-17 17:51:41 UTC (rev 1487)
@@ -2227,7 +2227,7 @@
                                  groupName,
                                  JGroupsUtil.getControlStackProperties(),
                                  JGroupsUtil.getDataStackProperties(),
-                                 5000, 5000, pullPolicy, rf, 1, 1000);
+                                 5000, 5000, pullPolicy, rf, 1000);
       
       postOffice.start();      
       

Modified: trunk/tests/src/org/jboss/test/messaging/core/plugin/postoffice/cluster/DefaultClusteredPostOfficeWithDefaultRouterTest.java
===================================================================
--- trunk/tests/src/org/jboss/test/messaging/core/plugin/postoffice/cluster/DefaultClusteredPostOfficeWithDefaultRouterTest.java	2006-10-17 16:41:04 UTC (rev 1486)
+++ trunk/tests/src/org/jboss/test/messaging/core/plugin/postoffice/cluster/DefaultClusteredPostOfficeWithDefaultRouterTest.java	2006-10-17 17:51:41 UTC (rev 1487)
@@ -384,7 +384,7 @@
                                  groupName,
                                  JGroupsUtil.getControlStackProperties(),
                                  JGroupsUtil.getDataStackProperties(),
-                                 5000, 5000, redistPolicy, rf, 1, 1000);
+                                 5000, 5000, redistPolicy, rf, 1000);
       
       postOffice.start();      
       

Modified: trunk/tests/src/org/jboss/test/messaging/core/plugin/postoffice/cluster/DefaultRouterTest.java
===================================================================
--- trunk/tests/src/org/jboss/test/messaging/core/plugin/postoffice/cluster/DefaultRouterTest.java	2006-10-17 16:41:04 UTC (rev 1486)
+++ trunk/tests/src/org/jboss/test/messaging/core/plugin/postoffice/cluster/DefaultRouterTest.java	2006-10-17 17:51:41 UTC (rev 1487)
@@ -359,7 +359,7 @@
                                  groupName,
                                  JGroupsUtil.getControlStackProperties(),
                                  JGroupsUtil.getDataStackProperties(),
-                                 5000, 5000, redistPolicy, rf, 1, 1000);
+                                 5000, 5000, redistPolicy, rf, 1000);
       
       postOffice.start();      
       

Modified: trunk/tests/src/org/jboss/test/messaging/core/plugin/postoffice/cluster/RecoveryTest.java
===================================================================
--- trunk/tests/src/org/jboss/test/messaging/core/plugin/postoffice/cluster/RecoveryTest.java	2006-10-17 16:41:04 UTC (rev 1486)
+++ trunk/tests/src/org/jboss/test/messaging/core/plugin/postoffice/cluster/RecoveryTest.java	2006-10-17 17:51:41 UTC (rev 1487)
@@ -116,7 +116,7 @@
          queue3.add(receiver3);
          
          //This will make it fail after casting but before persisting the message in the db
-         office1.setFail(true, false);
+         office1.setFail(true, false, false);
          
          Transaction tx = tr.createTransaction();
          
@@ -251,7 +251,7 @@
          queue3.add(receiver3);
          
          //This will make it fail after casting and persisting the message in the db
-         office1.setFail(false, true);
+         office1.setFail(false, true, false);
          
          Transaction tx = tr.createTransaction();
          
@@ -361,7 +361,7 @@
                                  groupName,
                                  JGroupsUtil.getControlStackProperties(),
                                  JGroupsUtil.getDataStackProperties(),
-                                 5000, 5000, redistPolicy, rf, 1, 1000);
+                                 5000, 5000, redistPolicy, rf, 1000);
       
       postOffice.start();      
       

Modified: trunk/tests/src/org/jboss/test/messaging/core/plugin/postoffice/cluster/RedistributionWithDefaultMessagePullPolicyTest.java
===================================================================
--- trunk/tests/src/org/jboss/test/messaging/core/plugin/postoffice/cluster/RedistributionWithDefaultMessagePullPolicyTest.java	2006-10-17 16:41:04 UTC (rev 1486)
+++ trunk/tests/src/org/jboss/test/messaging/core/plugin/postoffice/cluster/RedistributionWithDefaultMessagePullPolicyTest.java	2006-10-17 17:51:41 UTC (rev 1487)
@@ -87,7 +87,6 @@
       super.tearDown();
    }
    
-
    public void testConsumeAllNonPersistentNonRecoverable() throws Throwable
    {
       consumeAll(false, false);
@@ -107,9 +106,7 @@
    {
       consumeAll(true, true);
    }
-   
-   
-   
+         
    public void testConsumeBitByBitNonPersistentNonRecoverable() throws Throwable
    {
       consumeBitByBit(false, false);
@@ -130,30 +127,6 @@
       consumeBitByBit(true, true);
    }
    
-   
-   
-   
-//   
-//   public void testConsumeConcurrentlyNonPersistentNonRecoverable() throws Throwable
-//   {
-//      consumeConcurrently(false, false);
-//   }
-//   
-//   public void testConsumeConsumeConcurrentlyPersistentNonRecoverable() throws Throwable
-//   {
-//      consumeConcurrently(true, false);
-//   }
-//   
-//   public void testConsumeConsumeConcurrentlyNonPersistentRecoverable() throws Throwable
-//   {
-//      consumeConcurrently(false, true);
-//   }
-//   
-//   public void testConsumeConsumeConcurrentlyPersistentRecoverable() throws Throwable
-//   {
-//      consumeConcurrently(true, true);
-//   }
-   
    public void testSimpleMessagePull() throws Throwable
    {
       DefaultClusteredPostOffice office1 = null;
@@ -301,7 +274,7 @@
          receiver2.setMaxRefs(1);
          
          //Force a failure before commit
-         office2.setFail(true, false);
+         office2.setFail(true, false, false);
          
          log.info("delivering");
          queue2.deliver(false);                 
@@ -401,7 +374,7 @@
          receiver2.setMaxRefs(1);
          
          //Force a failure after commit the ack to storage
-         office2.setFail(false, true);
+         office2.setFail(false, true, false);
          
          log.info("delivering");
          queue2.deliver(false);                 
@@ -444,6 +417,93 @@
       }
    }
    
+   public void testFailHandleMessagePullResult() throws Throwable
+   {
+      DefaultClusteredPostOffice office1 = null;
+      
+      DefaultClusteredPostOffice office2 = null;
+      
+      try
+      {      
+         office1 = (DefaultClusteredPostOffice)createClusteredPostOffice(1, "testgroup");
+         
+         office2 = (DefaultClusteredPostOffice)createClusteredPostOffice(2, "testgroup");
+         
+         LocalClusteredQueue queue1 = new LocalClusteredQueue(office1, 1, "queue1", channelIdManager.getId(), ms, pm, true, true, (QueuedExecutor)pool.get(), null, tr);         
+         Binding binding1 =
+            office1.bindClusteredQueue("queue1", queue1);
+         
+         LocalClusteredQueue queue2 = new LocalClusteredQueue(office2, 2, "queue1", channelIdManager.getId(), ms, pm, true, true, (QueuedExecutor)pool.get(), null, tr);         
+         Binding binding2 =
+            office2.bindClusteredQueue("queue1", queue2);
+                          
+         Message msg = CoreMessageFactory.createCoreMessage(1);   
+         msg.setReliable(true);
+         
+         MessageReference ref = ms.reference(msg);  
+         
+         office1.route(ref, "queue1", null);
+                  
+         Thread.sleep(2000);
+         
+         //Messages should all be in queue1
+         
+         List msgs = queue1.browse();
+         assertEquals(1, msgs.size());
+         
+         msgs = queue2.browse();
+         assertTrue(msgs.isEmpty());
+         
+         SimpleReceiver receiver1 = new SimpleReceiver("blah", SimpleReceiver.ACCEPTING_TO_MAX);
+         receiver1.setMaxRefs(0);
+         queue1.add(receiver1);
+         SimpleReceiver receiver2 = new SimpleReceiver("blah", SimpleReceiver.ACCEPTING_TO_MAX);
+         receiver2.setMaxRefs(0);
+         queue2.add(receiver2);
+         
+         //Prompt delivery so the channels know if the receivers are ready
+         queue1.deliver(false);
+         Thread.sleep(2000);
+           
+         //Pull from 1 to 2
+         
+         receiver2.setMaxRefs(1);
+         
+         office2.setFail(false, false, true);
+         
+         log.info("delivering");
+         queue2.deliver(false);                 
+         
+         Thread.sleep(3000);
+         
+         //The delivery should be rolled back
+         
+         assertTrue(office2.getHoldingTransactions().isEmpty());        
+         assertTrue(office2.getHoldingTransactions().isEmpty());
+         
+         log.info("queue1 refs:" + queue1.memoryRefCount() + " dels:" + queue1.memoryDeliveryCount());
+         log.info("queue2 refs:" + queue2.memoryRefCount() + " dels:" + queue2.memoryDeliveryCount());
+         
+         assertEquals(1, queue1.memoryRefCount());
+         assertEquals(0, queue1.memoryDeliveryCount());
+         
+         assertEquals(0, queue2.memoryRefCount());
+         assertEquals(0, queue2.memoryDeliveryCount());                     
+      }
+      finally
+      {
+         if (office1 != null)
+         {           
+            office1.stop();
+         }
+         
+         if (office2 != null)
+         {           
+            office2.stop();
+         }
+      }
+   }
+   
    protected void consumeAll(boolean persistent, boolean recoverable) throws Throwable
    {
       DefaultClusteredPostOffice office1 = null;
@@ -946,134 +1006,8 @@
             office5.stop();
          }
       }
-   }
+   }      
    
-   protected void consumeConcurrently(boolean persistent, boolean recoverable) throws Throwable
-   {
-      DefaultClusteredPostOffice office1 = null;
-      
-      DefaultClusteredPostOffice office2 = null;
-      
-      DefaultClusteredPostOffice office3 = null;
-      
-      DefaultClusteredPostOffice office4 = null;
-      
-      DefaultClusteredPostOffice office5 = null;
-          
-      try
-      {   
-         office1 = (DefaultClusteredPostOffice)createClusteredPostOffice(1, "testgroup");
-         
-         office2 = (DefaultClusteredPostOffice)createClusteredPostOffice(2, "testgroup");
-         
-         office3 = (DefaultClusteredPostOffice)createClusteredPostOffice(3, "testgroup");
-         
-         office4 = (DefaultClusteredPostOffice)createClusteredPostOffice(4, "testgroup");
-         
-         office5 = (DefaultClusteredPostOffice)createClusteredPostOffice(5, "testgroup");
-         
-         LocalClusteredQueue queue1 = new LocalClusteredQueue(office1, 1, "queue1", channelIdManager.getId(), ms, pm, true, recoverable, (QueuedExecutor)pool.get(), null, tr);         
-         Binding binding1 = office1.bindClusteredQueue("queue1", queue1);
-                  
-         LocalClusteredQueue queue2 = new LocalClusteredQueue(office2, 2, "queue1", channelIdManager.getId(), ms, pm, true, recoverable, (QueuedExecutor)pool.get(), null, tr);         
-         Binding binding2 = office2.bindClusteredQueue("queue1", queue2);
-                  
-         LocalClusteredQueue queue3 = new LocalClusteredQueue(office3, 3, "queue1", channelIdManager.getId(), ms, pm, true, recoverable, (QueuedExecutor)pool.get(), null, tr);         
-         Binding binding3 = office3.bindClusteredQueue("queue1", queue3);         
-         
-         LocalClusteredQueue queue4 = new LocalClusteredQueue(office4, 4, "queue1", channelIdManager.getId(), ms, pm, true, recoverable, (QueuedExecutor)pool.get(), null, tr);         
-         Binding binding4 = office4.bindClusteredQueue("queue1", queue4);
-                  
-         LocalClusteredQueue queue5 = new LocalClusteredQueue(office5, 5, "queue1", channelIdManager.getId(), ms, pm, true, recoverable, (QueuedExecutor)pool.get(), null, tr);         
-         Binding binding5 = office5.bindClusteredQueue("queue1", queue5);
-                   
-         //Test with no consumers on queue1
-         
-         //Two equal consumers on queue2 and queue3
-                           
-         //Add messages at queue 1
-         
-         final int NUM_MESSAGES = 10000;
-         
-         this.sendMessages("queue1", persistent, office1, NUM_MESSAGES, null);
-         
-         log.info("sent messages");
-         
-         Thread.sleep(4000);
-         
-         ThrottleReceiver receiver1 = new ThrottleReceiver(queue1, 0, 50);
-         queue1.add(receiver1);
-         queue1.deliver(false);  
-         
-         ThrottleReceiver receiver2 = new ThrottleReceiver(queue2, 0, 50);
-         queue2.add(receiver2);
-         queue2.deliver(false);  
-         
-         Thread.sleep(45000);
-         
-         log.info("receiver1: " + receiver1.getTotalCount());
-         
-         log.info("receiver2: " + receiver2.getTotalCount());
-         
-
-         //test1
-         
-         
-         //No consumer on node 1
-         //Very slow consumer on node 2
-         //
-         
-         /*
-          * Test with very fast, infinitely big consumer (i.e. is always ready) on node 1
-          * Fast consumer on node2
-          * Send messages on node 1
-          * Verify all go to node1 consumer
-          * 
-          * Test with very fast, not infinitely big consumer (i.e. is not always ready) on node 1
-          * Fast consumer on node2
-          * Send messages on node 1
-          * Verify most go to node1 consumer, some go to node 2
-          * 
-          * Test with slow consumer on node 1, Fast consumer on node 2
-          * 
-          * Test with no consumer on node 1, consumers on other nodes
-          * 
-          * Think up all the other permutations, then take a guess with error margin of
-          * how many messages should be on each node.
-          */
-         
-         
-         checkNoMessageData();
-      }
-      finally
-      { 
-         if (office1 != null)
-         {
-            office1.stop();
-         }
-         
-         if (office2 != null)
-         {            
-            office2.stop();
-         }
-         
-         if (office3 != null)
-         {
-            office3.stop();
-         }
-         
-         if (office4 != null)
-         {            
-            office4.stop();
-         }
-         
-         if (office5 != null)
-         {
-            office5.stop();
-         }
-      }
-   }
-   
    class ThrottleReceiver implements Receiver, Runnable
    {
       long pause;
@@ -1218,7 +1152,7 @@
                                  groupName,
                                  JGroupsUtil.getControlStackProperties(),
                                  JGroupsUtil.getDataStackProperties(),
-                                 10000, 10000, pullPolicy, rf, 1, 1000);
+                                 10000, 10000, pullPolicy, rf, 1000);
       
       postOffice.start();      
       




More information about the jboss-cvs-commits mailing list