[jboss-cvs] JBoss Messaging SVN: r1473 - in trunk: src/main/org/jboss/jms/server src/main/org/jboss/jms/server/endpoint src/main/org/jboss/messaging/core src/main/org/jboss/messaging/core/local src/main/org/jboss/messaging/core/plugin src/main/org/jboss/messaging/core/plugin/postoffice/cluster tests/src/org/jboss/test/messaging/core tests/src/org/jboss/test/messaging/core/plugin/postoffice/cluster tests/src/org/jboss/test/messaging/jms

jboss-cvs-commits at lists.jboss.org jboss-cvs-commits at lists.jboss.org
Fri Oct 13 08:09:50 EDT 2006


Author: timfox
Date: 2006-10-13 08:09:35 -0400 (Fri, 13 Oct 2006)
New Revision: 1473

Removed:
   trunk/tests/src/org/jboss/test/messaging/core/plugin/postoffice/cluster/RedistributionTest.java
Modified:
   trunk/src/main/org/jboss/jms/server/ServerPeer.java
   trunk/src/main/org/jboss/jms/server/endpoint/ServerConsumerEndpoint.java
   trunk/src/main/org/jboss/messaging/core/ChannelSupport.java
   trunk/src/main/org/jboss/messaging/core/PagingChannelSupport.java
   trunk/src/main/org/jboss/messaging/core/SimpleDelivery.java
   trunk/src/main/org/jboss/messaging/core/local/RoundRobinPointToPointRouter.java
   trunk/src/main/org/jboss/messaging/core/plugin/JDBCPersistenceManager.java
   trunk/src/main/org/jboss/messaging/core/plugin/postoffice/cluster/CastMessagesCallback.java
   trunk/src/main/org/jboss/messaging/core/plugin/postoffice/cluster/DefaultClusteredPostOffice.java
   trunk/src/main/org/jboss/messaging/core/plugin/postoffice/cluster/DefaultMessagePullPolicy.java
   trunk/src/main/org/jboss/messaging/core/plugin/postoffice/cluster/DefaultRouter.java
   trunk/src/main/org/jboss/messaging/core/plugin/postoffice/cluster/LocalClusteredQueue.java
   trunk/src/main/org/jboss/messaging/core/plugin/postoffice/cluster/PostOfficeInternal.java
   trunk/src/main/org/jboss/messaging/core/plugin/postoffice/cluster/PullMessagesRequest.java
   trunk/src/main/org/jboss/messaging/core/plugin/postoffice/cluster/StatsSender.java
   trunk/tests/src/org/jboss/test/messaging/core/SimpleReceiver.java
   trunk/tests/src/org/jboss/test/messaging/core/plugin/postoffice/cluster/DefaultClusteredPostOfficeWithDefaultRouterTest.java
   trunk/tests/src/org/jboss/test/messaging/core/plugin/postoffice/cluster/DefaultRouterTest.java
   trunk/tests/src/org/jboss/test/messaging/core/plugin/postoffice/cluster/RecoveryTest.java
   trunk/tests/src/org/jboss/test/messaging/jms/BrowserTest.java
Log:
http://jira.jboss.com/jira/browse/JBMESSAGING-606 http://jira.jboss.com/jira/browse/JBMESSAGING-575 http://jira.jboss.com/jira/browse/JBMESSAGING-596


Modified: trunk/src/main/org/jboss/jms/server/ServerPeer.java
===================================================================
--- trunk/src/main/org/jboss/jms/server/ServerPeer.java	2006-10-12 15:47:04 UTC (rev 1472)
+++ trunk/src/main/org/jboss/jms/server/ServerPeer.java	2006-10-13 12:09:35 UTC (rev 1473)
@@ -821,7 +821,7 @@
       //        <path refid="jboss.jmx.classpath"/> from jms/build.xml dependentmodule.classpath
       //
 
-      //FIXME - Yes this is super-ugly - there must be an easier way of doing it
+      //TODO - Yes this is super-ugly - there must be an easier way of doing it
       //also in LocalTestServer is doing the same thing in a slightly different way
       //this should be combined
 

Modified: trunk/src/main/org/jboss/jms/server/endpoint/ServerConsumerEndpoint.java
===================================================================
--- trunk/src/main/org/jboss/jms/server/endpoint/ServerConsumerEndpoint.java	2006-10-12 15:47:04 UTC (rev 1472)
+++ trunk/src/main/org/jboss/jms/server/endpoint/ServerConsumerEndpoint.java	2006-10-13 12:09:35 UTC (rev 1473)
@@ -176,7 +176,7 @@
          if (trace) log.trace("created selector");
       }
 
-      //FIXME - 
+      //TODO - 
       //We really need to get rid of this delivery list - it's only purpose in life is to solve
       //the race condition where acks or cancels can come in before handle has returned - and
       //that can be solved in a simpler way anyway.
@@ -350,7 +350,7 @@
             // it. This is because it may still contain deliveries that may well be acknowledged
             // after the consumer has closed. This is perfectly valid.
 
-            // FIXME - The deliveries should really be stored in the session endpoint, not here
+            // TODO - The deliveries should really be stored in the session endpoint, not here
             // that is their natural place, that would mean we wouldn't have to mess around with
             // keeping deliveries after this is closed.
 

Modified: trunk/src/main/org/jboss/messaging/core/ChannelSupport.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/ChannelSupport.java	2006-10-12 15:47:04 UTC (rev 1472)
+++ trunk/src/main/org/jboss/messaging/core/ChannelSupport.java	2006-10-13 12:09:35 UTC (rev 1473)
@@ -51,10 +51,6 @@
  * and deliver to receivers are not executed concurrently but placed on an event
  * queue and executed serially by a single thread.
  * 
- * This prevents lock contention since requests are
- * executed serially, resulting in better scalability and higher throughput at the expense of some
- * latency.
- * 
  * Currently remoting does not support a non blocking API so a full SEDA approach is not possible at this stage.
  * 
  * @author <a href="mailto:ovidiu at jboss.org">Ovidiu Feodorov</a>
@@ -99,7 +95,7 @@
    protected Object deliveryLock;
 
    protected boolean active = true;
-
+   
    // Constructors --------------------------------------------------
 
    protected ChannelSupport(long channelID, MessageStore ms,
@@ -172,7 +168,7 @@
       }
       else
       {
-         return handleInternal(sender, ref, tx, true, false);
+         return handleInternal(sender, ref, tx, true, false, true);
       }
    }
 
@@ -189,7 +185,13 @@
    public void cancel(Delivery d) throws Throwable
    {
       // TODO We should also consider executing cancels on the event queue
-      cancelInternal(d);
+      synchronized (deliveryLock)
+      {
+         synchronized (refLock)
+         {
+            cancelInternal(d);
+         }
+      }      
    }
 
    // Distributor implementation ------------------------------------
@@ -382,6 +384,7 @@
 
                del.acknowledge(null);
             }
+
          }
       }
    }
@@ -491,6 +494,7 @@
 
    // Public --------------------------------------------------------
 
+   //Only used for testing
    public int memoryRefCount()
    {
       synchronized (refLock)
@@ -499,6 +503,7 @@
       }
    }
 
+   //Only used for testing
    public int memoryDeliveryCount()
    {
       synchronized (deliveryLock)
@@ -515,7 +520,35 @@
    // Package protected ---------------------------------------------
 
    // Protected -----------------------------------------------------
-
+   
+   protected MessageReference nextReference(ListIterator iter, boolean handle) throws Throwable
+   {
+      MessageReference ref;
+      
+      if (iter == null)
+      {
+         //We just get the next ref from the head of the queue
+         ref = (MessageReference) messageRefs.peekFirst();
+      }
+      else
+      {
+         // TODO This will not work with paged refs - see http://jira.jboss.com/jira/browse/JBMESSAGING-275
+         // We need to extend it to work with refs from the db
+         
+         //We have an iterator - this means we are iterating through the queue to find a ref that matches
+         if (iter.hasNext())
+         {                        
+            ref = (MessageReference)iter.next();
+         } 
+         else
+         {
+            ref = null;
+         }
+      }
+      
+      return ref;
+   }     
+   
    /*
     * This methods delivers as many messages as possible to the router until no
     * more deliveries are returned. This method should never be called at the
@@ -535,47 +568,16 @@
          
          while (true)
          {           
-            synchronized (refLock)
-            {              
-               if (iter == null)
-               {
-                  ref = (MessageReference) messageRefs.peekFirst();
-               }
-               else
-               {
-                  if (iter.hasNext())
-                  {                        
-                     ref = (MessageReference)iter.next();
-                  } 
-                  else
-                  {
-                     ref = null;
-                  }
-               }
+            synchronized (deliveryLock)
+            {
+               ref = nextReference(iter, handle);               
             }
-
             if (ref != null)
             {
-               // Check if message is expired (we also do this on the client
-               // side)
-               // If so ack it from the channel
+               // Check if message is expired (we also do this on the clientside) If so ack it from the channel
                if (ref.isExpired())
                {
-                  if (trace) { log.trace("Message reference: " + ref + " has expired"); }
-
-                  // remove and acknowledge it
-                  if (iter == null)
-                  {
-                     removeFirstInMemory();
-                  }
-                  else
-                  {
-                     iter.remove();
-                  }
-
-                  Delivery delivery = new SimpleDelivery(this, ref, true);
-
-                  acknowledgeInternal(delivery, null, true, false);
+                  expireRef(ref, iter);
                }
                else
                {
@@ -589,33 +591,24 @@
                   if (del == null)
                   {
                      // No receiver, broken receiver or full receiver so we stop delivering; also
-                     // we need to decrement the delivery count, as no real delivery has been
-                     // actually performed
+                     // we need to decrement the delivery count, as no real delivery has been actually performed
 
                      if (trace) { log.trace(this + ": no delivery returned for message" + ref + " so no receiver got the message. Delivery is now complete"); }
 
                      ref.decrementDeliveryCount();
+                     
                      receiversReady = false;
-                     return;
+                     
+                     break;
                   }
                   else if (!del.isSelectorAccepted())
                   {
                      // No receiver accepted the message because no selectors matched, so we create
                      // an iterator (if we haven't already created it) to iterate through the refs
-                     // in the channel. No delivery was really performed, so we decrement the
-                     // delivery count
+                     // in the channel. No delivery was really performed, so we decrement the delivery count
 
                      ref.decrementDeliveryCount();
 
-                     // TODO Note that this is only a partial solution since if there are messages
-                     // paged to storage it won't try those - i.e. it will only iterate through
-                     // those refs in memory. Dealing with refs in storage is somewhat tricky since
-                     // we can't just load them and iterate through them since we might run out of
-                     // memory, so we will need to load individual refs from storage given the
-                     // selector expressions. Secondly we should also introduce some in memory
-                     // indexes here to prevent having to iterate through all the refs every time.
-                     // Having said all that, having consumers on a queue that don't match many
-                     // messages is an antipattern and should be avoided by the user.
                      if (iter == null)
                      {
                         iter = messageRefs.iterator();
@@ -627,7 +620,7 @@
                      
                      // Receiver accepted the reference
 
-                     // We must synchronize here to cope with another race condition where message
+                     // We must synchronize here to cope with a race condition where message
                      // is cancelled/acked in flight while the following few actions are being
                      // performed. e.g. delivery could be cancelled acked after being removed from
                      // state but before delivery being added (observed).
@@ -635,29 +628,13 @@
                      {
                         if (trace) { log.trace(this + " incrementing delivery count for " + del); }
 
-                        // FIXME - It's actually possible the delivery could be
-                        // cancelled before it reaches
-                        // here, in which case we wouldn't get a delivery but we
-                        // still need to increment the
-                        // delivery count
-                        // All the problems related to these race conditions and
-                        // fiddly edge cases will disappear
-                        // once we do
-                        // http://jira.jboss.com/jira/browse/JBMESSAGING-355
-                        // This will make life a lot easier
+                        // FIXME - It's actually possible the delivery could be cancelled before it reaches
+                        // here, in which case we wouldn't get a delivery but we still need to increment the
+                        // delivery count. TODO http://jira.jboss.com/jira/browse/JBMESSAGING-355
 
-                        del.getReference().incrementDeliveryCount();                    
-
                         if (!del.isCancelled())
                         {
-                           if (iter == null)
-                           {
-                              removeFirstInMemory();
-                           }
-                           else
-                           {
-                              iter.remove();                                
-                           }
+                           removeReference(iter);
 
                            // delivered
                            if (!del.isDone())
@@ -677,6 +654,7 @@
             {
                // No more refs in channel or only ones that don't match any selectors
                if (trace) { log.trace(this + " no more refs to deliver "); }
+               
                break;
             }
          }
@@ -688,7 +666,8 @@
    }
 
    protected Delivery handleInternal(DeliveryObserver sender, MessageReference ref,
-                                     Transaction tx, boolean persist, boolean synchronous)
+                                     Transaction tx, boolean persist, boolean synchronous,
+                                     boolean deliver)
    {
       if (ref == null)
       {
@@ -716,12 +695,9 @@
          if (tx == null)
          {
             // Don't even attempt synchronous delivery for a reliable message
-            // when we have an
-            // non-recoverable state that doesn't accept reliable messages. If
-            // we do, we may get
-            // into the situation where we need to reliably store an active
-            // delivery of a reliable
-            // message, which in these conditions cannot be done.
+            // when we have an non-recoverable state that doesn't accept reliable messages. If
+            // we do, we may get into the situation where we need to reliably store an active
+            // delivery of a reliable message, which in these conditions cannot be done.
 
             if (ref.isReliable() && !acceptReliableMessages)
             {
@@ -738,11 +714,14 @@
                pm.addReference(channelID, ref, null);        
             }
             
-            addReferenceInMemory(ref);
+            synchronized (refLock)
+            {
+               addReferenceInMemory(ref);
+            }
             
             // We only do delivery if there are receivers that haven't said they don't want
             // any more references.
-            if (receiversReady)
+            if (receiversReady && deliver)
             {
                // Prompt delivery
                deliverInternal(true);
@@ -765,7 +744,7 @@
             else
             {
                // add to post commit callback
-               getCallback(tx, synchronous).addRef(ref);
+               getCallback(tx, synchronous, deliver).addRef(ref);
                
                if (trace) { log.trace(this + " added transactionally " + ref + " in memory"); }
             }
@@ -793,7 +772,8 @@
       return new SimpleDelivery(this, ref, true);
    }
 
-   protected void acknowledgeInternal(Delivery d, Transaction tx, boolean persist, boolean synchronous) throws Exception
+   protected void acknowledgeInternal(Delivery d, Transaction tx, boolean persist,
+                                      boolean synchronous) throws Exception
    {   
       if (tx == null)
       {
@@ -811,7 +791,7 @@
       }
       else
       {
-         this.getCallback(tx, synchronous).addDelivery(d);
+         this.getCallback(tx, synchronous, false).addDelivery(d);
    
          if (trace) { log.trace(this + " added " + d + " to memory on transaction " + tx); }
    
@@ -839,13 +819,13 @@
       return removed;
    }     
 
-   protected InMemoryCallback getCallback(Transaction tx, boolean synchronous)
+   protected InMemoryCallback getCallback(Transaction tx, boolean synchronous, boolean deliver)
    {
       InMemoryCallback callback = (InMemoryCallback) tx.getCallback(this);            
 
       if (callback == null)
       {
-         callback = new InMemoryCallback(synchronous);
+         callback = new InMemoryCallback(synchronous, deliver);
 
          tx.addCallback(callback, this);
       }
@@ -856,19 +836,99 @@
          {
             throw new IllegalStateException("Callback synchronousness status doesn't match");
          }
+         if (callback.isDeliver() != deliver)
+         {
+            throw new IllegalStateException("Callback deliver status doesn't match");
+         }
       }
 
       return callback;
    }
    
-   protected abstract boolean cancelInternal(Delivery del) throws Exception;
+   protected boolean cancelInternal(Delivery del) throws Exception
+   {
+      if (trace) { log.trace(this + " cancelling " + del + " in memory"); }
+
+      boolean removed = deliveries.remove(del);      
+
+      if (!removed)
+      {         
+         // This can happen if the message is cancelled before the result of
+         // ServerConsumerDelegate.handle has returned, in which case we won't have a record of the delivery
+         // In this case we don't want to add the message reference back into
+         // the state since it was never removed in the first place
+
+         if (trace) { log.trace(this + " can't find delivery " + del + " in state so not replacing messsage ref"); }
+      }
+      else
+      {
+         messageRefs.addFirst(del.getReference(), del.getReference().getPriority());
+         
+         if (trace) { log.trace(this + " added " + del.getReference() + " back into state"); }
+      }
+      
+      return removed;
+   }
    
-   protected abstract MessageReference removeFirstInMemory() throws Exception;
+   protected MessageReference removeFirstInMemory() throws Exception
+   {
+      MessageReference result = (MessageReference) messageRefs.removeFirst();
+
+      return (MessageReference) result;
+   }
    
-   protected abstract void addReferenceInMemory(MessageReference ref) throws Exception;     
+   protected void addReferenceInMemory(MessageReference ref) throws Exception
+   {
+      if (ref.isReliable() && !acceptReliableMessages)
+      {
+         throw new IllegalStateException("Reliable reference " + ref +
+                                         " cannot be added to non-recoverable state");
+      }
+
+      messageRefs.addLast(ref, ref.getPriority());
+
+      if (trace){ log.trace(this + " added " + ref + " non-transactionally in memory"); }      
+   }    
    
    // Private -------------------------------------------------------
+   
+   private void expireRef(MessageReference ref, ListIterator iter) throws Exception
+   {
+      if (trace) { log.trace("Message reference: " + ref + " has expired"); }
 
+      // remove and acknowledge it
+      synchronized (refLock)
+      {
+         if (iter == null)
+         {
+            removeFirstInMemory();
+         }
+         else
+         {
+            iter.remove();
+         }
+      }
+
+      Delivery delivery = new SimpleDelivery(this, ref, true);
+
+      acknowledgeInternal(delivery, null, true, false);
+   }
+   
+   private void removeReference(ListIterator iter) throws Exception
+   {
+      synchronized (refLock)
+      {
+         if (iter == null)
+         {
+            removeFirstInMemory();
+         }
+         else
+         {
+            iter.remove();                                
+         }
+      }
+   }
+
    // Inner classes -------------------------------------------------
 
    private class InMemoryCallback implements TxCallback, Runnable
@@ -879,23 +939,32 @@
       
       private boolean synchronous;
       
+      private boolean deliver;
+      
       private boolean committing;
 
       private Future result;
 
-      private InMemoryCallback(boolean synchronous)
+      private InMemoryCallback(boolean synchronous, boolean deliver)
       {
          refsToAdd = new ArrayList();
 
          deliveriesToRemove = new ArrayList();
          
          this.synchronous = synchronous;
+         
+         this.deliver = deliver;
       }
       
       private boolean isSynchronous()
       {
          return synchronous;
       }
+      
+      private boolean isDeliver()
+      {
+         return deliver;
+      }
 
       private void addRef(MessageReference ref)
       {
@@ -1045,7 +1114,10 @@
 
             try
             {
-               addReferenceInMemory(ref);
+               synchronized (refLock)
+               {
+                  addReferenceInMemory(ref);
+               }
             }
             catch (Throwable t)
             {
@@ -1079,7 +1151,7 @@
          }
          
          //prompt delivery
-         if (receiversReady)
+         if (deliver && receiversReady)
          {
             deliverInternal(true);
          }
@@ -1178,7 +1250,7 @@
 
       public void run()
       {
-         Delivery d = handleInternal(sender, ref, null, persist, false);
+         Delivery d = handleInternal(sender, ref, null, persist, false, true);
          result.setResult(d);
       }
    }   

Modified: trunk/src/main/org/jboss/messaging/core/PagingChannelSupport.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/PagingChannelSupport.java	2006-10-12 15:47:04 UTC (rev 1472)
+++ trunk/src/main/org/jboss/messaging/core/PagingChannelSupport.java	2006-10-13 12:09:35 UTC (rev 1473)
@@ -181,6 +181,7 @@
    
    // Public --------------------------------------------------------
 
+   //Only used in testing
    public int downCacheCount()
    {
       synchronized (refLock)
@@ -189,6 +190,7 @@
       }
    }
 
+   //Only used in testing
    public boolean isPaging()
    {
       synchronized (refLock)
@@ -364,60 +366,37 @@
          }
       }    
    }
-   
+      
    protected boolean cancelInternal(Delivery del) throws Exception
    {
       if (trace) { log.trace(this + " cancelling " + del + " in memory"); }
 
-      boolean removed;
-
-      synchronized (deliveryLock)
+      boolean removed = super.cancelInternal(del);
+      
+      if (removed && paging)
       {
-         removed = deliveries.remove(del);
-      }
-
-      if (!removed)
-      {         
-         // This can happen if the message is cancelled before the result of
-         // ServerConsumerDelegate.handle has returned, in which case we won't have a record of the delivery
-         // In this case we don't want to add the message reference back into
-         // the state since it was never removed in the first place
-
-         if (trace) { log.trace(this + " can't find delivery " + del + " in state so not replacing messsage ref"); }
-      }
-      else
-      {
-         synchronized (refLock)
+         // if paging and the in memory queue is exactly full we need to evict the end reference to storage to
+         // preserve the number of refs in the queue
+         if (messageRefs.size() == fullSize + 1)
          {
-            messageRefs.addFirst(del.getReference(), del.getReference().getPriority());
-
-            if (paging)
-            {
-               // if paging we need to evict the end reference to storage to
-               // preserve the number of refs in the queue
-
-               MessageReference ref = (MessageReference)messageRefs.removeLast();
+            MessageReference ref = (MessageReference)messageRefs.removeLast();
  
-               addToDownCache(ref, true);
-            }
+            addToDownCache(ref, true);
          }
-
-         if (trace) { log.trace(this + " added " + del.getReference() + " back into state"); }
       }
+         
+      if (trace) { log.trace(this + " added " + del.getReference() + " back into state"); }      
       
       return removed;
    }
-   
+      
    protected MessageReference removeFirstInMemory() throws Exception
    {
-      synchronized (refLock)
-      {
-         MessageReference result = (MessageReference) messageRefs.removeFirst();
+      MessageReference result = super.removeFirstInMemory();
 
-         checkLoad();
+      checkLoad();
 
-         return (MessageReference) result;
-      }
+      return result;
    }
    
    private boolean checkLoad() throws Exception
@@ -447,36 +426,30 @@
          return false;
       }
    }
-   
+    
    protected void addReferenceInMemory(MessageReference ref) throws Exception
-   {
-      if (ref.isReliable() && !acceptReliableMessages)
+   {     
+      if (paging)
       {
-         throw new IllegalStateException("Reliable reference " + ref +
-                                         " cannot be added to non-recoverable state");
+         if (ref.isReliable() && !acceptReliableMessages)
+         {
+            throw new IllegalStateException("Reliable reference " + ref +
+                                            " cannot be added to non-recoverable state");
+         }
+         addToDownCache(ref, false);
       }
-
-      synchronized (refLock)
+      else
       {
-         if (paging)
+         super.addReferenceInMemory(ref);
+         
+         if (messageRefs.size() == fullSize)
          {
-            addToDownCache(ref, false);
-         }
-         else
-         {
-            messageRefs.addLast(ref, ref.getPriority());
+            // We are full in memory - go into paging mode
+            if (trace) { log.trace(this + " going into paging mode"); }
 
-            if (trace){ log.trace(this + " added " + ref + " non-transactionally in memory"); }
-
-            if (messageRefs.size() == fullSize)
-            {
-               // We are full in memory - go into paging mode
-               if (trace) { log.trace(this + " going into paging mode"); }
-
-               paging = true;
-            }
+            paging = true;
          }
-      }
+      }      
    }
    
    protected void addToDownCache(MessageReference ref, boolean cancelling) throws Exception
@@ -525,8 +498,7 @@
       if (trace) { log.trace(this + " flushing " + downCache.size() + " refs from downcache"); }
 
       // Non persistent refs won't already be in the db so they need to be inserted
-      // Persistent refs in a recoverable state will already be there so need to
-      // be updated
+      // Persistent refs in a recoverable state will already be there so need to be updated
 
       List toUpdate = new ArrayList();
 
@@ -573,9 +545,7 @@
 
       if (trace) { log.trace(this + " cleared downcache"); }
    }
-   
-   
-   
+        
    // Private ------------------------------------------------------------------------------
    
    private MessageReference addFromRefInfo(ReferenceInfo info, Map refMap)

Modified: trunk/src/main/org/jboss/messaging/core/SimpleDelivery.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/SimpleDelivery.java	2006-10-12 15:47:04 UTC (rev 1472)
+++ trunk/src/main/org/jboss/messaging/core/SimpleDelivery.java	2006-10-13 12:09:35 UTC (rev 1473)
@@ -108,7 +108,7 @@
       return cancelled;
    }
 
-   public synchronized boolean isSelectorAccepted()
+   public boolean isSelectorAccepted()
    {
       return selectorAccepted;
    }

Modified: trunk/src/main/org/jboss/messaging/core/local/RoundRobinPointToPointRouter.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/local/RoundRobinPointToPointRouter.java	2006-10-12 15:47:04 UTC (rev 1472)
+++ trunk/src/main/org/jboss/messaging/core/local/RoundRobinPointToPointRouter.java	2006-10-13 12:09:35 UTC (rev 1473)
@@ -87,7 +87,7 @@
          // try to release the lock as quickly as possible and make a copy of the receivers array
          // to avoid deadlock (http://jira.jboss.org/jira/browse/JBMESSAGING-491)
          
-         //FIXME - we shouldn't be cloning an ArrayList for the delivery of each message
+         //TODO - we shouldn't be cloning an ArrayList for the delivery of each message
          //on the primary execution path! 
 
          receiversCopy = new ArrayList(receivers.size());

Modified: trunk/src/main/org/jboss/messaging/core/plugin/JDBCPersistenceManager.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/plugin/JDBCPersistenceManager.java	2006-10-12 15:47:04 UTC (rev 1472)
+++ trunk/src/main/org/jboss/messaging/core/plugin/JDBCPersistenceManager.java	2006-10-13 12:09:35 UTC (rev 1473)
@@ -2970,7 +2970,7 @@
       
       //Now set the fields from org.joss.jms.message.JBossMessage if appropriate
       
-      //FIXME - We are mixing concerns here
+      //TODO - We are mixing concerns here
       //The basic JDBCPersistencManager should *only* know about core messages - not 
       //JBossMessages - we should subclass JBDCPersistenceManager and the JBossMessage
       //specific code in a subclass

Modified: trunk/src/main/org/jboss/messaging/core/plugin/postoffice/cluster/CastMessagesCallback.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/plugin/postoffice/cluster/CastMessagesCallback.java	2006-10-12 15:47:04 UTC (rev 1472)
+++ trunk/src/main/org/jboss/messaging/core/plugin/postoffice/cluster/CastMessagesCallback.java	2006-10-13 12:09:35 UTC (rev 1473)
@@ -191,7 +191,6 @@
          //Only used in testing
          if (failAfterCommit)
          {
-            log.info("Forcing failure after commit");
             throw new TransactionException("Forced failure for testing");
          }
          

Modified: trunk/src/main/org/jboss/messaging/core/plugin/postoffice/cluster/DefaultClusteredPostOffice.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/plugin/postoffice/cluster/DefaultClusteredPostOffice.java	2006-10-12 15:47:04 UTC (rev 1472)
+++ trunk/src/main/org/jboss/messaging/core/plugin/postoffice/cluster/DefaultClusteredPostOffice.java	2006-10-13 12:09:35 UTC (rev 1473)
@@ -900,10 +900,8 @@
                if (q.isActive())
                {                                                      
                   QueueStats stats = q.getStats();
-                                             
-                  //We don't bother sending the stats if there's no significant change in the values
-                  
-                  if (q.changedSignificantly())
+                                              
+                  if (stats != null)
                   {
                      if (statsList == null)
                      {
@@ -988,14 +986,16 @@
                      RemoteQueueStub toQueue = (RemoteQueueStub)messagePullPolicy.chooseQueue(router.getQueues());
                      
                      if (trace) { log.trace(this.nodeId + " recalculated pull queue for queue " + st.getQueueName() + " to be " + toQueue); }
+                                    
+                     localQueue.setPullQueue(toQueue);
                      
-                     if (toQueue != null)
+                     if (toQueue != null && localQueue.getRefCount() == 0)
                      {
-                        localQueue.setPullInfo(toQueue, pullSize);
+                        //We now trigger delivery - this may cause a pull event                                                
+                        //We only do this if there are no refs in the local queue
                         
-                        //We now trigger delivery - this may cause a pull event
                         localQueue.deliver(false);
-                                             
+                                                                    
                         if (trace) { log.trace(this.nodeId + " triggered delivery for " + localQueue.getName()); }
                      }
                   } 

Modified: trunk/src/main/org/jboss/messaging/core/plugin/postoffice/cluster/DefaultMessagePullPolicy.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/plugin/postoffice/cluster/DefaultMessagePullPolicy.java	2006-10-12 15:47:04 UTC (rev 1472)
+++ trunk/src/main/org/jboss/messaging/core/plugin/postoffice/cluster/DefaultMessagePullPolicy.java	2006-10-13 12:09:35 UTC (rev 1473)
@@ -49,13 +49,13 @@
       ClusteredQueue chosenQueue = null;
       
       int maxMessages = 0;
-      
+       
       while (iter.hasNext())
       {
          ClusteredQueue queue = (ClusteredQueue)iter.next();
          
          if (!queue.isLocal())
-         {
+         {  
             QueueStats stats = queue.getStats();
             
             if (stats != null)

Modified: trunk/src/main/org/jboss/messaging/core/plugin/postoffice/cluster/DefaultRouter.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/plugin/postoffice/cluster/DefaultRouter.java	2006-10-12 15:47:04 UTC (rev 1472)
+++ trunk/src/main/org/jboss/messaging/core/plugin/postoffice/cluster/DefaultRouter.java	2006-10-13 12:09:35 UTC (rev 1473)
@@ -38,21 +38,8 @@
  * 
  * This router always favours the local queue.
  * 
- * If there is no local queue it will round robin between the others.
+ * If there is no local queue, then it will round robin between the non local queues.
  * 
- * In the case of a distributed point to point queue deployed at each node in the cluster
- * there will always be a local queue.
- * 
- * In this case, with the assumption that producers and consumers are distributed evenly across the cluster
- * then sending the message to the local queue is the most efficient policy.
- * 
- * The exception to this if there are no consumers on the local queue.
- * 
- * In the case of a durable subscription, there may well be no local queue since the durable subscription lives
- * only on the number of nodes that it is looked up at.
- * 
- * In this case the round robin routing will kick in
- *
  * @author <a href="mailto:tim.fox at jboss.com">Tim Fox</a>
  * @version <tt>$Revision: 1.1 $</tt>
  *
@@ -66,7 +53,7 @@
    private boolean trace = log.isTraceEnabled();
       
    //MUST be an arraylist for fast index access
-   private ArrayList queues;
+   private ArrayList nonLocalQueues;
    
    private ClusteredQueue localQueue;
    
@@ -74,12 +61,12 @@
    
    public DefaultRouter()
    {
-      queues = new ArrayList();
+      nonLocalQueues = new ArrayList();
    }
    
    public int size()
    {
-      return queues.size();
+      return nonLocalQueues.size() + (localQueue == null ? 0 : 1);
    }
    
    public ClusteredQueue getLocalQueue()
@@ -99,17 +86,17 @@
          }
          localQueue = queue;
       }
+      else
+      {
+         nonLocalQueues.add(queue); 
+      }
       
-      queues.add(queue); 
-      
-      target = 0;
-      
       return true;
    }
 
    public void clear()
    {
-      queues.clear();
+      nonLocalQueues.clear();
       
       localQueue = null;
       
@@ -118,31 +105,46 @@
 
    public boolean contains(Receiver queue)
    {
-      return queues.contains(queue);
+      return localQueue == queue || nonLocalQueues.contains(queue);
    }
 
    public Iterator iterator()
    {
+      List queues = new ArrayList();
+      
+      if (localQueue != null)
+      {
+         queues.add(localQueue);
+      }
+      
+      queues.addAll(nonLocalQueues);
+      
       return queues.iterator();
    }
 
    public boolean remove(Receiver queue)
    {      
-      if (queues.remove(queue))
+      if (localQueue == queue)
       {
-         if (localQueue == queue)
-         {
-            localQueue = null;
-         }
+         localQueue = null;
          
-         target = 0;
-         
          return true;
       }
       else
       {
-         return false;
-      }
+         if (nonLocalQueues.remove(queue))
+         {
+            if (target >= nonLocalQueues.size() - 1)
+            {
+               target = nonLocalQueues.size() - 1;
+            }
+            return true;
+         }
+         else
+         {
+            return false;
+         }
+      }      
    }
 
    public Delivery handle(DeliveryObserver observer, MessageReference reference, Transaction tx)
@@ -150,8 +152,8 @@
       if (trace) { log.trace(this + " routing ref " + reference); }
       
       //Favour the local queue
-           
-      if (localQueue != null && localQueue.numberOfReceivers() > 0)
+          
+      if (localQueue != null)
       {
          //The only time the local queue won't accept is if the selector doesn't
          //match - in which case it won't match at any other nodes too so no point
@@ -165,21 +167,15 @@
       }
       else
       {
-         //There is no local shared queue or the local queue has no consumers
-          
+         //There is no local shared queue 
          //We round robin among the rest
-         if ((localQueue == null && !queues.isEmpty()) || (localQueue != null && queues.size() > 1))
+         
+         if (!nonLocalQueues.isEmpty())
          {
-            ClusteredQueue queue = (ClusteredQueue)queues.get(target);
+            ClusteredQueue queue = (ClusteredQueue)nonLocalQueues.get(target);
             
-            if (queue == localQueue)
-            {
-               //We don't want to choose the local queue
-               incTarget();
-            }
+            queue = (ClusteredQueue)nonLocalQueues.get(target);
             
-            queue = (ClusteredQueue)queues.get(target);
-            
             Delivery del = queue.handle(observer, reference, tx);
              
             if (trace) { log.trace(this + " routed to remote queue, it returned " + del); }
@@ -200,7 +196,7 @@
    {
       target++;
       
-      if (target == queues.size())
+      if (target == nonLocalQueues.size())
       {
          target = 0;
       }
@@ -208,11 +204,13 @@
    
    public List getQueues()
    {
-      return queues;
+      return nonLocalQueues;
    }
 
    public int numberOfReceivers()
    {
-      return queues.size();
+      return nonLocalQueues.size();
    }
 }
+
+

Modified: trunk/src/main/org/jboss/messaging/core/plugin/postoffice/cluster/LocalClusteredQueue.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/plugin/postoffice/cluster/LocalClusteredQueue.java	2006-10-12 15:47:04 UTC (rev 1472)
+++ trunk/src/main/org/jboss/messaging/core/plugin/postoffice/cluster/LocalClusteredQueue.java	2006-10-13 12:09:35 UTC (rev 1473)
@@ -25,6 +25,7 @@
 import java.util.Collections;
 import java.util.Iterator;
 import java.util.List;
+import java.util.ListIterator;
 
 import org.jboss.logging.Logger;
 import org.jboss.messaging.core.Delivery;
@@ -36,7 +37,9 @@
 import org.jboss.messaging.core.plugin.contract.PersistenceManager;
 import org.jboss.messaging.core.plugin.contract.PostOffice;
 import org.jboss.messaging.core.tx.Transaction;
+import org.jboss.messaging.core.tx.TransactionException;
 import org.jboss.messaging.core.tx.TransactionRepository;
+import org.jboss.messaging.core.tx.TxCallback;
 import org.jboss.messaging.util.Future;
 import org.jboss.messaging.util.StreamUtils;
 
@@ -62,19 +65,12 @@
    
    private volatile int lastCount;
    
-   private volatile boolean changedSignificantly;
+   private volatile RemoteQueueStub pullQueue;
    
-   private RemoteQueueStub pullQueue;
-   
    private int nodeId;
    
-   //TODO Make configurable
-   private int pullSize;
-   
    private TransactionRepository tr;
    
-   private Object pullLock = new Object();
- 
    //TODO - we shouldn't have to specify office AND nodeId
    public LocalClusteredQueue(PostOffice office, int nodeId, String name, long id, MessageStore ms, PersistenceManager pm,             
                               boolean acceptReliableMessages, boolean recoverable, QueuedExecutor executor,
@@ -87,7 +83,7 @@
       
       this.tr = tr;
       
-      //FIXME - this cast is a hack
+      //TODO - This cast is potentially unsafe - handle better
       this.office = (PostOfficeInternal)office;
    }
    
@@ -101,44 +97,39 @@
       
       this.tr = tr;
       
-      //FIXME - this cast is a hack
+      //TODO - This cast is potentially unsafe - handle better
       this.office = (PostOfficeInternal)office;
    }
    
-   public void setPullInfo(RemoteQueueStub queue, int pullSize)
+   public void setPullQueue(RemoteQueueStub queue)
    {
-      synchronized (pullLock)
-      {
-         this.pullQueue = queue;
-         
-         this.pullSize = pullSize;
-      }
+      this.pullQueue = queue;
    }
-   
+      
    public QueueStats getStats()
    {      
-      int cnt = messageCount();
+      //Currently we only return the current message reference count for the channel
+      //Note we are only interested in the number of refs in the main queue, not
+      //in any deliveries
+      //Also we are only interested in the value obtained after delivery is complete.
+      //This is so we don't end up with transient values since delivery is half way through
       
+      int cnt = getRefCount();
+      
       if (cnt != lastCount)
       {
-         changedSignificantly = true;
+         lastCount = cnt;
          
-         lastCount = cnt;
+         //We only return stats if it has changed since last time - this is so when we only
+         //broadcast data when necessary
+         return new QueueStats(name, cnt);
       }
       else
       {
-         changedSignificantly = false;
-      }
-      
-      return new QueueStats(name, cnt);
+         return null;
+      } 
    }      
-   
-   //Have the stats changed significantly since the last time we request them?
-   public boolean changedSignificantly()
-   {
-      return changedSignificantly;
-   }
-   
+    
    public boolean isLocal()
    {
       return true;
@@ -148,44 +139,17 @@
    {
       return nodeId;
    }
-      
+   
    /*
     * Used when pulling messages from a remote queue
     */
    public List getDeliveries(int number) throws Exception
    {
-      List dels = new ArrayList();
+      Future result = new Future();
       
-      synchronized (refLock)
-      {
-         synchronized (deliveryLock)
-         {
-            //We only get the refs if receiversReady = false so as not to steal messages that
-            //might be consumed by local receivers            
-            if (!receiversReady)
-            {               
-               int count = 0;
-               
-               MessageReference ref;
-               
-               while (count < number && (ref = removeFirstInMemory()) != null)
-               {
-                  SimpleDelivery del = new SimpleDelivery(this, ref);
-                  
-                  deliveries.add(del);
-                  
-                  dels.add(del);       
-                  
-                  count++;
-               }           
-               return dels;
-            }
-            else
-            {
-               return Collections.EMPTY_LIST;
-            }
-         }
-      }          
+      this.executor.execute(new GetDeliveriesRunnable(result, 1));
+            
+      return (List)result.getResult();
    }
    
    /*
@@ -225,39 +189,25 @@
       acknowledgeInternal(d, null, false, false);      
    }
    
-   protected void deliverInternal(boolean handle) throws Throwable
-   {            
-      int beforeSize = -1;
+   
+   protected MessageReference nextReference(ListIterator iter, boolean handle) throws Throwable
+   {
+      MessageReference ref = super.nextReference(iter, handle);
       
-      if (!handle)
+      if (ref == null)
       {
-         beforeSize  = messageRefs.size();
-      }      
-      
-      super.deliverInternal(handle);
-
-      if (!handle)
-      {
-         int afterSize = messageRefs.size();
+         //There are no available refs in the local queue
+         //Maybe we need to pull one (some) from a remote queue?
          
-         if (trace)
+         if (pullMessages())
          {
-            log.trace(this + " Deciding whether to pull messages. " +
-                     "receiversready:" + receiversReady + " before size:" + beforeSize + " afterSize: " + afterSize);
+            ref = super.nextReference(iter, handle);
          }
-         
-         if (receiversReady && beforeSize == 0 && afterSize == 0)
-         {
-            //Delivery has been prompted (not from handle call)
-            //and has run, and there are consumers that are still interested in receiving more
-            //refs but there are none available in the channel (either the channel is empty
-            //or there are only refs that don't match any selectors)
-            //then we should perhaps pull some messages from a remote queue
-            pullMessages();
-         }
       }
+      
+      return ref;
    }
-   
+
    public boolean isClustered()
    {
       return true;
@@ -286,28 +236,28 @@
     * On failure, commit or rollback will be called on the holding transaction causing the deliveries to be acked or cancelled
     * depending on whether they exist in the database
     * 
+    * Recovery is handled in the same way as CastMessagesCallback
+    * 
     * This method will always be executed on the channel's event queue (via the deliver method)
     * so no need to do any handles or acks inside another event message
     */
-   private void pullMessages() throws Throwable
-   {       
-      RemoteQueueStub theQueue;
-      int thePullSize;
-      
-      synchronized (pullLock)
+   private boolean pullMessages() throws Throwable
+   {      
+      if (pullQueue == null)
       {
-         if (pullQueue == null)
-         {
-            return;
-         }
-         theQueue = pullQueue;
-         thePullSize = pullSize;
+         return false;
       }
-                
+      
+      //TODO we can optimise this for the case when only one message is pulled
+      //and when only non persistent messages are pulled - i.e. we don't need
+      //to create a transaction.
+      
+      RemoteQueueStub theQueue = pullQueue;
+         
       Transaction tx = tr.createTransaction();
          
       ClusterRequest req = new PullMessagesRequest(this.nodeId, tx.getId(), theQueue.getChannelID(),
-                                                   name, thePullSize);
+                                                   name, 1);
       
       if (trace)
       {
@@ -315,12 +265,14 @@
                    " pulling from node " + theQueue.getNodeId() + " to node " + this.nodeId);
       }
       
+      log.info("==================== Executing pull messages request");
       byte[] bytes = (byte[])office.syncSendRequest(req, theQueue.getNodeId(), true);
+      log.info("==================== Executed pull messages request");
       
       if (bytes == null)
       {
          //Ok - node might have left the group
-         return;
+         return false;
       }
       
       PullMessagesResponse response = new PullMessagesResponse();
@@ -346,20 +298,24 @@
             
             containsReliable = true;
          }
-         
+               
          MessageReference ref = null;
          
          try
          {
             ref = ms.reference(msg);
             
-            Delivery delRet = handleInternal(null, ref, tx, true, true);
+            //It's ok to call this directly since this method is only ever called by the delivery thread
+            //We call it with the deliver parameter set to false - this prevents delivery being done
+            //after the ref is added - if delivery was done we would end up in recursion.
+            Delivery delRet = handleInternal(null, ref, tx, true, true, false);
             
             if (delRet == null || !delRet.isSelectorAccepted())
             {
                //This should never happen
-               throw new IllegalStateException("Aaarrgg queue did not accept reference");
+               throw new IllegalStateException("Queue did not accept reference!");
             }
+            
          }
          finally
          {
@@ -391,7 +347,165 @@
          req = new PullMessagesRequest(this.nodeId, tx.getId());
          
          office.asyncSendRequest(req, theQueue.getNodeId());
+      }      
+      
+      return !msgs.isEmpty();
+   }
+   
+   public int getRefCount()
+   {
+      //We are only interested in getting the reference count when delivery is not in progress
+      //since we don't want mid delivery transient spurious values, so we execute the request
+      //on the same thread.
+      
+      Future result = new Future();
+      
+      try
+      {
+         this.executor.execute(new GetRefCountRunnable(result));
       }
+      catch (InterruptedException e)
+      {
+         log.warn("Thread interrupted", e);
+      }
+
+      return ((Integer)result.getResult()).intValue();
+   }
+   
+   private class GetRefCountRunnable implements Runnable
+   {
+      Future result;
       
+      public GetRefCountRunnable(Future result)
+      {
+         this.result = result;
+      }
+      
+      public void run()
+      {
+         int refCount = messageRefs.size();
+         
+         result.setResult(new Integer(refCount));        
+      }
+   }  
+   
+   private class GetDeliveriesRunnable implements Runnable
+   {
+      Future result;
+      
+      int number;
+      
+      public GetDeliveriesRunnable(Future result, int number)
+      {
+         this.result = result;
+         
+         this.number = number;
+      }
+      
+      public void run()
+      {
+         try
+         {
+            List list = null;
+            
+            //We only get the refs if receiversReady = false so as not to steal messages that
+            //might be consumed by local receivers            
+            if (!receiversReady)
+            {               
+               int count = 0;
+               
+               MessageReference ref;
+               
+               list = new ArrayList();
+               
+               synchronized (refLock)
+               {
+                  synchronized (deliveryLock)
+                  {
+                     while (count < number && (ref = removeFirstInMemory()) != null)
+                     {
+                        SimpleDelivery del = new SimpleDelivery(LocalClusteredQueue.this, ref);
+                        
+                        deliveries.add(del);
+                        
+                        list.add(del);       
+                        
+                        count++;
+                     }  
+                  }
+               }                    
+            }
+            else
+            {
+               list = Collections.EMPTY_LIST;
+            }
+            
+            result.setResult(list);
+         }
+         catch (Exception e)
+         {
+            result.setException(e);
+         }                     
+      }
+   } 
+   
+   private class AddReferencesCallback implements TxCallback
+   {
+      private List references;
+      
+      private AddReferencesCallback(List references)
+      {
+         this.references = references;
+      }
+
+      public void afterCommit(boolean onePhase) throws Exception
+      {
+         Iterator iter = references.iterator();
+
+         while (iter.hasNext())
+         {
+            MessageReference ref = (MessageReference) iter.next();
+
+            if (trace) { log.trace(this + ": adding " + ref + " to non-recoverable state"); }
+
+            try
+            {
+               synchronized (refLock)
+               {
+                  addReferenceInMemory(ref);
+               }
+            }
+            catch (Throwable t)
+            {
+               throw new TransactionException("Failed to add reference", t);
+            }
+         }
+      }
+
+      public void afterPrepare() throws Exception
+      {
+         //NOOP
+      }
+
+      public void afterRollback(boolean onePhase) throws Exception
+      {
+         //NOOP
+      }
+
+      public void beforeCommit(boolean onePhase) throws Exception
+      {
+         //NOOP
+      }
+
+      public void beforePrepare() throws Exception
+      {
+         //NOOP
+      }
+
+      public void beforeRollback(boolean onePhase) throws Exception
+      {
+         //NOOP
+      }
+      
    }
 }

Modified: trunk/src/main/org/jboss/messaging/core/plugin/postoffice/cluster/PostOfficeInternal.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/plugin/postoffice/cluster/PostOfficeInternal.java	2006-10-12 15:47:04 UTC (rev 1472)
+++ trunk/src/main/org/jboss/messaging/core/plugin/postoffice/cluster/PostOfficeInternal.java	2006-10-13 12:09:35 UTC (rev 1473)
@@ -69,6 +69,4 @@
    void sendQueueStats() throws Exception;
    
    boolean referenceExistsInStorage(long channelID, long messageID) throws Exception;
-   
-   List getDeliveries(String queueName, int numMessages) throws Exception; 
 }

Modified: trunk/src/main/org/jboss/messaging/core/plugin/postoffice/cluster/PullMessagesRequest.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/plugin/postoffice/cluster/PullMessagesRequest.java	2006-10-12 15:47:04 UTC (rev 1472)
+++ trunk/src/main/org/jboss/messaging/core/plugin/postoffice/cluster/PullMessagesRequest.java	2006-10-13 12:09:35 UTC (rev 1473)
@@ -29,6 +29,7 @@
 
 import org.jboss.logging.Logger;
 import org.jboss.messaging.core.Delivery;
+import org.jboss.messaging.core.plugin.postoffice.Binding;
 import org.jboss.messaging.util.StreamUtils;
 
 /**
@@ -80,8 +81,17 @@
       
       if (hold)
       {                           
-         List dels = office.getDeliveries(queueName, numMessages);
+         Binding binding = office.getBindingForQueueName(queueName);
          
+         if (binding == null)
+         {
+            throw new IllegalStateException("Cannot find binding for queue: " + queueName);
+         }
+         
+         LocalClusteredQueue queue = (LocalClusteredQueue)binding.getQueue();
+         
+         List dels = queue.getDeliveries(numMessages);
+         
          if (trace) { log.trace("PullMessagesRequest got " + dels.size() + " deliveries"); }
          
          PullMessagesResponse response = new PullMessagesResponse(dels.size());
@@ -97,7 +107,7 @@
                //Add it to internal list
                if (reliableDels == null)
                {
-                  reliableDels  = new ArrayList();                                    
+                  reliableDels = new ArrayList();                                    
                }
                
                reliableDels.add(del);

Modified: trunk/src/main/org/jboss/messaging/core/plugin/postoffice/cluster/StatsSender.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/plugin/postoffice/cluster/StatsSender.java	2006-10-12 15:47:04 UTC (rev 1472)
+++ trunk/src/main/org/jboss/messaging/core/plugin/postoffice/cluster/StatsSender.java	2006-10-13 12:09:35 UTC (rev 1473)
@@ -98,6 +98,7 @@
    class SendStatsTimerTask extends TimerTask
    {
       private boolean stopping;
+      
       private boolean stopped;
       
       private Object stopLock = new Object();
@@ -125,8 +126,11 @@
             if (stopping)
             {
                cancel();
+               
                stopped = true;
+               
                stopLock.notify();
+               
                return;
             }
          }

Modified: trunk/tests/src/org/jboss/test/messaging/core/SimpleReceiver.java
===================================================================
--- trunk/tests/src/org/jboss/test/messaging/core/SimpleReceiver.java	2006-10-12 15:47:04 UTC (rev 1472)
+++ trunk/tests/src/org/jboss/test/messaging/core/SimpleReceiver.java	2006-10-13 12:09:35 UTC (rev 1473)
@@ -61,6 +61,7 @@
    public static final String BROKEN = "BROKEN";
    public static final String REJECTING = "REJECTING";
    public static final String SELECTOR_REJECTING = "SELECTOR_REJECTING";
+   public static final String ACCEPTING_TO_MAX = "ACCEPTING_TO_MAX";
 
    private static final String INVOCATION_COUNT = "INVOCATION_COUNT";
 
@@ -77,6 +78,7 @@
    private int invocationsToFutureStateCount;
    private Map waitingArea;
    private boolean immediateAsynchronousAcknowledgment;
+   private int maxRefs;
 
    // Constructors --------------------------------------------------
 
@@ -102,6 +104,7 @@
    {
       this(name, state, null);
    }
+   
 
    public SimpleReceiver(String name, String state, Channel channel)
    {
@@ -142,8 +145,16 @@
             log.trace(this + " is rejecting reference " + ref);
             return null;
          }
+         
+         if (ACCEPTING_TO_MAX.equals(state))
+         {
+            //Only accept up to maxRefs references
+            if (messages.size() == maxRefs)
+            {
+               return null;
+            }
+         }
 
-
          if (BROKEN.equals(state))
          {
             throw new RuntimeException("THIS IS AN EXCEPTION THAT SIMULATES "+
@@ -192,6 +203,11 @@
    }
    
    // Public --------------------------------------------------------
+   
+   public void setMaxRefs(int max)
+   {
+      this.maxRefs = max;
+   }
 
    public void setImmediateAsynchronousAcknowledgment(boolean b)
    {
@@ -392,7 +408,8 @@
           !ACCEPTING.equals(state) &&
           !BROKEN.equals(state) &&
           !REJECTING.equals(state) &&
-          !SELECTOR_REJECTING.equals(state))
+          !SELECTOR_REJECTING.equals(state) &&
+          !ACCEPTING_TO_MAX.equals(state))
       {
          throw new IllegalArgumentException("Unknown receiver state: " + state);
       }

Modified: trunk/tests/src/org/jboss/test/messaging/core/plugin/postoffice/cluster/DefaultClusteredPostOfficeWithDefaultRouterTest.java
===================================================================
--- trunk/tests/src/org/jboss/test/messaging/core/plugin/postoffice/cluster/DefaultClusteredPostOfficeWithDefaultRouterTest.java	2006-10-12 15:47:04 UTC (rev 1472)
+++ trunk/tests/src/org/jboss/test/messaging/core/plugin/postoffice/cluster/DefaultClusteredPostOfficeWithDefaultRouterTest.java	2006-10-13 12:09:35 UTC (rev 1473)
@@ -95,18 +95,6 @@
       local(false);
    }
    
-   
-   public void testLocalNonConsumersPersistent() throws Throwable
-   {
-      localNoConsumers(true);
-   }
-   
-   public void testLocalNoConsumersNonPersistent() throws Throwable
-   {
-      localNoConsumers(false);
-   }
-   
-   
    protected void notLocal(boolean persistent) throws Throwable
    {
       ClusteredPostOffice office1 = null;
@@ -245,150 +233,9 @@
       }
    }
    
-   //if the local queue has no consumers then we treat as if there was no local queue
-   protected void localNoConsumers(boolean persistent) throws Throwable
-   {
-      ClusteredPostOffice office1 = null;
-      
-      ClusteredPostOffice office2 = null;
-      
-      ClusteredPostOffice office3 = null;
-      
-      ClusteredPostOffice office4 = null;
-      
-      ClusteredPostOffice office5 = null;
-      
-      ClusteredPostOffice office6 = null;
-          
-      try
-      {   
-         office1 = createClusteredPostOffice(1, "testgroup");
-         
-         office2 = createClusteredPostOffice(2, "testgroup");
-         
-         office3 = createClusteredPostOffice(3, "testgroup");
-         
-         office4 = createClusteredPostOffice(4, "testgroup");
-         
-         office5 = createClusteredPostOffice(5, "testgroup");
-         
-         office6 = createClusteredPostOffice(6, "testgroup");
-         
-         LocalClusteredQueue queueLocal = new LocalClusteredQueue(office1, 1, "queue1", channelIdManager.getId(), ms, pm, true, false, (QueuedExecutor)pool.get(), null, tr);         
-         Binding bindingLocal = office1.bindClusteredQueue("topic", queueLocal);
-         //No consumer on the local one
-         
-         LocalClusteredQueue queue1 = new LocalClusteredQueue(office2, 2, "queue1", channelIdManager.getId(), ms, pm, true, false, (QueuedExecutor)pool.get(), null, tr);         
-         Binding binding1 = office2.bindClusteredQueue("topic", queue1);
-         SimpleReceiver receiver1 = new SimpleReceiver("blah", SimpleReceiver.ACCEPTING);
-         queue1.add(receiver1);
-         
-         LocalClusteredQueue queue2 = new LocalClusteredQueue(office3, 3, "queue1", channelIdManager.getId(), ms, pm, true, false, (QueuedExecutor)pool.get(), null, tr);         
-         Binding binding2 = office3.bindClusteredQueue("topic", queue2);
-         SimpleReceiver receiver2 = new SimpleReceiver("blah", SimpleReceiver.ACCEPTING);
-         queue2.add(receiver2);
-         
-         LocalClusteredQueue queue3 = new LocalClusteredQueue(office4, 4, "queue1", channelIdManager.getId(), ms, pm, true, false, (QueuedExecutor)pool.get(), null, tr);         
-         Binding binding3 = office4.bindClusteredQueue("topic", queue3);
-         SimpleReceiver receiver3 = new SimpleReceiver("blah", SimpleReceiver.ACCEPTING);
-         queue3.add(receiver3);
-         
-         LocalClusteredQueue queue4 = new LocalClusteredQueue(office5, 5, "queue1", channelIdManager.getId(), ms, pm, true, false, (QueuedExecutor)pool.get(), null, tr);         
-         Binding binding4 = office5.bindClusteredQueue("topic", queue4);
-         SimpleReceiver receiver4 = new SimpleReceiver("blah", SimpleReceiver.ACCEPTING);
-         queue4.add(receiver4);
-         
-         LocalClusteredQueue queue5 = new LocalClusteredQueue(office6, 6, "queue1", channelIdManager.getId(), ms, pm, true, false, (QueuedExecutor)pool.get(), null, tr);         
-         Binding binding5 = office6.bindClusteredQueue("topic", queue5);
-         SimpleReceiver receiver5 = new SimpleReceiver("blah", SimpleReceiver.ACCEPTING);
-         queue5.add(receiver5);
-               
-         List msgs = sendMessages("topic", persistent, office1, 1, null);         
-         checkContainsAndAcknowledge(msgs, receiver1, queue1);         
-         checkEmpty(receiver2);
-         checkEmpty(receiver3);
-         checkEmpty(receiver4);
-         checkEmpty(receiver5);
-         
-         msgs = sendMessages("topic", persistent, office1, 1, null);         
-         checkEmpty(receiver1);
-         checkContainsAndAcknowledge(msgs, receiver2, queue1);                  
-         checkEmpty(receiver3);
-         checkEmpty(receiver4);
-         checkEmpty(receiver5);
-         
-         msgs = sendMessages("topic", persistent, office1, 1, null);         
-         checkEmpty(receiver1);
-         checkEmpty(receiver2);
-         checkContainsAndAcknowledge(msgs, receiver3, queue1);                           
-         checkEmpty(receiver4);
-         checkEmpty(receiver5);
-         
-         msgs = sendMessages("topic", persistent, office1, 1, null);         
-         checkEmpty(receiver1);
-         checkEmpty(receiver2);
-         checkEmpty(receiver3);
-         checkContainsAndAcknowledge(msgs, receiver4, queue1);                                    
-         checkEmpty(receiver5);
-         
-         msgs = sendMessages("topic", persistent, office1, 1, null);         
-         checkEmpty(receiver1);
-         checkEmpty(receiver2);
-         checkEmpty(receiver3);
-         checkEmpty(receiver4);
-         checkContainsAndAcknowledge(msgs, receiver5, queue1); 
-         
-         msgs = sendMessages("topic", persistent, office1, 1, null);         
-         checkContainsAndAcknowledge(msgs, receiver1, queue1);         
-         checkEmpty(receiver2);
-         checkEmpty(receiver3);
-         checkEmpty(receiver4);
-         checkEmpty(receiver5);
-         
-         msgs = sendMessages("topic", persistent, office1, 1, null);         
-         checkEmpty(receiver1);
-         checkContainsAndAcknowledge(msgs, receiver2, queue1);                  
-         checkEmpty(receiver3);
-         checkEmpty(receiver4);
-         checkEmpty(receiver5);
-         
-                     
-      }
-      finally
-      {
-         if (office1 != null)
-         {            
-            office1.stop();
-         }
-         
-         if (office2 != null)
-         {
-            office2.stop();
-         }
-         
-         if (office3 != null)
-         {            
-            office3.stop();
-         }
-         
-         if (office4 != null)
-         {
-            office4.stop();
-         }
-         
-         if (office5 != null)
-         {            
-            office5.stop();
-         }
-         
-         if (office6 != null)
-         {
-            office6.stop();
-         }
-      }
-   }
    
    
+   
    protected void local(boolean persistent) throws Throwable
    {
       ClusteredPostOffice office1 = null;

Modified: trunk/tests/src/org/jboss/test/messaging/core/plugin/postoffice/cluster/DefaultRouterTest.java
===================================================================
--- trunk/tests/src/org/jboss/test/messaging/core/plugin/postoffice/cluster/DefaultRouterTest.java	2006-10-12 15:47:04 UTC (rev 1472)
+++ trunk/tests/src/org/jboss/test/messaging/core/plugin/postoffice/cluster/DefaultRouterTest.java	2006-10-13 12:09:35 UTC (rev 1473)
@@ -21,6 +21,7 @@
   */
 package org.jboss.test.messaging.core.plugin.postoffice.cluster;
 
+import java.util.ArrayList;
 import java.util.Iterator;
 import java.util.List;
 
@@ -30,6 +31,7 @@
 import org.jboss.messaging.core.FilterFactory;
 import org.jboss.messaging.core.Message;
 import org.jboss.messaging.core.MessageReference;
+import org.jboss.messaging.core.Queue;
 import org.jboss.messaging.core.Receiver;
 import org.jboss.messaging.core.SimpleDelivery;
 import org.jboss.messaging.core.plugin.contract.ClusteredPostOffice;
@@ -85,8 +87,8 @@
       super.tearDown();
    }
    
-   // The router only has a local queue with a consumer
-   public void testRouterOnlyLocalWithConsumer() throws Exception
+   // The router only has a local queue
+   public void testRouterOnlyLocal() throws Exception
    {
       DefaultRouter dr = new DefaultRouter();
                     
@@ -105,25 +107,6 @@
       sendAndCheck(dr, receiver1);
    }
    
-   //The router only has a local queue with no consumer
-   public void testRouterOnlyLocalNoConsumer() throws Exception
-   {
-      DefaultRouter dr = new DefaultRouter();
-        
-      ClusteredQueue queue = new SimpleQueue(true);
-               
-      dr.add(queue);
-      
-      Message msg = CoreMessageFactory.createCoreMessage(0, false, null);      
-      
-      MessageReference ref = ms.reference(msg);         
-      
-      Delivery del = dr.handle(null, ref, null);
-      
-      assertNull(del);             
-
-   }
-   
    //The router has only one non local queues
    public void testRouterOnlyOneNonLocal() throws Exception
    {
@@ -189,8 +172,8 @@
    }
    
    
-   // The router has one local with consumer and one non local queue
-   public void testRouterOneLocalWithConsumerOneNonLocal() throws Exception
+   // The router has one local with consumer and one non local queue with consumer
+   public void testRouterOneLocalOneNonLocal() throws Exception
    {
       DefaultRouter dr = new DefaultRouter();
                              
@@ -217,8 +200,8 @@
       sendAndCheck(dr, receiver2);                  
    }
    
-   // The router has multiple non local queues and one local queue with consumer
-   public void testRouterMultipleNonLocalOneLocalNoConsumer() throws Exception
+   // The router has multiple non local queues with consumers and one local queue
+   public void testRouterMultipleNonLocalOneLocal() throws Exception
    {
       DefaultRouter dr = new DefaultRouter();            
                   
@@ -265,85 +248,37 @@
       sendAndCheck(dr, receiver4);
    }
    
-   // The router has multiple non local queues and one local queue without consumer
-   public void testRouterMultipleNonLocalOneLocalWithConsumer() throws Exception
+   private long nextId;
+   
+   private void sendAndCheck(ClusterRouter router, SimpleReceiver receiver) throws Exception
    {
-      DefaultRouter dr = new DefaultRouter();
-                  
-      ClusteredQueue remote1 = new SimpleQueue(false);
+      Message msg = CoreMessageFactory.createCoreMessage(nextId++, false, null);      
       
-      SimpleReceiver receiver1 = new SimpleReceiver("blah", SimpleReceiver.ACCEPTING);
+      MessageReference ref = ms.reference(msg);         
       
-      remote1.add(receiver1);
+      Delivery del = router.handle(null, ref, null);
       
-      dr.add(remote1);
+      assertNotNull(del);
       
+      assertTrue(del.isSelectorAccepted());
+            
+      Thread.sleep(250);
       
-      ClusteredQueue remote2 = new SimpleQueue(false);
+      List msgs = receiver.getMessages();
       
-      SimpleReceiver receiver2 = new SimpleReceiver("blah", SimpleReceiver.ACCEPTING);
+      assertNotNull(msgs);
       
-      remote2.add(receiver2);
+      assertEquals(1, msgs.size());
       
-      dr.add(remote2);
+      Message msgRec = (Message)msgs.get(0);
       
+      assertTrue(msg == msgRec);  
       
-      ClusteredQueue remote3 = new SimpleQueue(false);
-      
-      SimpleReceiver receiver3 = new SimpleReceiver("blah", SimpleReceiver.ACCEPTING);
-      
-      remote3.add(receiver3);
-      
-      dr.add(remote3);
-      
-      
-      ClusteredQueue queue = new SimpleQueue(true);
-      
-      
-      dr.add(queue);
-      
-      
-      sendAndCheck(dr, receiver1);
-      
-      sendAndCheck(dr, receiver2);
-      
-      sendAndCheck(dr, receiver3);
-      
-      sendAndCheck(dr, receiver1);
-      
-      sendAndCheck(dr, receiver2);
-      
-      sendAndCheck(dr, receiver3);
+      receiver.clear();
    }
    
-   // The router has one local without consumer and one non local queue
-   public void testRouterMultipleOneLocalWithoutConsumerOneNonLocal() throws Exception
+   private void sendAndCheck(ClusterRouter router, Queue queue) throws Throwable
    {
-      DefaultRouter dr = new DefaultRouter();
-                             
-      ClusteredQueue remote1 = new SimpleQueue(false);
-     
-      SimpleReceiver receiver1 = new SimpleReceiver("blah", SimpleReceiver.ACCEPTING);
-      
-      remote1.add(receiver1);
-      
-      dr.add(remote1);
-      
-      ClusteredQueue queue = new SimpleQueue(true);
-             
-      dr.add(queue);
-
-      sendAndCheck(dr, receiver1);
-      
-      sendAndCheck(dr, receiver1);
-      
-      sendAndCheck(dr, receiver1);                       
-   }
-   
-   private long nextId;
-   
-   private void sendAndCheck(ClusterRouter router, SimpleReceiver receiver) throws Exception
-   {
       Message msg = CoreMessageFactory.createCoreMessage(nextId++, false, null);      
       
       MessageReference ref = ms.reference(msg);         
@@ -356,7 +291,7 @@
             
       Thread.sleep(250);
       
-      List msgs = receiver.getMessages();
+      List msgs = queue.browse();
       
       assertNotNull(msgs);
       
@@ -366,7 +301,7 @@
       
       assertTrue(msg == msgRec);  
       
-      receiver.clear();
+      queue.removeAllReferences();
    }
    
    
@@ -402,6 +337,8 @@
       private boolean local;
       
       private Receiver receiver;
+      
+      private List refs = new ArrayList();
         
       SimpleQueue(boolean local)
       {
@@ -457,8 +394,18 @@
 
       public List browse()
       {
-         // TODO Auto-generated method stub
-         return null;
+         List msgs = new ArrayList();
+         
+         Iterator iter = refs.iterator();
+         
+         while (iter.hasNext())
+         {
+            MessageReference ref = (MessageReference)iter.next();
+            
+            msgs.add(ref);
+         }
+         
+         return msgs;
       }
 
       public List browse(Filter filter)
@@ -549,12 +496,21 @@
       {
          if (receiver != null)
          {
+            //Send to receiver
+            
             Delivery del = receiver.handle(observer, reference, tx);
             
             return del;
          }
+         else
+         {
+            //Store internally
+            refs.add(reference);
+            
+            return new SimpleDelivery(observer, reference);
+         }
          
-         return new SimpleDelivery(observer, reference);
+      
       }
 
       public void acknowledge(Delivery d, Transaction tx) throws Throwable

Modified: trunk/tests/src/org/jboss/test/messaging/core/plugin/postoffice/cluster/RecoveryTest.java
===================================================================
--- trunk/tests/src/org/jboss/test/messaging/core/plugin/postoffice/cluster/RecoveryTest.java	2006-10-12 15:47:04 UTC (rev 1472)
+++ trunk/tests/src/org/jboss/test/messaging/core/plugin/postoffice/cluster/RecoveryTest.java	2006-10-13 12:09:35 UTC (rev 1473)
@@ -86,12 +86,16 @@
       
       DefaultClusteredPostOffice office2 = null;
       
+      DefaultClusteredPostOffice office3 = null;
+      
       try
       {      
          office1 = (DefaultClusteredPostOffice)createClusteredPostOffice(1, "testgroup");
          
          office2 = (DefaultClusteredPostOffice)createClusteredPostOffice(2, "testgroup");
          
+         office3 = (DefaultClusteredPostOffice)createClusteredPostOffice(3, "testgroup");
+         
          LocalClusteredQueue queue1 = new LocalClusteredQueue(office1, 1, "queue1", channelIdManager.getId(), ms, pm, true, true, (QueuedExecutor)pool.get(), null, tr);         
          Binding binding1 =
             office1.bindClusteredQueue("topic1", queue1);
@@ -100,10 +104,16 @@
          Binding binding2 =
             office2.bindClusteredQueue("topic1", queue2);
          
+         LocalClusteredQueue queue3 = new LocalClusteredQueue(office3, 3, "queue3", channelIdManager.getId(), ms, pm, true, true, (QueuedExecutor)pool.get(), null, tr);         
+         Binding binding3 =
+            office3.bindClusteredQueue("topic1", queue3);
+         
          SimpleReceiver receiver1 = new SimpleReceiver("blah", SimpleReceiver.ACCEPTING);
          queue1.add(receiver1);
          SimpleReceiver receiver2 = new SimpleReceiver("blah", SimpleReceiver.ACCEPTING);
          queue2.add(receiver2);
+         SimpleReceiver receiver3 = new SimpleReceiver("blah", SimpleReceiver.ACCEPTING);
+         queue3.add(receiver3);
          
          //This will make it fail after casting but before persisting the message in the db
          office1.setFail(true, false);
@@ -130,6 +140,9 @@
          msgs = receiver2.getMessages();
          assertTrue(msgs.isEmpty());
          
+         msgs = receiver3.getMessages();
+         assertTrue(msgs.isEmpty());
+         
          try
          {
             //An exception should be thrown            
@@ -149,7 +162,16 @@
          msgs = receiver2.getMessages();
          assertTrue(msgs.isEmpty());
          
-         //We now kill the office - this should make the other office do it's transaction check
+         msgs = receiver3.getMessages();
+         assertTrue(msgs.isEmpty());
+         
+         assertEquals(1, office1.getHoldingTransactions().size());
+         
+         assertEquals(1, office2.getHoldingTransactions().size());
+         
+         assertEquals(1, office3.getHoldingTransactions().size());
+         
+         //We now kill the office - this should make the other offices do their transaction check
          office1.stop();
          
          Thread.sleep(1000);
@@ -158,12 +180,17 @@
          
          assertTrue(office2.getHoldingTransactions().isEmpty());
          
+         assertTrue(office3.getHoldingTransactions().isEmpty());
+         
          //The tx should be removed from the holding area and nothing should be received
          //remember node1 has now crashed so no point checking receiver1
          
          msgs = receiver2.getMessages();
          assertTrue(msgs.isEmpty());
          
+         msgs = receiver3.getMessages();
+         assertTrue(msgs.isEmpty());
+         
       }
       finally
       {
@@ -176,6 +203,11 @@
          {           
             office2.stop();
          }
+         
+         if (office3!= null)
+         {           
+            office3.stop();
+         }
       }
    }
    
@@ -185,12 +217,16 @@
       
       DefaultClusteredPostOffice office2 = null;
       
+      DefaultClusteredPostOffice office3 = null;
+      
       try
       {      
          office1 = (DefaultClusteredPostOffice)createClusteredPostOffice(1, "testgroup");
          
          office2 = (DefaultClusteredPostOffice)createClusteredPostOffice(2, "testgroup");
          
+         office3 = (DefaultClusteredPostOffice)createClusteredPostOffice(3, "testgroup");
+         
          LocalClusteredQueue queue1 = new LocalClusteredQueue(office1, 1, "queue1", channelIdManager.getId(), ms, pm, true, true, (QueuedExecutor)pool.get(), null, tr);         
          Binding binding1 =
             office1.bindClusteredQueue("topic1", queue1);
@@ -199,10 +235,16 @@
          Binding binding2 =
             office2.bindClusteredQueue("topic1", queue2);
          
+         LocalClusteredQueue queue3 = new LocalClusteredQueue(office3, 3, "queue3", channelIdManager.getId(), ms, pm, true, true, (QueuedExecutor)pool.get(), null, tr);         
+         Binding binding3 =
+            office3.bindClusteredQueue("topic1", queue3);
+         
          SimpleReceiver receiver1 = new SimpleReceiver("blah", SimpleReceiver.ACCEPTING);
          queue1.add(receiver1);
          SimpleReceiver receiver2 = new SimpleReceiver("blah", SimpleReceiver.ACCEPTING);
          queue2.add(receiver2);
+         SimpleReceiver receiver3 = new SimpleReceiver("blah", SimpleReceiver.ACCEPTING);
+         queue3.add(receiver3);
          
          //This will make it fail after casting and persisting the message in the db
          office1.setFail(false, true);
@@ -248,6 +290,15 @@
          msgs = receiver2.getMessages();
          assertTrue(msgs.isEmpty());
          
+         msgs = receiver3.getMessages();
+         assertTrue(msgs.isEmpty());
+         
+         assertEquals(1, office1.getHoldingTransactions().size());
+         
+         assertEquals(1, office2.getHoldingTransactions().size());
+         
+         assertEquals(1, office3.getHoldingTransactions().size());
+         
          //We now kill the office - this should make the other office do it's transaction check
          office1.stop();
          
@@ -257,12 +308,17 @@
          
          assertTrue(office2.getHoldingTransactions().isEmpty());
          
+         assertTrue(office3.getHoldingTransactions().isEmpty());
+         
          //The tx should be removed from the holding area and messages be received
          //no point checking receiver1 since node1 has crashed
          
          msgs = receiver2.getMessages();
          assertEquals(NUM_MESSAGES, msgs.size());
          
+         msgs = receiver3.getMessages();
+         assertEquals(NUM_MESSAGES, msgs.size());
+         
       }
       finally
       {

Deleted: trunk/tests/src/org/jboss/test/messaging/core/plugin/postoffice/cluster/RedistributionTest.java
===================================================================
--- trunk/tests/src/org/jboss/test/messaging/core/plugin/postoffice/cluster/RedistributionTest.java	2006-10-12 15:47:04 UTC (rev 1472)
+++ trunk/tests/src/org/jboss/test/messaging/core/plugin/postoffice/cluster/RedistributionTest.java	2006-10-13 12:09:35 UTC (rev 1473)
@@ -1,422 +0,0 @@
-/*
-  * JBoss, Home of Professional Open Source
-  * Copyright 2005, JBoss Inc., and individual contributors as indicated
-  * by the @authors tag. See the copyright.txt in the distribution for a
-  * full listing of individual contributors.
-  *
-  * This is free software; you can redistribute it and/or modify it
-  * under the terms of the GNU Lesser General Public License as
-  * published by the Free Software Foundation; either version 2.1 of
-  * the License, or (at your option) any later version.
-  *
-  * This software is distributed in the hope that it will be useful,
-  * but WITHOUT ANY WARRANTY; without even the implied warranty of
-  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
-  * Lesser General Public License for more details.
-  *
-  * You should have received a copy of the GNU Lesser General Public
-  * License along with this software; if not, write to the Free
-  * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
-  * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
-  */
-package org.jboss.test.messaging.core.plugin.postoffice.cluster;
-
-import org.jboss.messaging.core.Delivery;
-import org.jboss.messaging.core.DeliveryObserver;
-import org.jboss.messaging.core.FilterFactory;
-import org.jboss.messaging.core.MessageReference;
-import org.jboss.messaging.core.Receiver;
-import org.jboss.messaging.core.SimpleDelivery;
-import org.jboss.messaging.core.plugin.contract.ClusteredPostOffice;
-import org.jboss.messaging.core.plugin.postoffice.Binding;
-import org.jboss.messaging.core.plugin.postoffice.cluster.ClusterRouterFactory;
-import org.jboss.messaging.core.plugin.postoffice.cluster.DefaultClusteredPostOffice;
-import org.jboss.messaging.core.plugin.postoffice.cluster.DefaultMessagePullPolicy;
-import org.jboss.messaging.core.plugin.postoffice.cluster.DefaultRouterFactory;
-import org.jboss.messaging.core.plugin.postoffice.cluster.LocalClusteredQueue;
-import org.jboss.messaging.core.plugin.postoffice.cluster.MessagePullPolicy;
-import org.jboss.messaging.core.tx.Transaction;
-import org.jboss.test.messaging.core.SimpleFilterFactory;
-import org.jboss.test.messaging.core.plugin.base.ClusteringTestBase;
-
-import EDU.oswego.cs.dl.util.concurrent.QueuedExecutor;
-
-
-public class RedistributionTest extends ClusteringTestBase
-{
-   // Constants -----------------------------------------------------
-
-   // Static --------------------------------------------------------
-   
-   // Attributes ----------------------------------------------------
-
-   // Constructors --------------------------------------------------
-
-   public RedistributionTest(String name)
-   {
-      super(name);
-   }
-
-   // Public --------------------------------------------------------
-
-   public void setUp() throws Exception
-   {
-      super.setUp();
-   }
-
-   public void tearDown() throws Exception
-   {      
-      super.tearDown();
-   }
-   
-   public void testRedistNonPersistentNonRecoverable() throws Throwable
-   {
-      redistTest(false, false);
-   }
-   
-   public void testRedistPersistentNonRecoverable() throws Throwable
-   {
-      redistTest(true, false);
-   }
-   
-   public void testRedistNonPersistentRecoverable() throws Throwable
-   {
-      redistTest(false, true);
-   }
-   
-   public void testRedistPersistentRecoverable() throws Throwable
-   {
-      redistTest(true, true);
-   }
-   
-   public void redistTest(boolean persistent, boolean recoverable) throws Throwable
-   {
-      ClusteredPostOffice office1 = null;
-      
-      ClusteredPostOffice office2 = null;
-      
-      ClusteredPostOffice office3 = null;
-      
-      ClusteredPostOffice office4 = null;
-      
-      ClusteredPostOffice office5 = null;
-          
-      try
-      {   
-         office1 = createClusteredPostOffice(1, "testgroup");
-         
-         office2 = createClusteredPostOffice(2, "testgroup");
-         
-         office3 = createClusteredPostOffice(3, "testgroup");
-         
-         office4 = createClusteredPostOffice(4, "testgroup");
-         
-         office5 = createClusteredPostOffice(5, "testgroup");
-         
-         log.info("Started offices");
-         
-         LocalClusteredQueue queue1 = new LocalClusteredQueue(office1, 1, "queue1", channelIdManager.getId(), ms, pm, true, recoverable, (QueuedExecutor)pool.get(), null, tr);         
-         Binding binding1 = office1.bindClusteredQueue("queue1", queue1);
-                  
-         LocalClusteredQueue queue2 = new LocalClusteredQueue(office2, 2, "queue1", channelIdManager.getId(), ms, pm, true, recoverable, (QueuedExecutor)pool.get(), null, tr);         
-         Binding binding2 = office2.bindClusteredQueue("queue1", queue2);
-                  
-         LocalClusteredQueue queue3 = new LocalClusteredQueue(office3, 3, "queue1", channelIdManager.getId(), ms, pm, true, recoverable, (QueuedExecutor)pool.get(), null, tr);         
-         Binding binding3 = office3.bindClusteredQueue("queue1", queue3);         
-         
-         LocalClusteredQueue queue4 = new LocalClusteredQueue(office4, 4, "queue1", channelIdManager.getId(), ms, pm, true, recoverable, (QueuedExecutor)pool.get(), null, tr);         
-         Binding binding4 = office4.bindClusteredQueue("queue1", queue4);
-                  
-         LocalClusteredQueue queue5 = new LocalClusteredQueue(office5, 5, "queue1", channelIdManager.getId(), ms, pm, true, recoverable, (QueuedExecutor)pool.get(), null, tr);         
-         Binding binding5 = office5.bindClusteredQueue("queue1", queue5);
-                  
-         log.info("bound queues");
-         
-         //Send 30 messages to each queue
-         this.sendMessages("queue1", persistent, office1, 30, null);
-         this.sendMessages("queue1", persistent, office2, 30, null);
-         this.sendMessages("queue1", persistent, office3, 30, null);
-         this.sendMessages("queue1", persistent, office4, 30, null);
-         this.sendMessages("queue1", persistent, office5, 30, null);
-                 
-         log.info("sent messages");
-         
-         Thread.sleep(1000);
-         
-         //Check the sizes
-          
-         assertEquals(30, queue1.memoryRefCount());
-         assertEquals(0, queue1.memoryDeliveryCount());
-         
-         assertEquals(30, queue2.memoryRefCount());
-         assertEquals(0, queue2.memoryDeliveryCount());
-           
-         assertEquals(30, queue3.memoryRefCount());
-         assertEquals(0, queue3.memoryDeliveryCount());
-         
-         assertEquals(30, queue4.memoryRefCount());
-         assertEquals(0, queue4.memoryDeliveryCount());
-         
-         assertEquals(30, queue5.memoryRefCount());
-         assertEquals(0, queue5.memoryDeliveryCount());
-         
-         //Now we add the receivers
-         //Note that we did not do this before the send.
-         //If we had done so then it's likely that the automatic redistribution
-         //would have moved some around and there wouldn't be 30 in each queue
-         
-         PullingReceiver receiver1 = new PullingReceiver();
-         queue1.add(receiver1);
-         
-         PullingReceiver receiver2 = new PullingReceiver();
-         queue2.add(receiver2);
-         
-         PullingReceiver receiver3 = new PullingReceiver();
-         queue3.add(receiver3);
-         
-         PullingReceiver receiver4 = new PullingReceiver();
-         queue4.add(receiver4);
-         
-         PullingReceiver receiver5 = new PullingReceiver();
-         queue5.add(receiver5);
-                 
-         log.info("Added receivers");
-         
-         //Prompt delivery so a message pops into each receiver
-         queue1.deliver(true);
-         queue2.deliver(true);
-         queue3.deliver(true);
-         queue4.deliver(true);
-         queue5.deliver(true);
-         
-         Thread.sleep(1000);
-         
-         //Now we check the sizes again in case automatic balancing has erroneously
-         //kicked in                  
-         
-         assertEquals(29, queue1.memoryRefCount());
-         assertEquals(1, queue1.memoryDeliveryCount());
-         
-         assertEquals(29, queue2.memoryRefCount());
-         assertEquals(1, queue2.memoryDeliveryCount());
-           
-         assertEquals(29, queue3.memoryRefCount());
-         assertEquals(1, queue3.memoryDeliveryCount());
-         
-         assertEquals(29, queue4.memoryRefCount());
-         assertEquals(1, queue4.memoryDeliveryCount());
-         
-         assertEquals(29, queue5.memoryRefCount());
-         assertEquals(1, queue5.memoryDeliveryCount());
-         
-         Thread.sleep(5000);
-         
-         //And again - should still be no redistribution
-         
-         assertEquals(29, queue1.memoryRefCount());
-         assertEquals(1, queue1.memoryDeliveryCount());
-         
-         assertEquals(29, queue2.memoryRefCount());
-         assertEquals(1, queue2.memoryDeliveryCount());
-           
-         assertEquals(29, queue3.memoryRefCount());
-         assertEquals(1, queue3.memoryDeliveryCount());
-         
-         assertEquals(29, queue4.memoryRefCount());
-         assertEquals(1, queue4.memoryDeliveryCount());
-         
-         assertEquals(29, queue5.memoryRefCount());
-         assertEquals(1, queue5.memoryDeliveryCount());
-         
-         Thread.sleep(2000);
-         
-         log.info("Here are the sizes:");         
-         log.info("queue1, refs:" + queue1.memoryRefCount() + " dels:" + queue1.memoryDeliveryCount());
-         log.info("queue2, refs:" + queue2.memoryRefCount() + " dels:" + queue2.memoryDeliveryCount());         
-         log.info("queue3, refs:" + queue3.memoryRefCount() + " dels:" + queue3.memoryDeliveryCount());         
-         log.info("queue4, refs:" + queue4.memoryRefCount() + " dels:" + queue4.memoryDeliveryCount());         
-         log.info("queue5, refs:" + queue5.memoryRefCount() + " dels:" + queue5.memoryDeliveryCount());
-                           
-         log.info("trying to consume");
-         
-         //So we have 150 messages in total - 30 on each node.
-         
-         //If redistribution works ok, we should be able to do something like the following:
-         
-         //Consume 10 on node 1
-         
-         //Consume 50 on node 2
-         
-         //Consume 75 on node 3
-         
-         //Consume 10 on node 4
-         
-         //We leave the last 5 since they will be as deliveries in the receivers probably
-         
-         Delivery del;
-                  
-         log.info("consuming queue1");
-         for (int i = 0; i < 10; i++)
-         {       
-            queue1.deliver(true);
-            del = receiver1.getDelivery();
-            log.info("Got delivery: " + del.getReference().getMessageID());
-            del.acknowledge(null);  
-         }
-         log.info("consumed queue1");
-         
-         log.info("Here are the sizes:");  
-         
-         log.info("queue1, refs:" + queue1.memoryRefCount() + " dels:" + queue1.memoryDeliveryCount());
-         log.info("queue2, refs:" + queue2.memoryRefCount() + " dels:" + queue2.memoryDeliveryCount());         
-         log.info("queue3, refs:" + queue3.memoryRefCount() + " dels:" + queue3.memoryDeliveryCount());         
-         log.info("queue4, refs:" + queue4.memoryRefCount() + " dels:" + queue4.memoryDeliveryCount());         
-         log.info("queue5, refs:" + queue5.memoryRefCount() + " dels:" + queue5.memoryDeliveryCount());
-                  
-         log.info("consuming queue2");
-         for (int i = 0; i < 50; i++)
-         {       
-            queue2.deliver(true);
-            del = receiver2.getDelivery();
-            log.info("Got delivery: " + del.getReference().getMessageID());
-            del.acknowledge(null);  
-         }
-         log.info("consumed queue2");
-         
-         log.info("Here are the sizes:");         
-         log.info("queue1, refs:" + queue1.memoryRefCount() + " dels:" + queue1.memoryDeliveryCount());
-         log.info("queue2, refs:" + queue2.memoryRefCount() + " dels:" + queue2.memoryDeliveryCount());         
-         log.info("queue3, refs:" + queue3.memoryRefCount() + " dels:" + queue3.memoryDeliveryCount());         
-         log.info("queue4, refs:" + queue4.memoryRefCount() + " dels:" + queue4.memoryDeliveryCount());         
-         log.info("queue5, refs:" + queue5.memoryRefCount() + " dels:" + queue5.memoryDeliveryCount());
-         
-         log.info("consuming queue3");
-         for (int i = 0; i < 75; i++)
-         {       
-            queue3.deliver(true);
-            del = receiver3.getDelivery();
-            log.info("Got delivery: " + del.getReference().getMessageID());
-            del.acknowledge(null);  
-         }
-         log.info("consumed queue3");
-         
-         log.info("Here are the sizes:");         
-         log.info("queue1, refs:" + queue1.memoryRefCount() + " dels:" + queue1.memoryDeliveryCount());
-         log.info("queue2, refs:" + queue2.memoryRefCount() + " dels:" + queue2.memoryDeliveryCount());         
-         log.info("queue3, refs:" + queue3.memoryRefCount() + " dels:" + queue3.memoryDeliveryCount());         
-         log.info("queue4, refs:" + queue4.memoryRefCount() + " dels:" + queue4.memoryDeliveryCount());         
-         log.info("queue5, refs:" + queue5.memoryRefCount() + " dels:" + queue5.memoryDeliveryCount());
-         
-         log.info("consuming queue4");
-         for (int i = 0; i < 10; i++)
-         {       
-            queue4.deliver(true);
-            del = receiver4.getDelivery();
-            log.info("Got delivery: " + del.getReference().getMessageID());
-            del.acknowledge(null);  
-         }
-         log.info("consumed queue4");
-         
-         log.info("Here are the sizes:");         
-         log.info("queue1, refs:" + queue1.memoryRefCount() + " dels:" + queue1.memoryDeliveryCount());
-         log.info("queue2, refs:" + queue2.memoryRefCount() + " dels:" + queue2.memoryDeliveryCount());         
-         log.info("queue3, refs:" + queue3.memoryRefCount() + " dels:" + queue3.memoryDeliveryCount());         
-         log.info("queue4, refs:" + queue4.memoryRefCount() + " dels:" + queue4.memoryDeliveryCount());         
-         log.info("queue5, refs:" + queue5.memoryRefCount() + " dels:" + queue5.memoryDeliveryCount());
-         
-      }
-      finally
-      { 
-         if (office1 != null)
-         {
-            office1.stop();
-         }
-         
-         if (office2 != null)
-         {            
-            office2.stop();
-         }
-         
-         if (office3 != null)
-         {
-            office3.stop();
-         }
-         
-         if (office4 != null)
-         {            
-            office4.stop();
-         }
-         
-         if (office5 != null)
-         {
-            office5.stop();
-         }
-      }
-   }
-   
-   class PullingReceiver implements Receiver
-   {
-      private Delivery del;
-
-      public synchronized Delivery handle(DeliveryObserver observer, MessageReference reference, Transaction tx)
-      {
-         if (del != null)
-         {
-            return null;
-         }
-         
-         del = new SimpleDelivery(observer, reference, false);
-         
-         this.notify();
-         
-         return del;
-      }
-      
-      public synchronized Delivery getDelivery()
-      {
-         while (del == null)
-         {
-            try
-            {
-               this.wait();
-            }
-            catch (InterruptedException e)
-            {               
-            }
-         }
-         Delivery ret = del;
-         del = null;
-         return ret;
-      }
-      
-   }
-   
-   protected ClusteredPostOffice createClusteredPostOffice(int nodeId, String groupName) throws Exception
-   {
-      MessagePullPolicy pullPolicy = new DefaultMessagePullPolicy();
-      
-      FilterFactory ff = new SimpleFilterFactory();
-      
-      ClusterRouterFactory rf = new DefaultRouterFactory();
-      
-      DefaultClusteredPostOffice postOffice = 
-         new DefaultClusteredPostOffice(sc.getDataSource(), sc.getTransactionManager(),
-                                 null, true, nodeId, "Clustered", ms, pm, tr, ff, pool,
-                                 groupName,
-                                 JGroupsUtil.getControlStackProperties(),
-                                 JGroupsUtil.getDataStackProperties(),
-                                 5000, 5000, pullPolicy, rf, 1, 1000);
-      
-      postOffice.start();      
-      
-      return postOffice;
-   }
-   
-   // Private -------------------------------------------------------
-
-   // Inner classes -------------------------------------------------
-   
-}
-
-
-

Modified: trunk/tests/src/org/jboss/test/messaging/jms/BrowserTest.java
===================================================================
--- trunk/tests/src/org/jboss/test/messaging/jms/BrowserTest.java	2006-10-12 15:47:04 UTC (rev 1472)
+++ trunk/tests/src/org/jboss/test/messaging/jms/BrowserTest.java	2006-10-13 12:09:35 UTC (rev 1473)
@@ -100,8 +100,7 @@
 	{
       ServerManagement.undeployQueue("Queue");
       
-      connection.stop();
-      connection = null;
+      connection.close();
       	
 		super.tearDown();     
 	}




More information about the jboss-cvs-commits mailing list