[jboss-cvs] JBoss Messaging SVN: r7545 - in trunk: src/main/org/jboss/messaging/core/server/impl and 1 other directories.

jboss-cvs-commits at lists.jboss.org jboss-cvs-commits at lists.jboss.org
Wed Jul 8 09:48:29 EDT 2009


Author: jmesnil
Date: 2009-07-08 09:48:29 -0400 (Wed, 08 Jul 2009)
New Revision: 7545

Modified:
   trunk/src/main/org/jboss/messaging/core/server/Distributor.java
   trunk/src/main/org/jboss/messaging/core/server/impl/QueueImpl.java
   trunk/src/main/org/jboss/messaging/core/server/impl/RoundRobinDistributor.java
   trunk/tests/src/org/jboss/messaging/tests/unit/core/server/impl/QueueImplTest.java
Log:
JBMESSAGING-1505: Optimise multiple message selectors on a queue

* replaced Distributor.peekConsumer() & incrementPosition() with the getNextConsumer() method
* refactored QueueImpl deliver() and directDeliver() to use getNextConsumer()

Modified: trunk/src/main/org/jboss/messaging/core/server/Distributor.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/server/Distributor.java	2009-07-08 13:41:50 UTC (rev 7544)
+++ trunk/src/main/org/jboss/messaging/core/server/Distributor.java	2009-07-08 13:48:29 UTC (rev 7545)
@@ -38,7 +38,5 @@
 
    int getConsumerCount();
    
-   Consumer peekConsumer();
-
-   void incrementPosition();
+   Consumer getNextConsumer();
 }

Modified: trunk/src/main/org/jboss/messaging/core/server/impl/QueueImpl.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/server/impl/QueueImpl.java	2009-07-08 13:41:50 UTC (rev 7544)
+++ trunk/src/main/org/jboss/messaging/core/server/impl/QueueImpl.java	2009-07-08 13:48:29 UTC (rev 7545)
@@ -1288,9 +1288,7 @@
       {
          return;
       }
-      
-      Consumer firstConsumer = distributionPolicy.peekConsumer();
-      
+
       Consumer consumer;
       
       MessageReference reference;
@@ -1299,12 +1297,13 @@
 
       int totalConsumers = distributionPolicy.getConsumerCount();
       Set<Consumer> busyConsumers = new HashSet<Consumer>();
- 
+      Set<Consumer> nullReferences = new HashSet<Consumer>();
+
       while (true)
       {
-        consumer = distributionPolicy.peekConsumer();
+        consumer = distributionPolicy.getNextConsumer();
          
-         iterator = iterators.get(consumer);
+        iterator = iterators.get(consumer);
          
          if (iterator == null)
          {
@@ -1331,15 +1330,21 @@
          
          if (reference == null)
          {
-            if (consumer == firstConsumer)
+            nullReferences.add(consumer);
+            if (nullReferences.size() + busyConsumers.size() == totalConsumers)
             {
                startDepaging();
                // We delivered all the messages - go into direct delivery
                direct = true;
                promptDelivery = false;
+               return;
             }
-            return;
+            continue;
          }
+         else
+         {
+            nullReferences.remove(consumer);
+         }
 
          initPagingStore(reference.getMessage().getDestination());
 
@@ -1350,8 +1355,6 @@
             Consumer groupConsumer = groups.putIfAbsent(groupID, consumer);
             if (groupConsumer != null && groupConsumer != consumer)
             {
-               // this consumer is not in charge of the message group
-               distributionPolicy.incrementPosition();
                continue;
             }
          }
@@ -1469,16 +1472,17 @@
          return HandleStatus.BUSY;
       }
       
-      Consumer firstConsumer = distributionPolicy.peekConsumer();
-      
       HandleStatus status;
 
       boolean filterRejected = false;
 
+      int consumerCount = 0;
+
       while (true)
       {
-         Consumer consumer = distributionPolicy.peekConsumer();
-
+         Consumer consumer = distributionPolicy.getNextConsumer();
+         consumerCount++;
+         
          final SimpleString groupId = (SimpleString)reference.getMessage().getProperty(MessageImpl.HDR_GROUP_ID);
 
          if (groupId != null)
@@ -1486,8 +1490,6 @@
             Consumer groupConsumer = groups.putIfAbsent(groupId, consumer);
             if (groupConsumer != null && groupConsumer != consumer)
             {
-               // this consumer is not in charge in the message group
-               distributionPolicy.incrementPosition();
                continue;
             }
          }
@@ -1514,7 +1516,7 @@
             }
          }
          // if we've tried all of them
-         if (distributionPolicy.peekConsumer() == firstConsumer)
+         if (consumerCount == distributionPolicy.getConsumerCount())
          {
             if (filterRejected)
             {
@@ -1564,10 +1566,6 @@
          }
          return HandleStatus.BUSY;
       }
-      finally
-      {
-         distributionPolicy.incrementPosition();         
-      }
 
       if (status == null)
       {

Modified: trunk/src/main/org/jboss/messaging/core/server/impl/RoundRobinDistributor.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/server/impl/RoundRobinDistributor.java	2009-07-08 13:41:50 UTC (rev 7544)
+++ trunk/src/main/org/jboss/messaging/core/server/impl/RoundRobinDistributor.java	2009-07-08 13:48:29 UTC (rev 7545)
@@ -57,12 +57,14 @@
       return super.getConsumerCount();
    }
 
-   public Consumer peekConsumer()
+   public synchronized Consumer getNextConsumer()
    {
-      return consumers.get(pos);
+      Consumer consumer = consumers.get(pos);
+      incrementPosition();
+      return consumer;
    }
    
-   public synchronized void incrementPosition()
+   private synchronized void incrementPosition()
    {
       pos++;
       

Modified: trunk/tests/src/org/jboss/messaging/tests/unit/core/server/impl/QueueImplTest.java
===================================================================
--- trunk/tests/src/org/jboss/messaging/tests/unit/core/server/impl/QueueImplTest.java	2009-07-08 13:41:50 UTC (rev 7544)
+++ trunk/tests/src/org/jboss/messaging/tests/unit/core/server/impl/QueueImplTest.java	2009-07-08 13:48:29 UTC (rev 7545)
@@ -1249,16 +1249,12 @@
          return false;
       }
 
-      public void incrementPosition()
-      {
-      }
-      
       public int getConsumerCount()
       {
          return 0;
       }
 
-      public Consumer peekConsumer()
+      public Consumer getNextConsumer()
       {
          return consumer;
       }




More information about the jboss-cvs-commits mailing list