[Jboss-cvs] JBoss Messaging SVN: r1213 - in trunk: src/main/org/jboss/messaging/core tests/src/org/jboss/test/messaging/jms/server/destination

jboss-cvs-commits at lists.jboss.org jboss-cvs-commits at lists.jboss.org
Tue Aug 8 07:47:53 EDT 2006


Author: timfox
Date: 2006-08-08 07:47:48 -0400 (Tue, 08 Aug 2006)
New Revision: 1213

Modified:
   trunk/src/main/org/jboss/messaging/core/ChannelSupport.java
   trunk/tests/src/org/jboss/test/messaging/jms/server/destination/TopicManagementTest.java
Log:
http://jira.jboss.com/jira/browse/JBMESSAGING-501 interim



Modified: trunk/src/main/org/jboss/messaging/core/ChannelSupport.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/ChannelSupport.java	2006-08-08 11:25:08 UTC (rev 1212)
+++ trunk/src/main/org/jboss/messaging/core/ChannelSupport.java	2006-08-08 11:47:48 UTC (rev 1213)
@@ -643,148 +643,149 @@
          
          while (true)
          {           
+            //TODO simplify locking - do we really need two locks??
             synchronized (refLock)
             {              
-               if (iter == null)
+               synchronized (deliveryLock)
                {
-                  ref = (MessageReference) messageRefs.peekFirst();
-               }
-               else
-               {
-                  if (iter.hasNext())
-                  {                        
-                     ref = (MessageReference)iter.next();
-                  } 
-                  else
-                  {
-                     ref = null;
-                  }
-               }
-            }
-
-            if (ref != null)
-            {
-               // Check if message is expired (we also do this on the client
-               // side)
-               // If so ack it from the channel
-               if (ref.isExpired())
-               {
-                  if (trace) { log.trace("Message reference: " + ref + " has expired"); }
-
-                  // remove and acknowledge it
                   if (iter == null)
                   {
-                     removeFirstInMemory();
+                     ref = (MessageReference) messageRefs.peekFirst();
                   }
                   else
                   {
-                     iter.remove();
-                  }
-
-                  Delivery delivery = new SimpleDelivery(this, ref, true);
-
-                  acknowledgeInternal(delivery);
-               }
-               else
-               {
-                  // Reference is not expired
-
-                  // Attempt to push the ref to a receiver
-                  Delivery del = push(ref);
-
-                  if (del == null)
-                  {
-                     // no receiver, broken receiver
-                     // or full receiver    
-                     // so we stop delivering
-                     if (trace) { log.trace(this + ": no delivery returned for message" 
-                                  + ref + " so no receiver got the message");
-                                  log.trace("Delivery is now complete"); }
-
-                     receiversReady = false;
-
-                     return;
-                  }
-                  else if (!del.isSelectorAccepted())
-                  {
-                     // No receiver accepted the message because no selectors matched
-                     // So we create an iterator (if we haven't already created it) to
-                     // iterate through the refs in the channel
-                     // TODO Note that this is only a partial solution since if there are messages paged to storage
-                     // it won't try those - i.e. it will only iterate through those refs in memory.
-                     // Dealing with refs in storage is somewhat tricky since we can't just load them and iterate
-                     // through them since we might run out of memory
-                     // So we will need to load individual refs from storage given the selector expressions
-                     // Secondly we should also introduce some in memory indexes here to prevent having to
-                     // iterate through all the refs every time
-                     // Having said all that, having consumers on a queue that don't match many messages
-                     // is an antipattern and should be avoided by the user
-                     if (iter == null)
+                     if (iter.hasNext())
+                     {                        
+                        ref = (MessageReference)iter.next();
+                     } 
+                     else
                      {
-                        iter = messageRefs.iterator();
-                     }                     
+                        ref = null;
+                     }
                   }
-                  else
+               
+                  if (ref != null)
                   {
-                     if (trace) { log.trace(this + ": " + del + " returned for message:" + ref); }
-                     
-                     //Receiver accepted the reference
-
-                     // We must synchronize here to cope with another race
-                     // condition where message is
-                     // cancelled/acked in flight while the following few
-                     // actions are being performed.
-                     // e.g. delivery could be cancelled acked after being
-                     // removed from state but before
-                     // delivery being added (observed).
-                     synchronized (del)
+                     // Check if message is expired (we also do this on the client
+                     // side)
+                     // If so ack it from the channel
+                     if (ref.isExpired())
                      {
-                        if (trace) { log.trace(this + " incrementing delivery count for " + del); }
-
-                        // FIXME - It's actually possible the delivery could be
-                        // cancelled before it reaches
-                        // here, in which case we wouldn't get a delivery but we
-                        // still need to increment the
-                        // delivery count
-                        // All the problems related to these race conditions and
-                        // fiddly edge cases will disappear
-                        // once we do
-                        // http://jira.jboss.com/jira/browse/JBMESSAGING-355
-                        // This will make life a lot easier
-
-                        del.getReference().incrementDeliveryCount();                    
-
-                        if (!del.isCancelled())
+                        if (trace) { log.trace("Message reference: " + ref + " has expired"); }
+      
+                        // remove and acknowledge it
+                        if (iter == null)
                         {
+                           removeFirstInMemory();
+                        }
+                        else
+                        {
+                           iter.remove();
+                        }
+      
+                        Delivery delivery = new SimpleDelivery(this, ref, true);
+      
+                        acknowledgeInternal(delivery);
+                     }
+                     else
+                     {
+                        // Reference is not expired
+      
+                        // Attempt to push the ref to a receiver
+                        Delivery del = push(ref);
+      
+                        if (del == null)
+                        {
+                           // no receiver, broken receiver
+                           // or full receiver    
+                           // so we stop delivering
+                           if (trace) { log.trace(this + ": no delivery returned for message" 
+                                        + ref + " so no receiver got the message");
+                                        log.trace("Delivery is now complete"); }
+      
+                           receiversReady = false;
+      
+                           return;
+                        }
+                        else if (!del.isSelectorAccepted())
+                        {
+                           // No receiver accepted the message because no selectors matched
+                           // So we create an iterator (if we haven't already created it) to
+                           // iterate through the refs in the channel
+                           // TODO Note that this is only a partial solution since if there are messages paged to storage
+                           // it won't try those - i.e. it will only iterate through those refs in memory.
+                           // Dealing with refs in storage is somewhat tricky since we can't just load them and iterate
+                           // through them since we might run out of memory
+                           // So we will need to load individual refs from storage given the selector expressions
+                           // Secondly we should also introduce some in memory indexes here to prevent having to
+                           // iterate through all the refs every time
+                           // Having said all that, having consumers on a queue that don't match many messages
+                           // is an antipattern and should be avoided by the user
                            if (iter == null)
                            {
-                              removeFirstInMemory();
-                           }
-                           else
+                              iter = messageRefs.iterator();
+                           }                     
+                        }
+                        else
+                        {
+                           if (trace) { log.trace(this + ": " + del + " returned for message:" + ref); }
+                           
+                           //Receiver accepted the reference
+      
+                           // We must synchronize here to cope with another race
+                           // condition where message is
+                           // cancelled/acked in flight while the following few
+                           // actions are being performed.
+                           // e.g. delivery could be cancelled acked after being
+                           // removed from state but before
+                           // delivery being added (observed).
+                           synchronized (del)
                            {
-                              iter.remove();                                
-                           }
-
-                           // delivered
-                           if (!del.isDone())
-                           {
-                              // Add the delivery to state
-                              synchronized (deliveryLock)
+                              if (trace) { log.trace(this + " incrementing delivery count for " + del); }
+      
+                              // FIXME - It's actually possible the delivery could be
+                              // cancelled before it reaches
+                              // here, in which case we wouldn't get a delivery but we
+                              // still need to increment the
+                              // delivery count
+                              // All the problems related to these race conditions and
+                              // fiddly edge cases will disappear
+                              // once we do
+                              // http://jira.jboss.com/jira/browse/JBMESSAGING-355
+                              // This will make life a lot easier
+      
+                              del.getReference().incrementDeliveryCount();                    
+      
+                              if (!del.isCancelled())
                               {
-                                 deliveries.add(del);
+                                 if (iter == null)
+                                 {
+                                    removeFirstInMemory();
+                                 }
+                                 else
+                                 {
+                                    iter.remove();                                
+                                 }
+      
+                                 // delivered
+                                 if (!del.isDone())
+                                 {
+                                    // Add the delivery to state
+                                    deliveries.add(del);                                 
+                                 }
                               }
                            }
                         }
                      }
                   }
+                  else
+                  {
+                     // No more refs in channel
+                     if (trace) { log.trace(this + " no more refs to deliver "); }
+                     break;
+                  }
                }
             }
-            else
-            {
-               // No more refs in channel
-               if (trace) { log.trace(this + " no more refs to deliver "); }
-               break;
-            }
          }
       }
       catch (Throwable t)

Modified: trunk/tests/src/org/jboss/test/messaging/jms/server/destination/TopicManagementTest.java
===================================================================
--- trunk/tests/src/org/jboss/test/messaging/jms/server/destination/TopicManagementTest.java	2006-08-08 11:25:08 UTC (rev 1212)
+++ trunk/tests/src/org/jboss/test/messaging/jms/server/destination/TopicManagementTest.java	2006-08-08 11:47:48 UTC (rev 1213)
@@ -568,7 +568,7 @@
          
          // Start the connection for delivery
          conn.start();
-   
+         
          // Remove all messages from the topic
          
          ServerManagement.invoke(destObjectName, "removeAllMessages", null, null);




More information about the jboss-cvs-commits mailing list