[jboss-cvs] JBoss Messaging SVN: r1667 - branches/Branch_1_0_1_SP/src/main/org/jboss/messaging/core

jboss-cvs-commits at lists.jboss.org jboss-cvs-commits at lists.jboss.org
Thu Nov 30 19:09:22 EST 2006


Author: clebert.suconic at jboss.com
Date: 2006-11-30 19:09:21 -0500 (Thu, 30 Nov 2006)
New Revision: 1667

Modified:
   branches/Branch_1_0_1_SP/src/main/org/jboss/messaging/core/ChannelSupport.java
Log:
http://jira.jboss.com/jira/browse/JBMESSAGING-660 - Fix provided by Tim Fox

Modified: branches/Branch_1_0_1_SP/src/main/org/jboss/messaging/core/ChannelSupport.java
===================================================================
--- branches/Branch_1_0_1_SP/src/main/org/jboss/messaging/core/ChannelSupport.java	2006-11-30 18:47:04 UTC (rev 1666)
+++ branches/Branch_1_0_1_SP/src/main/org/jboss/messaging/core/ChannelSupport.java	2006-12-01 00:09:21 UTC (rev 1667)
@@ -21,6 +21,8 @@
  */
 package org.jboss.messaging.core;
 
+import EDU.oswego.cs.dl.util.concurrent.QueuedExecutor;
+import EDU.oswego.cs.dl.util.concurrent.SynchronizedLong;
 import java.util.ArrayList;
 import java.util.HashMap;
 import java.util.HashSet;
@@ -30,7 +32,6 @@
 import java.util.ListIterator;
 import java.util.Map;
 import java.util.Set;
-
 import org.jboss.logging.Logger;
 import org.jboss.messaging.core.memory.MemoryManager;
 import org.jboss.messaging.core.plugin.contract.MessageStore;
@@ -42,9 +43,6 @@
 import org.jboss.messaging.core.tx.TxCallback;
 import org.jboss.messaging.util.Future;
 
-import EDU.oswego.cs.dl.util.concurrent.QueuedExecutor;
-import EDU.oswego.cs.dl.util.concurrent.SynchronizedLong;
-
 /**
  * Channel implementation. It supports atomicity, isolation and recoverability of reliable messages.
  * The channel implementation here uses a "SEDA-type" approach, where requests to handle messages,
@@ -52,7 +50,7 @@
  * queue and executed serially by a single thread. This prevents lock contention since requests are
  * executed serially, resulting in better scalability and higher throughput at the expense of some
  * latency.
- * 
+ *
  * @author <a href="mailto:ovidiu at jboss.org">Ovidiu Feodorov</a>
  * @author <a href="mailto:tim.fox at jboss.com">Tim Fox</a>
  * @version <tt>$Revision$</tt> $Id: ChannelSupport.java,v 1.65
@@ -203,11 +201,11 @@
    public Delivery handle(DeliveryObserver sender, Routable r, Transaction tx)
    {
       checkClosed();
-      
+
       Future result = new Future();
 
       if (tx == null)
-      {         
+      {
          try
          {
             // Instead of executing directly, we add the handle request to the event queue.
@@ -220,7 +218,7 @@
          {
             log.warn("Thread interrupted", e);
          }
-   
+
          return (Delivery)result.getResult();
       }
       else
@@ -228,7 +226,7 @@
          return this.handleInternal(sender, r, tx);
       }
    }
-      
+
    // DeliveryObserver implementation --------------------------
 
    public void acknowledge(Delivery d, Transaction tx) throws Throwable
@@ -252,7 +250,7 @@
          // Future result = new Future();
          //
          // this.executor.execute(new AcknowledgeRunnable(d, result));
-         //               
+         //
          // //For now we wait for result, but this may not be necessary
          // result.getResult();
          // }
@@ -284,9 +282,9 @@
       // try
       // {
       // Future result = new Future();
-      //         
+      //
       // this.executor.execute(new CancelRunnable(d, result));
-      //         
+      //
       // //For now we wait for result, but this may not be necessary
       // result.getResult();
       // }
@@ -296,13 +294,13 @@
       // }
 
 //      Exception e = new Exception();
-//      
+//
 //      log.error("cancelling delivery: " + d, e);
-//      
-      
-      
+//
+
+
       // TODO We should also consider executing cancels on the event queue
-      cancelInternal(d);   
+      cancelInternal(d);
    }
 
    // Distributor implementation ------------------------------------
@@ -314,7 +312,7 @@
       boolean added = router.add(r);
 
       if (trace) { log.trace("receiver " + r + (added ? "" : " NOT") + " added"); }
-      
+
       receiversReady = true;
       return added;
    }
@@ -372,7 +370,7 @@
          log.trace(this + " browse"
                   + (filter == null ? "" : ", filter = " + filter));
       }
-      
+
       synchronized (deliveryLock)
       {
          synchronized (refLock)
@@ -380,14 +378,14 @@
             //FIXME - This is currently broken since it doesn't take into account
             // refs paged into persistent storage
             // Also is very inefficient since it makes a copy
-            
+
             //TODO use the ref queue iterator
             List references = delivering(filter);
-                        
-            List undel = undelivered(filter);            
 
+            List undel = undelivered(filter);
+
             references.addAll(undel);
-            
+
             // dereference pass
             ArrayList messages = new ArrayList(references.size());
             for (Iterator i = references.iterator(); i.hasNext();)
@@ -397,25 +395,25 @@
             }
             return messages;
          }
-      }   
+      }
    }
 
    public void deliver(boolean synchronous)
    {
-      checkClosed();           
-     
+      checkClosed();
+
       // We put a delivery request on the event queue.
       try
       {
          Future future = null;
-         
+
          if (synchronous)
          {
             future = new Future();
          }
-                  
+
          this.executor.execute(new DeliveryRunnable(future));
-         
+
          if (synchronous)
          {
             // Wait to complete
@@ -435,25 +433,25 @@
          router.clear();
          router = null;
       }
-      
-   }  
-   
+
+   }
+
    /*
     * This method clears the channel.
     * Basically it acknowledges any outstanding deliveries and consumes the rest of the messages in the channel.
     * We can't just delete the corresponding references directly from the database since
     * a) We might be paging
     * b) The message might remain in the message store causing a leak
-    * 
+    *
     */
    public void removeAllReferences() throws Throwable
-   {        
+   {
       synchronized (refLock)
       {
          synchronized (deliveryLock)
          {
             //Ack the deliveries
-            
+
             //Clone to avoid ConcurrentModificationException
             Set dels = new HashSet(deliveries);
 
@@ -461,26 +459,26 @@
             while (iter.hasNext())
             {
                SimpleDelivery d = (SimpleDelivery) iter.next();
-               
+
                d.acknowledge(null);
             }
-            
+
             //Now we consume the rest of the messages
             //This may take a while if we have a lot of messages including perhaps millions
             //paged in the database - but there's no obvious other way to do it.
             //We cannot just delete them directly from the database - because we may end up with messages leaking
             //in the message store,
             //also we might get race conditions when other channels are updating the same message in the db
-            
+
             //Note - we don't do this in a tx - because the tx could be too big if we have millions of refs
             //paged in storage
-            
+
             MessageReference ref;
             while ((ref = removeFirstInMemory()) != null)
             {
                SimpleDelivery del = new SimpleDelivery(this, ref, false);
-               
-               del.acknowledge(null);           
+
+               del.acknowledge(null);
             }
          }
       }
@@ -580,7 +578,7 @@
     * Returns the count of messages stored AND being delivered.
     */
    public int messageCount()
-   {   
+   {
       synchronized (refLock)
       {
          synchronized (deliveryLock)
@@ -636,8 +634,8 @@
    /*
     * This methods delivers as many messages as possible to the router until no
     * more deliveries are returned. This method should never be called at the
-    * same time as handle. 
-    * 
+    * same time as handle.
+    *
     * @see org.jboss.messaging.core.Channel#deliver()
     */
    protected void deliverInternal()
@@ -647,16 +645,17 @@
          // The iterator is used to iterate through the refs in the channel in the case that they
          // don't match the selectors of any receivers.
          ListIterator iter = null;
-         
+
          MessageReference ref = null;
 
-         synchronized (refLock)
+         while (true)
          {
-            while (true)
+            synchronized (refLock)
             {
                if (iter == null)
                {
-                  ref = (MessageReference)messageRefs.peekFirst();
+                  //ref = (MessageReference) messageRefs.peekFirst();
+                  ref = removeFirstInMemory();
                }
                else
                {
@@ -669,127 +668,150 @@
                      ref = null;
                   }
                }
+            }
 
-               if (ref != null)
+            if (ref != null)
+            {
+               // Check if message is expired (we also do this on the client side)
+               // If so ack it from the channel
+               if (ref.isExpired())
                {
-                  if (trace) { log.trace(this + " pushing " + ref); }
+                  if (trace) { log.trace("Message reference: " + ref + " has expired"); }
 
-                  // Check if message is expired (we also do this on the client side). If so ack it
-                  // from the channel.
+                  // remove and acknowledge it
+                  if (iter == null)
+                  {
+                     //already removed
+                     //removeFirstInMemory();
+                  }
+                  else
+                  {
+                     iter.remove();
+                  }
 
-                  if (ref.isExpired())
+                  Delivery delivery = new SimpleDelivery(this, ref, true);
+
+                  acknowledgeInternal(delivery);
+               }
+               else
+               {
+                  // Reference is not expired
+
+                  // Attempt to push the ref to a receiver
+                  Delivery del = push(ref);
+
+                  if (del == null)
                   {
-                     if (trace) { log.trace("Message reference: " + ref + " has expired"); }
+                     // No receiver, broken receiver or full receiver so we stop delivering; also
+                     // we need to decrement the delivery count, as no real delivery has been
+                     // actually performed
 
-                     // remove and acknowledge it
+                     if (trace) { log.trace(this + ": no delivery returned for message" + ref + " so no receiver got the message. Delivery is now complete"); }
+
+                     receiversReady = false;
+
                      if (iter == null)
                      {
-                        removeFirstInMemory();
+                        // add the message back
+                        synchronized (refLock)
+                        {
+                           messageRefs.addFirst(ref, ref.getPriority());
+                        }
                      }
                      else
                      {
-                        iter.remove();
+                        //we didn't remove it in the first place
                      }
 
-                     Delivery delivery = new SimpleDelivery(this, ref, true);
-
-                     acknowledgeInternal(delivery);
+                     return;
                   }
-                  else
+                  else if (!del.isSelectorAccepted())
                   {
-                     // Reference is not expired
+                     // No receiver accepted the message because no selectors matched, so we create
+                     // an iterator (if we haven't already created it) to iterate through the refs
+                     // in the channel.
 
-                     // Attempt to push the ref to a receiver
-                     Delivery del = push(ref);
+                     // TODO Note that this is only a partial solution since if there are messages
+                     // paged to storage it won't try those - i.e. it will only iterate through
+                     // those refs in memory. Dealing with refs in storage is somewhat tricky since
+                     // we can't just load them and iterate through them since we might run out of
+                     // memory, so we will need to load individual refs from storage given the
+                     // selector expressions. Secondly we should also introduce some in memory
+                     // indexes here to prevent having to iterate through all the refs every time.
+                     // Having said all that, having consumers on a queue that don't match many
+                     // messages is an antipattern and should be avoided by the user.
 
-                     if (del == null)
+                     if (iter == null)
                      {
-                        // No receiver, broken receiver or full receiver so we stop delivering; also
-                        // we need to decrement the delivery count, as no real delivery has been
-                        // actually performed
+                        //Add the message back
 
-                        if (trace) { log.trace(this + ": no delivery returned for message" + ref + " so no receiver got the message. Delivery is now complete"); }
+                        synchronized (refLock)
+                        {
+                           messageRefs.addFirst(ref, ref.getPriority());
 
-                        receiversReady = false;
+                           iter = messageRefs.iterator();
 
-                        return;
-                     }
-                     else if (!del.isSelectorAccepted())
-                     {
-                        // No receiver accepted the message because no selectors matched, so we create
-                        // an iterator (if we haven't already created it) to iterate through the refs
-                        // in the channel.
+                           //And skip the next one (that's the one we just added back)
 
-                        // TODO Note that this is only a partial solution since if there are messages
-                        // paged to storage it won't try those - i.e. it will only iterate through
-                        // those refs in memory. Dealing with refs in storage is somewhat tricky since
-                        // we can't just load them and iterate through them since we might run out of
-                        // memory, so we will need to load individual refs from storage given the
-                        // selector expressions. Secondly we should also introduce some in memory
-                        // indexes here to prevent having to iterate through all the refs every time.
-                        // Having said all that, having consumers on a queue that don't match many
-                        // messages is an antipattern and should be avoided by the user.
-                        if (iter == null)
-                        {
-                           iter = messageRefs.iterator();
+                           iter.next();
                         }
                      }
-                     else
+                  }
+                  else
+                  {
+                     if (trace) { log.trace(this + ": " + del + " returned for message:" + ref); }
+
+                     // Receiver accepted the reference
+
+                     // We must synchronize here to cope with another race condition where message
+                     // is cancelled/acked in flight while the following few actions are being
+                     // performed. e.g. delivery could be cancelled acked after being removed from
+                     // state but before delivery being added (observed).
+                     synchronized (del)
                      {
-                        if (trace) { log.trace(this + ": " + del + " returned for message:" + ref); }
+                        if (trace) { log.trace(this + " incrementing delivery count for " + del); }
 
-                        // Receiver accepted the reference
+                        // FIXME - It's actually possible the delivery could be
+                        // cancelled before it reaches
+                        // here, in which case we wouldn't get a delivery but we
+                        // still need to increment the delivery count
+                        // All the problems related to these race conditions and
+                        // fiddly edge cases will disappear once we do
+                        // http://jira.jboss.com/jira/browse/JBMESSAGING-355
+                        // This will make life a lot easier
 
-                        // We must synchronize here to cope with another race condition where message
-                        // is cancelled/acked in flight while the following few actions are being
-                        // performed. e.g. delivery could be cancelled acked after being removed from
-                        // state but before delivery being added (observed).
-                        synchronized (del)
+                        if (!del.isCancelled())
                         {
-                           // FIXME - It's actually possible the delivery could be
-                           // cancelled before it reaches
-                           // here, in which case we wouldn't get a delivery but we
-                           // still need to increment the
-                           // delivery count
-                           // All the problems related to these race conditions and
-                           // fiddly edge cases will disappear
-                           // once we do
-                           // http://jira.jboss.com/jira/browse/JBMESSAGING-355
-                           // This will make life a lot easier
+                           if (iter == null)
+                           {
+                              //do nothing - already removed
+                              //removeFirstInMemory();
+                           }
+                           else
+                           {
+                              iter.remove();
+                           }
 
-                           if (!del.isCancelled())
+                           // delivered
+                           if (!del.isDone())
                            {
-                              if (iter == null)
+                              // Add the delivery to state
+                              synchronized (deliveryLock)
                               {
-                                 removeFirstInMemory();
+                                 deliveries.add(del);
                               }
-                              else
-                              {
-                                 iter.remove();
-                                 if (trace) { log.trace(this + " removed current message from iterator"); }
-                              }
-
-                              // delivered
-                              if (!del.isDone())
-                              {
-                                 synchronized (deliveryLock)
-                                 {
-                                    deliveries.add(del);
-                                    if (trace) { log.trace(this + " starting to track  " + del); }
-                                 }
-                              }
                            }
                         }
                      }
                   }
                }
-               else
-               {
-                  // No more refs in channel
-                  if (trace) { log.trace(this + " no more refs to deliver "); }
-                  break;
-               }
             }
+            else
+            {
+               // No more refs in channel
+               if (trace) { log.trace(this + " no more refs to deliver "); }
+               break;
+            }
          }
       }
       catch (Throwable t)
@@ -819,9 +841,9 @@
             //pages and is non recoverable a reliable ref will be paged in the database as reliable
             //which makes them hard to remove on server restart.
             //If we always page them as unreliable then it is easy to remove them.
-            ref.setReliable(false);               
+            ref.setReliable(false);
          }
-         
+
          if (tx == null)
          {
             // Don't even attempt synchronous delivery for a reliable message
@@ -839,18 +861,18 @@
                return null;
             }
 
-            checkMemory();                        
+            checkMemory();
 
             ref.setOrdering(messageOrdering.increment());
-            
+
             if (ref.isReliable() && recoverable)
             {
                // Reliable message in a recoverable state - also add to db
-               if (trace) { log.trace(this + " adding " + ref + " to database non-transactionally"); }
+               if (trace) { log.trace(this + "adding " + ref + " to database non-transactionally"); }
 
-               pm.addReference(channelID, ref, null);               
+               pm.addReference(channelID, ref, null);
             }
-            
+
             addReferenceInMemory(ref);
 
             // We only do delivery if there are receivers that haven't said they don't want
@@ -863,26 +885,39 @@
          }
          else
          {
+            if (trace) { log.trace(this + "adding " + ref + " to state " + (tx == null ? "non-transactionally" : "in transaction: " + tx)); }
+
             checkMemory();
 
             if (ref.isReliable() && !acceptReliableMessages)
             {
-               // This transaction has no chance to succeed, since a reliable message cannot be
-               // safely stored by a non-recoverable state, so doom the transaction.
-               if (trace) { log.trace(this + " cannot handle reliable messages, dooming the transaction"); }
+               // this transaction has no chance to succeed, since a reliable
+               // message cannot be
+               // safely stored by a non-recoverable state, so doom the
+               // transaction
+               if (trace)
+               {
+                  log.trace(this + " cannot handle reliable messages, dooming the transaction");
+               }
                tx.setRollbackOnly();
-            } 
+            }
             else
             {
                // add to post commit callback
                ref.setOrdering(messageOrdering.increment());
                this.getCallback(tx).addRef(ref);
-               if (trace) { log.trace(this + " added " + ref + " to memory transactional callback, in transaction: " + tx); }
+               if (trace)
+               {
+                  log.trace(this + " added transactionally " + ref
+                           + " in memory");
+               }
             }
 
             if (ref.isReliable() && recoverable)
             {
                // Reliable message in a recoverable state - also add to db
+               if (trace) { log.trace(this + "adding " + ref + (tx == null ? " to database non-transactionally" : " in transaction: " + tx)); }
+
                pm.addReference(channelID, ref, tx);
             }
          }
@@ -901,23 +936,26 @@
    }
 
    protected void acknowledgeInternal(Delivery d) throws Exception
-   {      
+   {
       synchronized (deliveryLock)
       {
          acknowledgeInMemory(d);
       }
-         
+
       if (recoverable && d.getReference().isReliable())
       {
          pm.removeReference(channelID, d.getReference(), null);
       }
-           
-      d.getReference().releaseMemoryReference();        
+
+      d.getReference().releaseMemoryReference();
    }
 
    protected void cancelInternal(Delivery del) throws Exception
    {
-      if (trace) { log.trace(this + " cancelling " + del + " in memory"); }
+      if (trace)
+      {
+         log.trace(this + " cancelling " + del + " in memory");
+      }
 
       boolean removed;
 
@@ -953,14 +991,14 @@
                refsInStorage++;
             }
          }
-         
-         // We may need to update the delivery count in the database
+
+         //We may need to update the delivery count in the database
          if (ref.isReliable())
          {
             pm.updateDeliveryCount(this.channelID, ref);
          }
 
-         if (trace) { log.trace(this + " added " + ref + " back into memory, ready for redelivery"); }
+         if (trace) { log.trace(this + " added " + ref + " back into state"); }
       }
    }
 
@@ -968,7 +1006,7 @@
    {
       synchronized (refLock)
       {
-         MessageReference result = (MessageReference)messageRefs.removeFirst();
+         MessageReference result = (MessageReference) messageRefs.removeFirst();
 
          if (refsInStorage > 0)
          {
@@ -984,8 +1022,7 @@
             paging = false;
          }
 
-         if (trace) { log.trace(this + " removing first message in memory, which is " + result); }
-         return (MessageReference)result;
+         return (MessageReference) result;
       }
    }
 
@@ -1029,21 +1066,21 @@
       // if (mm != null)
       // {
       // boolean isLow = mm.isMemoryLow();
-      //         
+      //
       // if (isLow)
       // {
-      //            
+      //
       // synchronized (refLock)
       // {
       // if (!paging)
       // {
       // log.info("Memory is low:" + this);
-      //                  
+      //
       // fullSize = messageRefs.size() + 1;
-      //                  
+      //
       // //TODO Make this configurable
       // pageSize = downCacheSize = Math.max(1, fullSize / 50);
-      //                  
+      //
       // log.info("Turned paging on, fullSize=" + fullSize + " dc:" +
       // downCacheSize + " ps: " + pageSize);
       // }
@@ -1051,7 +1088,7 @@
       // {
       // //log.info("already paging");
       // }
-      //               
+      //
       // }
       // }
       // }
@@ -1077,7 +1114,11 @@
          {
             messageRefs.addLast(ref, ref.getPriority());
 
-            if (trace) { log.trace(this + " added " + ref + " in memory"); }
+            if (trace)
+            {
+               log.trace(this + " added " + ref
+                        + " non-transactionally in memory");
+            }
 
             if (messageRefs.size() == fullSize)
             {
@@ -1232,7 +1273,7 @@
       {
          log.trace(this + " removed " + d + " from memory:" + removed);
       }
-      
+
       return removed;
    }
 
@@ -1250,7 +1291,7 @@
 
       // We may load less than desired due to "holes" - this is ok
       int numberLoaded = refInfos.size();
-      
+
       if (numberLoaded == 0)
       {
          throw new IllegalStateException(
@@ -1313,7 +1354,7 @@
             // return a reference
             // to the pre-existing message
             MessageReference ref = ms.reference(m);
-            
+
             refMap.put(new Long(m.getMessageID()), ref);
          }
       }
@@ -1348,15 +1389,15 @@
          ref.setDeliveryCount(info.getDeliveryCount());
 
          ref.setOrdering(info.getOrdering());
-         
+
          //We ignore the reliable field from the message - this is because reliable might be true on the message
          //but this is a non recoverable state
-         
+
          //FIXME - Really the message shouldn't have a reliable field at all,
          //Reliability is an attribute of the message reference, not the message
-         
+
          ref.setReliable(info.isReliable());
-         
+
          messageRefs.addLast(ref, ref.getPriority());
 
          if (recoverable && ref.isReliable())
@@ -1418,8 +1459,8 @@
 
       return callback;
    }
-   
 
+
    // Private -------------------------------------------------------
 
    // Inner classes -------------------------------------------------
@@ -1573,7 +1614,7 @@
          {
             MessageReference ref = (MessageReference) iter.next();
 
-            if (trace) { log.trace(this + " adding " + ref + " to memory"); }
+            if (trace) { log.trace(this + ": adding " + ref + " to non-recoverable state"); }
 
             try
             {
@@ -1669,7 +1710,7 @@
 
       return d;
    }
-   
+
    private void checkClosed()
    {
       if (router == null)
@@ -1677,18 +1718,18 @@
          throw new IllegalStateException(this + " closed");
       }
    }
-  
+
    // Inner classes -------------------------------------------------
 
    private class DeliveryRunnable implements Runnable
    {
       Future result;
-      
+
       DeliveryRunnable(Future result)
       {
          this.result = result;
       }
-      
+
       public void run()
       {
          receiversReady = true;
@@ -1720,5 +1761,5 @@
          Delivery d = handleInternal(sender, routable, null);
          result.setResult(d);
       }
-   }   
+   }
 }




More information about the jboss-cvs-commits mailing list