[jboss-cvs] JBoss Messaging SVN: r7531 - in trunk: examples/jms/message-group and 8 other directories.

jboss-cvs-commits at lists.jboss.org jboss-cvs-commits at lists.jboss.org
Tue Jul 7 06:42:54 EDT 2009


Author: jmesnil
Date: 2009-07-07 06:42:54 -0400 (Tue, 07 Jul 2009)
New Revision: 7531

Removed:
   trunk/src/main/org/jboss/messaging/core/server/impl/GroupingRoundRobinDistributor.java
Modified:
   trunk/docs/user-manual/en/message-grouping.xml
   trunk/docs/user-manual/en/queue-attributes.xml
   trunk/examples/jms/message-group/readme.html
   trunk/examples/jms/message-group/server0/jbm-configuration.xml
   trunk/src/main/org/jboss/messaging/core/list/PriorityLinkedList.java
   trunk/src/main/org/jboss/messaging/core/server/Distributor.java
   trunk/src/main/org/jboss/messaging/core/server/impl/DistributorImpl.java
   trunk/src/main/org/jboss/messaging/core/server/impl/QueueImpl.java
   trunk/src/main/org/jboss/messaging/core/server/impl/RoundRobinDistributor.java
   trunk/tests/jms-tests/src/org/jboss/test/messaging/jms/JMSTest.java
   trunk/tests/src/org/jboss/messaging/tests/integration/client/AutogroupIdTest.java
   trunk/tests/src/org/jboss/messaging/tests/integration/client/MessageGroupingTest.java
   trunk/tests/src/org/jboss/messaging/tests/unit/core/server/impl/QueueImplTest.java
   trunk/tests/src/org/jboss/messaging/tests/unit/core/server/impl/fakes/FakeConsumer.java
Log:
JBMESSAGING-1505: Optimise multiple message selectors on a queue
JBMESSAGING-375: Grouping of messages

* in QueueImpl, consumers with filters have their own iterators to iterate on the
  references and "skip" messages that do not match their filters. Consumers without filters
  now peeks on the references directly
* removed the grouping round robin distributor, choosing the appropriate consumer
  is now done while delivering the messages. The QueueImpl keeps a map of the group ID <-> consumer
* updated examples + doc (there is no need to set the distribution policy class
  to use message groups now)

Modified: trunk/docs/user-manual/en/message-grouping.xml
===================================================================
--- trunk/docs/user-manual/en/message-grouping.xml	2009-07-07 05:08:58 UTC (rev 7530)
+++ trunk/docs/user-manual/en/message-grouping.xml	2009-07-07 10:42:54 UTC (rev 7531)
@@ -15,22 +15,6 @@
             messages with the same group id.</para>
       </listitem>
    </itemizedlist>
-   <section id="message-grouping.configuring">
-      <title>Configuring Message Grouping</title>
-      <para>Message grouping must be enabled in the address-setting configuration by using a
-         specific <literal>distribution-policy-class</literal>:</para>
-      <programlisting>
- &lt;address-setting match="jms.queue.exampleQueue"&gt;
-    &lt;distribution-policy-class&gt;
-       org.jboss.messaging.core.server.impl.GroupingRoundRobinDistributor
-    &lt;/distribution-policy-class&gt;
- &lt;/address-setting&gt;
-       </programlisting>
-      <para>By default, <literal>distribution-policy-class</literal> is set to <literal
-            >org.jboss.messaging.core.server.impl.RoundRobinDistributor</literal> and message groups
-         will not be handled by the queue. Address wildcards can be used to configure the
-         distribution policy for a set of addresses (see <xref linkend="wildcard-syntax"/>).</para>
-   </section>
    <section>
       <title>Using Core API</title>
       <para>The property name used to identify the message group is <literal

Modified: trunk/docs/user-manual/en/queue-attributes.xml
===================================================================
--- trunk/docs/user-manual/en/queue-attributes.xml	2009-07-07 05:08:58 UTC (rev 7530)
+++ trunk/docs/user-manual/en/queue-attributes.xml	2009-07-07 10:42:54 UTC (rev 7531)
@@ -79,7 +79,7 @@
         &lt;redelivery-delay>5000&lt;/redelivery-delay>
         &lt;expiry-address>jms.queue.expiryQueue&lt;/expiry-address>
         &lt;last-value-queue>true&lt;/last-value-queue>
-        &lt;distribution-policy-class>org.jboss.messaging.core.server.impl.GroupingRoundRobinDistributor&lt;/distribution-policy-class>
+        &lt;distribution-policy-class>org.jboss.messaging.core.server.impl.RoundRobinDistributor&lt;/distribution-policy-class>
         &lt;max-size-bytes>100000&lt;/max-size-bytes>
         &lt;page-size-bytes>20000&lt;/page-size-bytes>
         &lt;redistribution-delay>0&lt;/redistribution-delay>

Modified: trunk/examples/jms/message-group/readme.html
===================================================================
--- trunk/examples/jms/message-group/readme.html	2009-07-07 05:08:58 UTC (rev 7530)
+++ trunk/examples/jms/message-group/readme.html	2009-07-07 10:42:54 UTC (rev 7531)
@@ -13,19 +13,7 @@
      <li>Messages in a message group will be all delivered to no more than one of the queue's consumers. The consumer that receives the
      first message of a group will receive all the messages that belongs to the group.</li>
      
-     <p>To enable message group, you need to configure the 'distribution-policy-class' for the queue 
-     to be "org.jboss.messaging.core.server.impl.GroupingRoundRobinDistributor" in jbm-queues.xml. This configure parameter
-     is part of the 'address-settings' configuration element, as shown in the following </p>
-     
-     <pre>
-     <code>
-      &lt;address-setting match=&quot;jms.queue.exampleQueue&quot;&gt;
-         &lt;distribution-policy-class&gt;org.jboss.messaging.core.server.impl.GroupingRoundRobinDistributor&lt;/distribution-policy-class&gt;
-      &lt;/address-setting&gt;
-     </code>
-     </pre>
-     
-     <p>After the configuration, you can make any message belong to a message group by setting its 'JMXGroupID' string property to the group id.
+     <p>You can make any message belong to a message group by setting its 'JMXGroupID' string property to the group id.
      In this example we create a message group 'Group-0'. And make such a message group of 10 messages. It also create two consumers on the queue
      where the 10 'Group-0' group messages are to be sent. You can see that with message grouping enabled, all the 10 messages will be received by
      the first consumer. The second consumer will receive none. </p>

Modified: trunk/examples/jms/message-group/server0/jbm-configuration.xml
===================================================================
--- trunk/examples/jms/message-group/server0/jbm-configuration.xml	2009-07-07 05:08:58 UTC (rev 7530)
+++ trunk/examples/jms/message-group/server0/jbm-configuration.xml	2009-07-07 10:42:54 UTC (rev 7531)
@@ -30,10 +30,4 @@
       </security-setting>
    </security-settings>
 
-   <address-settings>
-      <address-setting match="jms.queue.exampleQueue">
-         <distribution-policy-class>org.jboss.messaging.core.server.impl.GroupingRoundRobinDistributor</distribution-policy-class>
-      </address-setting>
-   </address-settings>
-
 </configuration>

Modified: trunk/src/main/org/jboss/messaging/core/list/PriorityLinkedList.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/list/PriorityLinkedList.java	2009-07-07 05:08:58 UTC (rev 7530)
+++ trunk/src/main/org/jboss/messaging/core/list/PriorityLinkedList.java	2009-07-07 10:42:54 UTC (rev 7531)
@@ -53,4 +53,5 @@
    Iterator<T> iterator();
    
    boolean isEmpty();
+   
 }

Modified: trunk/src/main/org/jboss/messaging/core/server/Distributor.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/server/Distributor.java	2009-07-07 05:08:58 UTC (rev 7530)
+++ trunk/src/main/org/jboss/messaging/core/server/Distributor.java	2009-07-07 10:42:54 UTC (rev 7531)
@@ -22,7 +22,6 @@
 
 package org.jboss.messaging.core.server;
 
-import java.util.List;
 
 /**
  * 
@@ -33,15 +32,13 @@
  */
 public interface Distributor
 {
-   HandleStatus distribute(final MessageReference reference);
-
    void addConsumer(Consumer consumer);
 
    boolean removeConsumer(Consumer consumer);
 
    int getConsumerCount();
-
-   boolean hasConsumers();
    
-   List<Consumer> getConsumers();
+   Consumer peekConsumer();
+
+   void incrementPosition();
 }

Modified: trunk/src/main/org/jboss/messaging/core/server/impl/DistributorImpl.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/server/impl/DistributorImpl.java	2009-07-07 05:08:58 UTC (rev 7530)
+++ trunk/src/main/org/jboss/messaging/core/server/impl/DistributorImpl.java	2009-07-07 10:42:54 UTC (rev 7531)
@@ -49,11 +49,6 @@
       return consumers.size();
    }
 
-   public boolean hasConsumers()
-   {
-      return !consumers.isEmpty();
-   }
-   
    public List<Consumer> getConsumers()
    {
       return consumers;

Deleted: trunk/src/main/org/jboss/messaging/core/server/impl/GroupingRoundRobinDistributor.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/server/impl/GroupingRoundRobinDistributor.java	2009-07-07 05:08:58 UTC (rev 7530)
+++ trunk/src/main/org/jboss/messaging/core/server/impl/GroupingRoundRobinDistributor.java	2009-07-07 10:42:54 UTC (rev 7531)
@@ -1,139 +0,0 @@
-/*
- * JBoss, Home of Professional Open Source
- * Copyright 2005-2008, Red Hat Middleware LLC, and individual contributors
- * by the @authors tag. See the copyright.txt in the distribution for a
- * full listing of individual contributors.
- *
- * This is free software; you can redistribute it and/or modify it
- * under the terms of the GNU Lesser General Public License as
- * published by the Free Software Foundation; either version 2.1 of
- * the License, or (at your option) any later version.
- *
- * This software is distributed in the hope that it will be useful,
- * but WITHOUT ANY WARRANTY; without even the implied warranty of
- * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
- * Lesser General Public License for more details.
- *
- * You should have received a copy of the GNU Lesser General Public
- * License along with this software; if not, write to the Free
- * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
- * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
- */
-package org.jboss.messaging.core.server.impl;
-
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.ConcurrentMap;
-
-import org.jboss.messaging.core.message.impl.MessageImpl;
-import org.jboss.messaging.core.server.Consumer;
-import org.jboss.messaging.core.server.HandleStatus;
-import org.jboss.messaging.core.server.MessageReference;
-import org.jboss.messaging.utils.SimpleString;
-
-/**
- * Distributes message based on the message property 'JBM_GROUP_ID'. Once a message has been successfully delivered to a
- * consumer that consumer is then bound to that group. Any message that has the same group id set will always be
- * delivered to the same consumer.
- * The Initial consumer is the first consumer found, using the round robin policy, that hasn't been bound to a group, If
- * there are no consumers left that have not been bound to a group then the next consumer will be bound to 2 groups and
- * so on.
- *
- * @author <a href="mailto:andy.taylor at jboss.org">Andy Taylor</a>
- * @author <a href="mailto:tim.fox at jboss.com">Tim Fox</a>
- */
-public class GroupingRoundRobinDistributor extends RoundRobinDistributor
-{
-   // Constants -----------------------------------------------------
-
-   // Static --------------------------------------------------------
-
-   // Attributes ----------------------------------------------------
-
-   private ConcurrentMap<SimpleString, Consumer> cons = new ConcurrentHashMap<SimpleString, Consumer>();
-
-   // Distributor implementation ------------------------------------
-
-   public HandleStatus distribute(final MessageReference reference)
-   {
-      if (consumers.isEmpty())
-      {
-         return HandleStatus.BUSY;
-      }
-      
-      final SimpleString groupId = (SimpleString)reference.getMessage().getProperty(MessageImpl.HDR_GROUP_ID);
-      
-      if (groupId != null)
-      {
-         int startPos = pos;
-         
-         boolean filterRejected = false;
-
-         while (true)
-         {
-            Consumer consumer = consumers.get(pos);
-            
-            Consumer oldConsumer = cons.putIfAbsent(groupId, consumer);
-
-            if (oldConsumer == null)
-            {
-               incrementPosition();
-            }
-            else
-            {
-               consumer = oldConsumer;
-            }
-
-            HandleStatus status = handle(reference, consumer);
-            
-            if (status == HandleStatus.HANDLED)
-            {
-               return HandleStatus.HANDLED;
-            }
-            else if (status == HandleStatus.NO_MATCH)
-            {
-               filterRejected = true;
-            }
-            else if (status == HandleStatus.BUSY)
-            {
-               // if we were previously bound, we can remove and try the next consumer
-               return HandleStatus.BUSY;
-            }
-            // if we've tried all of them
-            if (startPos == pos)
-            {
-               // Tried all of them
-               if (filterRejected)
-               {
-                  return HandleStatus.NO_MATCH;
-               }
-               else
-               {
-                  // Give up - all consumers busy
-                  return HandleStatus.BUSY;
-               }
-            }
-         }
-      }
-      else
-      {
-         return super.distribute(reference);
-      }
-   }
-
-   public synchronized boolean removeConsumer(Consumer consumer)
-   {
-      boolean removed = super.removeConsumer(consumer);
-      
-      if (removed)
-      {
-         for (SimpleString group : cons.keySet())
-         {
-            if (consumer == cons.get(group))
-            {
-               cons.remove(group);
-            }
-         }
-      }
-      return removed;
-   }
-}

Modified: trunk/src/main/org/jboss/messaging/core/server/impl/QueueImpl.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/server/impl/QueueImpl.java	2009-07-07 05:08:58 UTC (rev 7530)
+++ trunk/src/main/org/jboss/messaging/core/server/impl/QueueImpl.java	2009-07-07 10:42:54 UTC (rev 7531)
@@ -25,6 +25,8 @@
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
 import java.util.concurrent.Executor;
 import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.ScheduledFuture;
@@ -63,7 +65,7 @@
 import org.jboss.messaging.utils.SimpleString;
 
 /**
- * Implementation of a Queue TODO use Java 5 concurrent queue
+ * Implementation of a Queue
  *
  * @author <a href="mailto:tim.fox at jboss.com">Tim Fox</a>
  * @author <a href="ataylor at redhat.com">Andy Taylor</a>
@@ -139,6 +141,8 @@
    // We cache the consumers here since we don't want to include the redistributor
 
    private final Set<Consumer> consumers = new HashSet<Consumer>();
+   private final Map<Consumer, Iterator<MessageReference>> iterators = new HashMap<Consumer, Iterator<MessageReference>>();
+   private ConcurrentMap<SimpleString, Consumer> groups = new ConcurrentHashMap<SimpleString, Consumer>();
 
    public QueueImpl(final long persistenceID,
                     final SimpleString address,
@@ -420,21 +424,36 @@
       cancelRedistributor();
 
       distributionPolicy.addConsumer(consumer);
-
       consumers.add(consumer);
+      if (consumer.getFilter() != null)
+      {
+         iterators.put(consumer, messageReferences.iterator());
+      }
    }
 
    public synchronized boolean removeConsumer(final Consumer consumer) throws Exception
    {
       boolean removed = distributionPolicy.removeConsumer(consumer);
 
-      if (!distributionPolicy.hasConsumers())
+      if (distributionPolicy.getConsumerCount() == 0)
       {
          promptDelivery = false;
       }
 
       consumers.remove(consumer);
+      iterators.remove(consumer);
 
+      if (removed)
+      {
+         for (SimpleString groupID : groups.keySet())
+         {
+            if (consumer == groups.get(groupID))
+            {
+               groups.remove(groupID);
+            }
+         }
+      }
+      
       return removed;
    }
 
@@ -1265,12 +1284,28 @@
 
       direct = false;
 
+      if (distributionPolicy.getConsumerCount() == 0)
+      {
+         return;
+      }
+      
+      Consumer firstConsumer = distributionPolicy.peekConsumer();
+      
+      Consumer consumer;
+      
       MessageReference reference;
 
       Iterator<MessageReference> iterator = null;
 
+      int totalConsumers = distributionPolicy.getConsumerCount();
+      Set<Consumer> busyConsumers = new HashSet<Consumer>();
+ 
       while (true)
       {
+        consumer = distributionPolicy.peekConsumer();
+         
+         iterator = iterators.get(consumer);
+         
          if (iterator == null)
          {
             reference = messageReferences.peekFirst();
@@ -1284,52 +1319,45 @@
             else
             {
                reference = null;
+               if (consumer.getFilter() != null)
+               {
+                  // we have iterated on the whole queue for
+                  // messages which matches the consumer filter.
+                  // we reset its iterator in case new messages are added to the queue
+                  iterators.put(consumer, messageReferences.iterator());
+               }
             }
          }
-
+         
          if (reference == null)
          {
-            if (iterator == null)
+            if (consumer == firstConsumer)
             {
-               if (pagingStore != null)
-               {
-                  // If the queue is empty, we need to check if there are pending messages, and throw a warning
-                  if (pagingStore.isPaging() && !pagingStore.isDropWhenMaxSize())
-                  {
-                     // This is just a *request* to depage. Depage will only happens if there is space on the Address
-                     // and GlobalSize
-                     pagingStore.startDepaging();
-
-                     log.warn("The Queue " + name +
-                              " is empty, however there are pending messages on Paging for the address " +
-                              pagingStore.getStoreName() +
-                              " waiting message ACK before they could be routed");
-                  }
-               }
+               startDepaging();
                // We delivered all the messages - go into direct delivery
                direct = true;
-
                promptDelivery = false;
             }
             return;
          }
 
-         // PagingManager would be null only on testcases
-         if (pagingStore == null && pagingManager != null)
+         initPagingStore(reference.getMessage().getDestination());
+
+         final SimpleString groupID = (SimpleString)reference.getMessage().getProperty(MessageImpl.HDR_GROUP_ID);
+
+         if (groupID != null)
          {
-            // TODO: It would be better if we could initialize the pagingStore during the construction
-            try
+            Consumer groupConsumer = groups.putIfAbsent(groupID, consumer);
+            if (groupConsumer != null && groupConsumer != consumer)
             {
-               pagingStore = pagingManager.getPageStore(reference.getMessage().getDestination());
+               // this consumer is not in charge of the message group
+               distributionPolicy.incrementPosition();
+               continue;
             }
-            catch (Exception e)
-            {
-               // This shouldn't happen, and if it happens, this shouldn't abort the route
-            }
          }
+         
+         HandleStatus status = handle(reference, consumer);
 
-         HandleStatus status = deliver(reference);
-
          if (status == HandleStatus.HANDLED)
          {
             if (iterator == null)
@@ -1343,15 +1371,21 @@
          }
          else if (status == HandleStatus.BUSY)
          {
-            // All consumers busy - give up
-            break;
+            busyConsumers.add(consumer);
+            if (groupID != null || busyConsumers.size() == totalConsumers)
+            {
+               // when all consumers are busy, we stop
+               break;
+            }
          }
-         else if (status == HandleStatus.NO_MATCH && iterator == null)
+         else if (status == HandleStatus.NO_MATCH)
          {
-            // Consumers not all busy - but filter not accepting - iterate
-            // back
-            // through the queue
-            iterator = messageReferences.iterator();
+            // if consumer filter reject the message make sure it won't be assigned the message group
+            if (groupID != null)
+            {
+               groups.remove(consumer);
+            }
+            continue;
          }
       }
    }
@@ -1374,7 +1408,7 @@
       {
          // Deliver directly
 
-         HandleStatus status = deliver(ref);
+         HandleStatus status = directDeliver(ref);
 
          if (status == HandleStatus.HANDLED)
          {
@@ -1428,18 +1462,121 @@
       }
    }
 
-   private HandleStatus deliver(final MessageReference reference)
+   private synchronized HandleStatus directDeliver(final MessageReference reference)
    {
-      HandleStatus status = distributionPolicy.distribute(reference);
+      if (distributionPolicy.getConsumerCount() == 0)
+      {
+         return HandleStatus.BUSY;
+      }
+      
+      Consumer firstConsumer = distributionPolicy.peekConsumer();
+      
+      HandleStatus status;
 
+      boolean filterRejected = false;
+
+      while (true)
+      {
+         Consumer consumer = distributionPolicy.peekConsumer();
+
+         final SimpleString groupId = (SimpleString)reference.getMessage().getProperty(MessageImpl.HDR_GROUP_ID);
+
+         if (groupId != null)
+         {
+            Consumer groupConsumer = groups.putIfAbsent(groupId, consumer);
+            if (groupConsumer != null && groupConsumer != consumer)
+            {
+               // this consumer is not in charge in the message group
+               distributionPolicy.incrementPosition();
+               continue;
+            }
+         }
+
+         status = handle(reference, consumer);
+
+         if (status == HandleStatus.HANDLED)
+         {
+            break;
+         }
+         else if (status == HandleStatus.NO_MATCH)
+         {
+            filterRejected = true;
+            if (groupId != null)
+            {
+               groups.remove(consumer);
+            }
+         }
+         else if (status == HandleStatus.BUSY)
+         {
+            if (groupId != null)
+            {
+               break;
+            }
+         }
+         // if we've tried all of them
+         if (distributionPolicy.peekConsumer() == firstConsumer)
+         {
+            if (filterRejected)
+            {
+               status = HandleStatus.NO_MATCH;
+               break;
+            }
+            else
+            {
+               // Give up - all consumers busy
+               status = HandleStatus.BUSY;
+               break;
+            }
+         }
+      }
+      
       if (status == HandleStatus.NO_MATCH)
       {
          promptDelivery = true;
       }
-
+      
       return status;
    }
+   
+   private synchronized HandleStatus handle(final MessageReference reference, final Consumer consumer)
+   {
 
+      HandleStatus status;
+      try
+      {
+         status = consumer.handle(reference);
+      }
+      catch (Throwable t)
+      {
+         log.warn("removing consumer which did not handle a message, consumer=" +
+                  consumer +
+                  ", message=" +
+                  reference, t);
+
+         // If the consumer throws an exception we remove the consumer
+         try
+         {
+            removeConsumer(consumer);
+         }
+         catch (Exception e)
+         {
+            log.error("Failed to remove consumer", e);
+         }
+         return HandleStatus.BUSY;
+      }
+      finally
+      {
+         distributionPolicy.incrementPosition();         
+      }
+
+      if (status == null)
+      {
+         throw new IllegalStateException("ClientConsumer.handle() should never return null");
+      }
+
+      return status; 
+   }
+   
    private void removeExpiringReference(final MessageReference ref) throws Exception
    {
       if (ref.getMessage().getExpiration() > 0)
@@ -1515,6 +1652,42 @@
       }
    }
 
+   private synchronized void initPagingStore(SimpleString destination)
+   {      
+      // PagingManager would be null only on testcases
+      if (pagingStore == null && pagingManager != null)
+      {
+         // TODO: It would be better if we could initialize the pagingStore during the construction
+         try
+         {
+            pagingStore = pagingManager.getPageStore(destination);
+         }
+         catch (Exception e)
+         {
+            // This shouldn't happen, and if it happens, this shouldn't abort the route
+         }
+      }
+   }
+
+  private synchronized void startDepaging()
+   {
+      if (pagingStore != null)
+      {
+         // If the queue is empty, we need to check if there are pending messages, and throw a warning
+         if (pagingStore.isPaging() && !pagingStore.isDropWhenMaxSize())
+         {
+            // This is just a *request* to depage. Depage will only happens if there is space on the Address
+            // and GlobalSize
+            pagingStore.startDepaging();
+
+            log.warn("The Queue " + name +
+                     " is empty, however there are pending messages on Paging for the address " +
+                     pagingStore.getStoreName() +
+                     " waiting message ACK before they could be routed");
+         }
+      }
+   }
+  
    // Inner classes
    // --------------------------------------------------------------------------
 

Modified: trunk/src/main/org/jboss/messaging/core/server/impl/RoundRobinDistributor.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/server/impl/RoundRobinDistributor.java	2009-07-07 05:08:58 UTC (rev 7530)
+++ trunk/src/main/org/jboss/messaging/core/server/impl/RoundRobinDistributor.java	2009-07-07 10:42:54 UTC (rev 7531)
@@ -24,14 +24,13 @@
 
 import org.jboss.messaging.core.logging.Logger;
 import org.jboss.messaging.core.server.Consumer;
-import org.jboss.messaging.core.server.HandleStatus;
-import org.jboss.messaging.core.server.MessageReference;
 
 /**
  * A RoundRobinDistributor
  *
  * @author <a href="mailto:tim.fox at jboss.com">Tim Fox</a>
  * @author <a href="mailto:andy.taylor at jboss.org">Andy Taylor</a>
+ * @author <a href="mailto:jmesnil at redhat.com">Jeff Mesnil</a>
  */
 public class RoundRobinDistributor extends DistributorImpl
 {
@@ -58,57 +57,13 @@
       return super.getConsumerCount();
    }
 
-   public HandleStatus distribute(final MessageReference reference)
+   public Consumer peekConsumer()
    {
-      if (getConsumerCount() == 0)
-      {
-         return HandleStatus.BUSY;
-      }
-      int startPos = pos;
-      
-      boolean filterRejected = false;
-      
-      HandleStatus status;
-      
-      while (true)
-      {
-         status = handle(reference, getNextConsumer());
-
-         if (status == HandleStatus.HANDLED)
-         {
-            return HandleStatus.HANDLED;
-         }
-         else if (status == HandleStatus.NO_MATCH)
-         {
-            filterRejected = true;
-         }
-         if (startPos == pos)
-         {
-            // Tried all of them
-            if (filterRejected)
-            {
-               return HandleStatus.NO_MATCH;
-            }
-            else
-            {
-               // Give up - all consumers busy
-               return HandleStatus.BUSY;
-            }
-         }
-      }
+      return consumers.get(pos);
    }
-
-   private final synchronized Consumer getNextConsumer()
+   
+   public synchronized void incrementPosition()
    {
-      Consumer consumer = consumers.get(pos);
-      
-      incrementPosition();
-      
-      return consumer;
-   }
-
-   protected void incrementPosition()
-   {
       pos++;
       
       if (pos == consumers.size())
@@ -116,38 +71,4 @@
          pos = 0;
       }
    }
-
-   protected HandleStatus handle(final MessageReference reference, final Consumer consumer)
-   {
-      HandleStatus status;
-      try
-      {
-         status = consumer.handle(reference);
-      }
-      catch (Throwable t)
-      {
-         log.warn("removing consumer which did not handle a message, " + "consumer=" +
-                  consumer +
-                  ", message=" +
-                  reference, t);
-
-         // If the consumer throws an exception we remove the consumer
-         try
-         {
-            removeConsumer(consumer);
-         }
-         catch (Exception e)
-         {
-            log.error("Failed to remove consumer", e);
-         }
-
-         return HandleStatus.BUSY;
-      }
-
-      if (status == null)
-      {
-         throw new IllegalStateException("ClientConsumer.handle() should never return null");
-      }
-      return status;
-   }
 }

Modified: trunk/tests/jms-tests/src/org/jboss/test/messaging/jms/JMSTest.java
===================================================================
--- trunk/tests/jms-tests/src/org/jboss/test/messaging/jms/JMSTest.java	2009-07-07 05:08:58 UTC (rev 7530)
+++ trunk/tests/jms-tests/src/org/jboss/test/messaging/jms/JMSTest.java	2009-07-07 10:42:54 UTC (rev 7531)
@@ -331,6 +331,10 @@
             {
                try
                {
+                  // sleep a little bit to ensure that
+                  // prod.send will be called before cons.reveive
+                  Thread.sleep(500);
+
                   synchronized (session)
                   {
                      Message m = cons.receive(5000);

Modified: trunk/tests/src/org/jboss/messaging/tests/integration/client/AutogroupIdTest.java
===================================================================
--- trunk/tests/src/org/jboss/messaging/tests/integration/client/AutogroupIdTest.java	2009-07-07 05:08:58 UTC (rev 7530)
+++ trunk/tests/src/org/jboss/messaging/tests/integration/client/AutogroupIdTest.java	2009-07-07 10:42:54 UTC (rev 7531)
@@ -21,6 +21,8 @@
  */
 package org.jboss.messaging.tests.integration.client;
 
+import java.util.concurrent.CountDownLatch;
+
 import org.jboss.messaging.core.client.ClientConsumer;
 import org.jboss.messaging.core.client.ClientMessage;
 import org.jboss.messaging.core.client.ClientProducer;
@@ -29,13 +31,9 @@
 import org.jboss.messaging.core.client.MessageHandler;
 import org.jboss.messaging.core.exception.MessagingException;
 import org.jboss.messaging.core.server.MessagingServer;
-import org.jboss.messaging.core.server.impl.GroupingRoundRobinDistributor;
-import org.jboss.messaging.core.settings.impl.AddressSettings;
 import org.jboss.messaging.tests.util.ServiceTestBase;
 import org.jboss.messaging.utils.SimpleString;
 
-import java.util.concurrent.CountDownLatch;
-
 /**
  * @author <a href="mailto:andy.taylor at jboss.org">Andy Taylor</a>
  */
@@ -62,9 +60,6 @@
       MessagingServer server = createServer(false);
       try
       {
-         AddressSettings qs = new AddressSettings();
-         qs.setDistributionPolicyClass(GroupingRoundRobinDistributor.class.getName());
-         server.getAddressSettingsRepository().addMatch(groupTestQ.toString(), qs);
          server.start();
 
          ClientSessionFactory sf = createInVMFactory();
@@ -118,9 +113,6 @@
       MessagingServer server = createServer(false);
       try
       {
-         AddressSettings qs = new AddressSettings();
-         qs.setDistributionPolicyClass(GroupingRoundRobinDistributor.class.getName());
-         server.getAddressSettingsRepository().addMatch(groupTestQ.toString(), qs);
          server.start();
 
          ClientSessionFactory sf = createInVMFactory();
@@ -183,10 +175,6 @@
       MessagingServer server = createServer(false);
       try
       {
-         AddressSettings qs = new AddressSettings();
-         qs.setDistributionPolicyClass(GroupingRoundRobinDistributor.class.getName());
-
-         server.getAddressSettingsRepository().addMatch(groupTestQ.toString(), qs);
          server.start();
 
          ClientSessionFactory sf = createInVMFactory();

Modified: trunk/tests/src/org/jboss/messaging/tests/integration/client/MessageGroupingTest.java
===================================================================
--- trunk/tests/src/org/jboss/messaging/tests/integration/client/MessageGroupingTest.java	2009-07-07 05:08:58 UTC (rev 7530)
+++ trunk/tests/src/org/jboss/messaging/tests/integration/client/MessageGroupingTest.java	2009-07-07 10:42:54 UTC (rev 7531)
@@ -21,6 +21,13 @@
  */
 package org.jboss.messaging.tests.integration.client;
 
+import java.util.ArrayList;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+
+import javax.transaction.xa.XAResource;
+import javax.transaction.xa.Xid;
+
 import org.jboss.messaging.core.client.ClientConsumer;
 import org.jboss.messaging.core.client.ClientMessage;
 import org.jboss.messaging.core.client.ClientProducer;
@@ -34,18 +41,11 @@
 import org.jboss.messaging.core.message.impl.MessageImpl;
 import org.jboss.messaging.core.server.Messaging;
 import org.jboss.messaging.core.server.MessagingServer;
-import org.jboss.messaging.core.server.impl.GroupingRoundRobinDistributor;
 import org.jboss.messaging.core.settings.impl.AddressSettings;
 import org.jboss.messaging.core.transaction.impl.XidImpl;
 import org.jboss.messaging.tests.util.UnitTestCase;
 import org.jboss.messaging.utils.SimpleString;
 
-import javax.transaction.xa.XAResource;
-import javax.transaction.xa.Xid;
-import java.util.ArrayList;
-import java.util.concurrent.CountDownLatch;
-import java.util.concurrent.TimeUnit;
-
 /**
  * @author <a href="mailto:andy.taylor at jboss.org">Andy Taylor</a>
  */
@@ -77,8 +77,8 @@
       DummyMessageHandler dummyMessageHandler2 = new DummyMessageHandler(latch, true);
       consumer2.setMessageHandler(dummyMessageHandler2);
       assertTrue(latch.await(10, TimeUnit.SECONDS));
-      assertTrue(dummyMessageHandler.list.size() == 100);
-      assertTrue(dummyMessageHandler2.list.size() == 0);
+      assertEquals(100, dummyMessageHandler.list.size());
+      assertEquals(0, dummyMessageHandler2.list.size());
       consumer.close();
       consumer2.close();
    }
@@ -111,14 +111,14 @@
       DummyMessageHandler dummyMessageHandler2 = new DummyMessageHandler(latch, true);
       consumer2.setMessageHandler(dummyMessageHandler2);
       assertTrue(latch.await(10, TimeUnit.SECONDS));
-      assertEquals(dummyMessageHandler.list.size(), 50);
+      assertEquals(50, dummyMessageHandler.list.size());
       int i = 0;
       for (ClientMessage message : dummyMessageHandler.list)
       {
          assertEquals(message.getBody().readString(), "m" + i);
          i += 2;
       }
-      assertEquals(dummyMessageHandler2.list.size(), 50);
+      assertEquals(50, dummyMessageHandler2.list.size());
       i = 1;
       for (ClientMessage message : dummyMessageHandler2.list)
       {
@@ -158,14 +158,14 @@
       DummyMessageHandler dummyMessageHandler2 = new DummyMessageHandler(latch, true);
       consumer2.setMessageHandler(dummyMessageHandler2);
       assertTrue(latch.await(10, TimeUnit.SECONDS));
-      assertEquals(dummyMessageHandler.list.size(), 50);
+      assertEquals(50, dummyMessageHandler.list.size());
       int i = 0;
       for (ClientMessage message : dummyMessageHandler.list)
       {
          assertEquals(message.getBody().readString(), "m" + i);
          i += 2;
       }
-      assertEquals(dummyMessageHandler2.list.size(), 50);
+      assertEquals(50, dummyMessageHandler2.list.size());
       i = 1;
       for (ClientMessage message : dummyMessageHandler2.list)
       {
@@ -558,9 +558,6 @@
       // start the server
       server.start();
 
-      AddressSettings qs = new AddressSettings();
-      qs.setDistributionPolicyClass(GroupingRoundRobinDistributor.class.getName());
-      server.getAddressSettingsRepository().addMatch(qName.toString(), qs);
       // then we create a client as normal
       ClientSessionFactory sessionFactory = new ClientSessionFactoryImpl(new TransportConfiguration(INVM_CONNECTOR_FACTORY));
       clientSession = sessionFactory.createSession(false, true, true);

Modified: trunk/tests/src/org/jboss/messaging/tests/unit/core/server/impl/QueueImplTest.java
===================================================================
--- trunk/tests/src/org/jboss/messaging/tests/unit/core/server/impl/QueueImplTest.java	2009-07-07 05:08:58 UTC (rev 7530)
+++ trunk/tests/src/org/jboss/messaging/tests/unit/core/server/impl/QueueImplTest.java	2009-07-07 10:42:54 UTC (rev 7531)
@@ -31,6 +31,7 @@
 import java.util.concurrent.ScheduledExecutorService;
 
 import org.jboss.messaging.core.filter.Filter;
+import org.jboss.messaging.core.filter.impl.FilterImpl;
 import org.jboss.messaging.core.server.Consumer;
 import org.jboss.messaging.core.server.Distributor;
 import org.jboss.messaging.core.server.HandleStatus;
@@ -916,6 +917,137 @@
       assertRefListsIdenticalRefs(refs, consumer.getReferences());
    }
 
+   public void testBusyConsumerWithFilterThenAddMoreMessages() throws Exception
+   {
+      Queue queue = new QueueImpl(1, address1, queue1, null, false, true, scheduledExecutor, null, null, null);
+
+      FakeConsumer consumer = new FakeConsumer(FilterImpl.createFilter("color = 'green'"));
+
+      consumer.setStatusImmediate(HandleStatus.BUSY);
+
+      queue.addConsumer(consumer);
+
+      final int numMessages = 10;
+
+      List<MessageReference> refs = new ArrayList<MessageReference>();
+
+      for (int i = 0; i < numMessages; i++)
+      {
+         MessageReference ref = generateReference(queue, i);
+         ref.getMessage().putStringProperty("color", "red");
+         refs.add(ref);
+
+         queue.addLast(ref);
+      }
+
+      assertEquals(10, queue.getMessageCount());
+      assertEquals(0, queue.getScheduledCount());
+      assertEquals(0, queue.getDeliveringCount());
+
+      queue.deliverNow();
+
+      assertEquals(10, queue.getMessageCount());
+      assertEquals(0, queue.getScheduledCount());
+      assertEquals(0, queue.getDeliveringCount());
+      assertTrue(consumer.getReferences().isEmpty());
+
+      for (int i = numMessages; i < numMessages * 2; i++)
+      {
+         MessageReference ref = generateReference(queue, i);
+
+         refs.add(ref);
+         ref.getMessage().putStringProperty("color", "green");
+         queue.addLast(ref);
+      }
+
+      assertEquals(20, queue.getMessageCount());
+      assertEquals(0, queue.getScheduledCount());
+      assertEquals(0, queue.getDeliveringCount());
+      assertTrue(consumer.getReferences().isEmpty());
+
+      consumer.setStatusImmediate(null);
+
+      for (int i = numMessages * 2; i < numMessages * 3; i++)
+      {
+         MessageReference ref = generateReference(queue, i);
+
+         refs.add(ref);
+
+         queue.addLast(ref);
+      }
+
+      queue.deliverNow();
+
+      assertEquals(numMessages, consumer.getReferences().size());
+      assertEquals(30, queue.getMessageCount());
+      assertEquals(0, queue.getScheduledCount());
+      assertEquals(10, queue.getDeliveringCount());
+   }
+
+   public void testConsumerWithFilterThenAddMoreMessages() throws Exception
+   {
+      Queue queue = new QueueImpl(1, address1, queue1, null, false, true, scheduledExecutor, null, null, null);
+
+      final int numMessages = 10;
+      List<MessageReference> refs = new ArrayList<MessageReference>();
+
+      for (int i = 0; i < numMessages; i++)
+      {
+         MessageReference ref = generateReference(queue, i);
+         ref.getMessage().putStringProperty("color", "red");
+         refs.add(ref);
+
+         queue.addLast(ref);
+      }
+
+      assertEquals(10, queue.getMessageCount());
+      assertEquals(0, queue.getScheduledCount());
+      assertEquals(0, queue.getDeliveringCount());
+
+      queue.deliverNow();
+
+      assertEquals(10, queue.getMessageCount());
+      assertEquals(0, queue.getScheduledCount());
+      assertEquals(0, queue.getDeliveringCount());
+
+      for (int i = numMessages; i < numMessages * 2; i++)
+      {
+         MessageReference ref = generateReference(queue, i);
+
+         refs.add(ref);
+         ref.getMessage().putStringProperty("color", "green");
+         queue.addLast(ref);
+      }
+
+      FakeConsumer consumer = new FakeConsumer(FilterImpl.createFilter("color = 'green'"));
+
+      queue.addConsumer(consumer);
+
+      queue.deliverNow();
+
+      assertEquals(20, queue.getMessageCount());
+      assertEquals(0, queue.getScheduledCount());
+      assertEquals(10, queue.getDeliveringCount());
+
+
+      for (int i = numMessages * 2; i < numMessages * 3; i++)
+      {
+         MessageReference ref = generateReference(queue, i);
+
+         refs.add(ref);
+         ref.getMessage().putStringProperty("color", "green");
+         queue.addLast(ref);
+      }
+
+      queue.deliverNow();
+
+      assertEquals(20, consumer.getReferences().size());
+      assertEquals(30, queue.getMessageCount());
+      assertEquals(0, queue.getScheduledCount());
+      assertEquals(20, queue.getDeliveringCount());
+   }
+
+
    // Private ------------------------------------------------------------------------------
 
    private void testConsumerWithFilters(boolean direct) throws Exception
@@ -1100,29 +1232,13 @@
 
    class DummyDistributionPolicy implements Distributor
    {
+      Consumer consumer;
+
       public List<Consumer> getConsumers()
       {         
          return null;
       }
 
-      Consumer consumer;
-      public Consumer select(ServerMessage message, boolean redeliver)
-      {
-         return null;
-      }
-
-      public HandleStatus distribute(MessageReference reference)
-      {
-         try
-         {
-            return consumer.handle(reference);
-         }
-         catch (Exception e)
-         {
-            return HandleStatus.BUSY;
-         }
-      }
-
       public void addConsumer(Consumer consumer)
       {
          this.consumer = consumer;
@@ -1133,20 +1249,19 @@
          return false;
       }
 
+      public void incrementPosition()
+      {
+      }
+      
       public int getConsumerCount()
       {
          return 0;
       }
 
-      public boolean hasConsumers()
+      public Consumer peekConsumer()
       {
-         return false;
+         return consumer;
       }
-
-      public int getCurrentPosition()
-      {
-         return 0;  
-      }
    }
 
 }

Modified: trunk/tests/src/org/jboss/messaging/tests/unit/core/server/impl/fakes/FakeConsumer.java
===================================================================
--- trunk/tests/src/org/jboss/messaging/tests/unit/core/server/impl/fakes/FakeConsumer.java	2009-07-07 05:08:58 UTC (rev 7530)
+++ trunk/tests/src/org/jboss/messaging/tests/unit/core/server/impl/fakes/FakeConsumer.java	2009-07-07 10:42:54 UTC (rev 7531)
@@ -111,6 +111,11 @@
 
    public synchronized HandleStatus handle(MessageReference reference)
    {
+      if (statusToReturn == HandleStatus.BUSY)
+      {
+         return HandleStatus.BUSY;
+      }
+
       if (filter != null)
       {
          if (filter.match(reference.getMessage()))




More information about the jboss-cvs-commits mailing list