[hornetq-commits] JBoss hornetq SVN: r11727 - branches/Branch_2_2_EAP/src/main/org/hornetq/core/server/impl.

do-not-reply at jboss.org do-not-reply at jboss.org
Mon Nov 21 09:56:37 EST 2011


Author: clebert.suconic at jboss.com
Date: 2011-11-21 09:56:37 -0500 (Mon, 21 Nov 2011)
New Revision: 11727

Modified:
   branches/Branch_2_2_EAP/src/main/org/hornetq/core/server/impl/QueueImpl.java
Log:
https://issues.jboss.org/browse/HORNETQ-798 - avoid holding the Queue lock (synchronization) while performing paging I/O

Modified: branches/Branch_2_2_EAP/src/main/org/hornetq/core/server/impl/QueueImpl.java
===================================================================
--- branches/Branch_2_2_EAP/src/main/org/hornetq/core/server/impl/QueueImpl.java	2011-11-21 14:54:27 UTC (rev 11726)
+++ branches/Branch_2_2_EAP/src/main/org/hornetq/core/server/impl/QueueImpl.java	2011-11-21 14:56:37 UTC (rev 11727)
@@ -1612,166 +1612,168 @@
 
    // This method will deliver as many messages as possible until all consumers are busy or there are no more matching
    // or available messages
-   private synchronized void deliver()
+   private void deliver()
    {
-      if (paused || consumerList.isEmpty())
+      synchronized (this)
       {
-         return;
-      }
-      
-      if (log.isDebugEnabled())
-      {
-         log.debug(this + " doing deliver. messageReferences=" + messageReferences.size());
-      }
+         if (paused || consumerList.isEmpty())
+         {
+            return;
+         }
 
-      int busyCount = 0;
+         if (log.isDebugEnabled())
+         {
+            log.debug(this + " doing deliver. messageReferences=" + messageReferences.size());
+         }
 
-      int nullRefCount = 0;
+         int busyCount = 0;
 
-      int size = consumerList.size();
+         int nullRefCount = 0;
 
-      int endPos = pos == size - 1 ? 0 : size - 1;
+         int size = consumerList.size();
 
-      int numRefs = messageReferences.size();
+         int endPos = pos == size - 1 ? 0 : size - 1;
 
-      int handled = 0;
-      
-      long timeout = System.currentTimeMillis() + DELIVERY_TIMEOUT;
+         int numRefs = messageReferences.size();
 
-      while (handled < numRefs)
-      {
-         if (handled == MAX_DELIVERIES_IN_LOOP)
-         {
-            // Schedule another one - we do this to prevent a single thread getting caught up in this loop for too long
+         int handled = 0;
 
-            deliverAsync();
+         long timeout = System.currentTimeMillis() + DELIVERY_TIMEOUT;
 
-            return;
-         }
-         
-         if (System.currentTimeMillis() > timeout)
+         while (handled < numRefs)
          {
-            if (isTrace)
+            if (handled == MAX_DELIVERIES_IN_LOOP)
             {
-               log.trace("delivery has been running for too long. Scheduling another delivery task now");
-            }
-            
-            deliverAsync();
-            
-            return;
-         }
-         
+               // Schedule another one - we do this to prevent a single thread getting caught up in this loop for too
+               // long
 
-         ConsumerHolder holder = consumerList.get(pos);
+               deliverAsync();
 
-         Consumer consumer = holder.consumer;
+               return;
+            }
 
-         if (holder.iter == null)
-         {
-            holder.iter = messageReferences.iterator();
-         }
-
-         MessageReference ref;
-
-         if (holder.iter.hasNext())
-         {
-            ref = holder.iter.next();
-         }
-         else
-         {
-            ref = null;
-         }
-         
-
-         if (ref == null)
-         {
-            nullRefCount++;
-         }
-         else
-         {
-            if (checkExpired(ref))
+            if (System.currentTimeMillis() > timeout)
             {
                if (isTrace)
                {
-                  log.trace("Reference " + ref + " being expired");
+                  log.trace("delivery has been running for too long. Scheduling another delivery task now");
                }
-               holder.iter.remove();
 
-               refRemoved(ref);
-               
-               handled++;
+               deliverAsync();
 
-               continue;
+               return;
             }
 
-            Consumer groupConsumer = null;
-            
-            if (isTrace)
+            ConsumerHolder holder = consumerList.get(pos);
+
+            Consumer consumer = holder.consumer;
+
+            if (holder.iter == null)
             {
-               log.trace("Queue " + this.getName() + " is delivering reference " + ref);
+               holder.iter = messageReferences.iterator();
             }
 
-            // If a group id is set, then this overrides the consumer chosen round-robin
+            MessageReference ref;
 
-            SimpleString groupID = ref.getMessage().getSimpleStringProperty(Message.HDR_GROUP_ID);
+            if (holder.iter.hasNext())
+            {
+               ref = holder.iter.next();
+            }
+            else
+            {
+               ref = null;
+            }
 
-            if (groupID != null)
+            if (ref == null)
             {
-               groupConsumer = groups.get(groupID);
+               nullRefCount++;
+            }
+            else
+            {
+               if (checkExpired(ref))
+               {
+                  if (isTrace)
+                  {
+                     log.trace("Reference " + ref + " being expired");
+                  }
+                  holder.iter.remove();
 
-               if (groupConsumer != null)
+                  refRemoved(ref);
+
+                  handled++;
+
+                  continue;
+               }
+
+               Consumer groupConsumer = null;
+
+               if (isTrace)
                {
-                  consumer = groupConsumer;
+                  log.trace("Queue " + this.getName() + " is delivering reference " + ref);
                }
-            }
 
-            HandleStatus status = handle(ref, consumer);
+               // If a group id is set, then this overrides the consumer chosen round-robin
 
-            if (status == HandleStatus.HANDLED)
-            {
-               holder.iter.remove();
+               SimpleString groupID = ref.getMessage().getSimpleStringProperty(Message.HDR_GROUP_ID);
 
-               refRemoved(ref);
+               if (groupID != null)
+               {
+                  groupConsumer = groups.get(groupID);
 
-               if (groupID != null && groupConsumer == null)
+                  if (groupConsumer != null)
+                  {
+                     consumer = groupConsumer;
+                  }
+               }
+
+               HandleStatus status = handle(ref, consumer);
+
+               if (status == HandleStatus.HANDLED)
                {
-                  groups.put(groupID, consumer);
+                  holder.iter.remove();
+
+                  refRemoved(ref);
+
+                  if (groupID != null && groupConsumer == null)
+                  {
+                     groups.put(groupID, consumer);
+                  }
+
+                  handled++;
                }
+               else if (status == HandleStatus.BUSY)
+               {
+                  holder.iter.repeat();
 
-               handled++;
+                  busyCount++;
+               }
+               else if (status == HandleStatus.NO_MATCH)
+               {
+               }
             }
-            else if (status == HandleStatus.BUSY)
-            {
-               holder.iter.repeat();
 
-               busyCount++;
-            }
-            else if (status == HandleStatus.NO_MATCH)
+            if (pos == endPos)
             {
-            }
-         }
+               // Round robin'd all
 
-         if (pos == endPos)
-         {
-            // Round robin'd all
-
-            if (nullRefCount + busyCount == size)
-            {
-               if (log.isDebugEnabled())
+               if (nullRefCount + busyCount == size)
                {
-                   log.debug(this + "::All the consumers were busy, giving up now");
+                  if (log.isDebugEnabled())
+                  {
+                     log.debug(this + "::All the consumers were busy, giving up now");
+                  }
+                  break;
                }
-               break;
+
+               nullRefCount = busyCount = 0;
             }
 
-            nullRefCount = busyCount = 0;
-         }
+            pos++;
 
-         pos++;
-
-         if (pos == size)
-         {
-            pos = 0;
+            if (pos == size)
+            {
+               pos = 0;
+            }
          }
       }
 
@@ -1817,7 +1819,7 @@
       }
    }
 
-   private synchronized void depage()
+   private void depage()
    {
       depagePending = false;
 



More information about the hornetq-commits mailing list