[Jboss-cvs] JBoss Messaging SVN: r1215 - trunk/src/main/org/jboss/messaging/core

jboss-cvs-commits at lists.jboss.org jboss-cvs-commits at lists.jboss.org
Tue Aug 8 09:14:03 EDT 2006


Author: timfox
Date: 2006-08-08 09:14:02 -0400 (Tue, 08 Aug 2006)
New Revision: 1215

Modified:
   trunk/src/main/org/jboss/messaging/core/ChannelSupport.java
Log:
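Interim race fix: in the delivery loop, hold refLock only while selecting the next message reference, and take deliveryLock only when adding the delivery to state (previously both locks were held around the whole loop body)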



Modified: trunk/src/main/org/jboss/messaging/core/ChannelSupport.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/ChannelSupport.java	2006-08-08 13:10:56 UTC (rev 1214)
+++ trunk/src/main/org/jboss/messaging/core/ChannelSupport.java	2006-08-08 13:14:02 UTC (rev 1215)
@@ -228,7 +228,7 @@
          return this.handleInternal(sender, r, tx);
       }
    }
-
+      
    // DeliveryObserver implementation --------------------------
 
    public void acknowledge(Delivery d, Transaction tx) throws Throwable
@@ -441,7 +441,7 @@
     * 
     */
    public void removeAllReferences() throws Throwable
-   {  
+   {        
       synchronized (refLock)
       {
          synchronized (deliveryLock)
@@ -571,7 +571,7 @@
    }
 
    public int messageCount()
-   {
+   {   
       synchronized (refLock)
       {
          synchronized (deliveryLock)
@@ -643,149 +643,148 @@
          
          while (true)
          {           
-            //TODO simplify locking - do we really need two locks??
             synchronized (refLock)
             {              
-               synchronized (deliveryLock)
+               if (iter == null)
                {
+                  ref = (MessageReference) messageRefs.peekFirst();
+               }
+               else
+               {
+                  if (iter.hasNext())
+                  {                        
+                     ref = (MessageReference)iter.next();
+                  } 
+                  else
+                  {
+                     ref = null;
+                  }
+               }
+            }
+
+            if (ref != null)
+            {
+               // Check if message is expired (we also do this on the client
+               // side)
+               // If so ack it from the channel
+               if (ref.isExpired())
+               {
+                  if (trace) { log.trace("Message reference: " + ref + " has expired"); }
+
+                  // remove and acknowledge it
                   if (iter == null)
                   {
-                     ref = (MessageReference) messageRefs.peekFirst();
+                     removeFirstInMemory();
                   }
                   else
                   {
-                     if (iter.hasNext())
-                     {                        
-                        ref = (MessageReference)iter.next();
-                     } 
-                     else
+                     iter.remove();
+                  }
+
+                  Delivery delivery = new SimpleDelivery(this, ref, true);
+
+                  acknowledgeInternal(delivery);
+               }
+               else
+               {
+                  // Reference is not expired
+
+                  // Attempt to push the ref to a receiver
+                  Delivery del = push(ref);
+
+                  if (del == null)
+                  {
+                     // no receiver, broken receiver
+                     // or full receiver    
+                     // so we stop delivering
+                     if (trace) { log.trace(this + ": no delivery returned for message" 
+                                  + ref + " so no receiver got the message");
+                                  log.trace("Delivery is now complete"); }
+
+                     receiversReady = false;
+
+                     return;
+                  }
+                  else if (!del.isSelectorAccepted())
+                  {
+                     // No receiver accepted the message because no selectors matched
+                     // So we create an iterator (if we haven't already created it) to
+                     // iterate through the refs in the channel
+                     // TODO Note that this is only a partial solution since if there are messages paged to storage
+                     // it won't try those - i.e. it will only iterate through those refs in memory.
+                     // Dealing with refs in storage is somewhat tricky since we can't just load them and iterate
+                     // through them since we might run out of memory
+                     // So we will need to load individual refs from storage given the selector expressions
+                     // Secondly we should also introduce some in memory indexes here to prevent having to
+                     // iterate through all the refs every time
+                     // Having said all that, having consumers on a queue that don't match many messages
+                     // is an antipattern and should be avoided by the user
+                     if (iter == null)
                      {
-                        ref = null;
-                     }
+                        iter = messageRefs.iterator();
+                     }                     
                   }
-               
-                  if (ref != null)
+                  else
                   {
-                     // Check if message is expired (we also do this on the client
-                     // side)
-                     // If so ack it from the channel
-                     if (ref.isExpired())
+                     if (trace) { log.trace(this + ": " + del + " returned for message:" + ref); }
+                     
+                     //Receiver accepted the reference
+
+                     // We must synchronize here to cope with another race
+                     // condition where message is
+                     // cancelled/acked in flight while the following few
+                     // actions are being performed.
+                     // e.g. delivery could be cancelled acked after being
+                     // removed from state but before
+                     // delivery being added (observed).
+                     synchronized (del)
                      {
-                        if (trace) { log.trace("Message reference: " + ref + " has expired"); }
-      
-                        // remove and acknowledge it
-                        if (iter == null)
+                        if (trace) { log.trace(this + " incrementing delivery count for " + del); }
+
+                        // FIXME - It's actually possible the delivery could be
+                        // cancelled before it reaches
+                        // here, in which case we wouldn't get a delivery but we
+                        // still need to increment the
+                        // delivery count
+                        // All the problems related to these race conditions and
+                        // fiddly edge cases will disappear
+                        // once we do
+                        // http://jira.jboss.com/jira/browse/JBMESSAGING-355
+                        // This will make life a lot easier
+
+                        del.getReference().incrementDeliveryCount();                    
+
+                        if (!del.isCancelled())
                         {
-                           removeFirstInMemory();
-                        }
-                        else
-                        {
-                           iter.remove();
-                        }
-      
-                        Delivery delivery = new SimpleDelivery(this, ref, true);
-      
-                        acknowledgeInternal(delivery);
-                     }
-                     else
-                     {
-                        // Reference is not expired
-      
-                        // Attempt to push the ref to a receiver
-                        Delivery del = push(ref);
-      
-                        if (del == null)
-                        {
-                           // no receiver, broken receiver
-                           // or full receiver    
-                           // so we stop delivering
-                           if (trace) { log.trace(this + ": no delivery returned for message" 
-                                        + ref + " so no receiver got the message");
-                                        log.trace("Delivery is now complete"); }
-      
-                           receiversReady = false;
-      
-                           return;
-                        }
-                        else if (!del.isSelectorAccepted())
-                        {
-                           // No receiver accepted the message because no selectors matched
-                           // So we create an iterator (if we haven't already created it) to
-                           // iterate through the refs in the channel
-                           // TODO Note that this is only a partial solution since if there are messages paged to storage
-                           // it won't try those - i.e. it will only iterate through those refs in memory.
-                           // Dealing with refs in storage is somewhat tricky since we can't just load them and iterate
-                           // through them since we might run out of memory
-                           // So we will need to load individual refs from storage given the selector expressions
-                           // Secondly we should also introduce some in memory indexes here to prevent having to
-                           // iterate through all the refs every time
-                           // Having said all that, having consumers on a queue that don't match many messages
-                           // is an antipattern and should be avoided by the user
                            if (iter == null)
                            {
-                              iter = messageRefs.iterator();
-                           }                     
-                        }
-                        else
-                        {
-                           if (trace) { log.trace(this + ": " + del + " returned for message:" + ref); }
-                           
-                           //Receiver accepted the reference
-      
-                           // We must synchronize here to cope with another race
-                           // condition where message is
-                           // cancelled/acked in flight while the following few
-                           // actions are being performed.
-                           // e.g. delivery could be cancelled acked after being
-                           // removed from state but before
-                           // delivery being added (observed).
-                           synchronized (del)
+                              removeFirstInMemory();
+                           }
+                           else
                            {
-                              if (trace) { log.trace(this + " incrementing delivery count for " + del); }
-      
-                              // FIXME - It's actually possible the delivery could be
-                              // cancelled before it reaches
-                              // here, in which case we wouldn't get a delivery but we
-                              // still need to increment the
-                              // delivery count
-                              // All the problems related to these race conditions and
-                              // fiddly edge cases will disappear
-                              // once we do
-                              // http://jira.jboss.com/jira/browse/JBMESSAGING-355
-                              // This will make life a lot easier
-      
-                              del.getReference().incrementDeliveryCount();                    
-      
-                              if (!del.isCancelled())
+                              iter.remove();                                
+                           }
+
+                           // delivered
+                           if (!del.isDone())
+                           {
+                              // Add the delivery to state
+                              synchronized (deliveryLock)
                               {
-                                 if (iter == null)
-                                 {
-                                    removeFirstInMemory();
-                                 }
-                                 else
-                                 {
-                                    iter.remove();                                
-                                 }
-      
-                                 // delivered
-                                 if (!del.isDone())
-                                 {
-                                    // Add the delivery to state
-                                    deliveries.add(del);                                 
-                                 }
+                                 deliveries.add(del);
                               }
                            }
                         }
                      }
                   }
-                  else
-                  {
-                     // No more refs in channel
-                     if (trace) { log.trace(this + " no more refs to deliver "); }
-                     break;
-                  }
                }
             }
+            else
+            {
+               // No more refs in channel
+               if (trace) { log.trace(this + " no more refs to deliver "); }
+               break;
+            }
          }
       }
       catch (Throwable t)
@@ -1705,7 +1704,7 @@
          throw new IllegalStateException(this + " closed");
       }
    }
-
+  
    // Inner classes -------------------------------------------------
 
    private class DeliveryRunnable implements Runnable
@@ -1748,6 +1747,5 @@
          Delivery d = handleInternal(sender, routable, null);
          result.setResult(d);
       }
-   }
-
+   }   
 }




More information about the jboss-cvs-commits mailing list