[jboss-cvs] JBoss Messaging SVN: r5073 - in trunk: src/main/org/jboss/messaging/core/server and 2 other directories.

jboss-cvs-commits at lists.jboss.org jboss-cvs-commits at lists.jboss.org
Mon Oct 6 05:36:11 EDT 2008


Author: ataylor
Date: 2008-10-06 05:36:10 -0400 (Mon, 06 Oct 2008)
New Revision: 5073

Modified:
   trunk/messaging.ipr
   trunk/src/main/org/jboss/messaging/core/server/DistributionPolicy.java
   trunk/src/main/org/jboss/messaging/core/server/impl/DistributionPolicyImpl.java
   trunk/src/main/org/jboss/messaging/core/server/impl/GroupingRoundRobinDistributionPolicy.java
   trunk/src/main/org/jboss/messaging/core/server/impl/QueueImpl.java
   trunk/src/main/org/jboss/messaging/core/server/impl/RoundRobinDistributionPolicy.java
   trunk/tests/src/org/jboss/messaging/tests/unit/core/server/impl/GroupingRoundRobinDistributionPolicyTest.java
   trunk/tests/src/org/jboss/messaging/tests/unit/core/server/impl/QueueImplTest.java
   trunk/tests/src/org/jboss/messaging/tests/unit/core/server/impl/RoundRobinDistributionPolicyTest.java
Log:
https://jira.jboss.org/jira/browse/JBMESSAGING-375 - changed the distribution policy API to make it simpler

Modified: trunk/messaging.ipr
===================================================================
--- trunk/messaging.ipr	2008-10-04 16:22:15 UTC (rev 5072)
+++ trunk/messaging.ipr	2008-10-06 09:36:10 UTC (rev 5073)
@@ -361,6 +361,9 @@
     <option name="GENERATE_IIOP_STUBS" value="false" />
     <option name="ADDITIONAL_OPTIONS_STRING" value="" />
   </component>
+  <component name="SvnBranchConfigurationManager">
+    <option name="myVersion" value="123" />
+  </component>
   <component name="VcsDirectoryMappings">
     <mapping directory="" vcs="" />
     <mapping directory="$PROJECT_DIR$" vcs="svn" />
@@ -453,13 +456,14 @@
         <root url="file://$PROJECT_DIR$/tests/config" />
         <root url="file://$PROJECT_DIR$/tests/jms-tests/config" />
         <root url="file://$PROJECT_DIR$/src/config" />
-        <root url="jar://$PROJECT_DIR$/thirdparty/apache-mina/lib/mina-core-2.0.0-M2-20080520.004618-19.jar!/" />
         <root url="jar://$PROJECT_DIR$/thirdparty/easymock/lib/easymock.jar!/" />
         <root url="jar://$PROJECT_DIR$/thirdparty/easymock-classextension/lib/easymockclassextension.jar!/" />
         <root url="jar://$PROJECT_DIR$/thirdparty/cglib/lib/cglib.jar!/" />
         <root url="jar://$PROJECT_DIR$/tools/lib/jbossbuild.jar!/" />
-        <root url="jar:///home/andy/devtools/apache-ant-1.7.0/lib/ant-junit.jar!/" />
-        <root url="jar:///home/andy/devtools/apache-ant-1.7.0/lib/ant.jar!/" />
+        <root url="jar://$ANT_HOME$/lib/ant-junit.jar!/" />
+        <root url="jar://$ANT_HOME$/lib/ant.jar!/" />
+        <root url="jar://$PROJECT_DIR$/thirdparty/apache-mina/lib/mina-core-2.0.0-M3-20080730.120633-1.jar!/" />
+        <root url="jar://$PROJECT_DIR$/thirdparty/netty/lib/netty-3.0.0.CR4.jar!/" />
       </CLASSES>
       <JAVADOC />
       <SOURCES>
@@ -471,5 +475,8 @@
       <jarDirectory url="file://$PROJECT_DIR$/tests/jms-tests/etc" recursive="false" />
     </library>
   </component>
+  <UsedPathMacros>
+    <macro name="ANT_HOME" />
+  </UsedPathMacros>
 </project>
 

Modified: trunk/src/main/org/jboss/messaging/core/server/DistributionPolicy.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/server/DistributionPolicy.java	2008-10-04 16:22:15 UTC (rev 5072)
+++ trunk/src/main/org/jboss/messaging/core/server/DistributionPolicy.java	2008-10-06 09:36:10 UTC (rev 5073)
@@ -31,7 +31,7 @@
  */
 public interface DistributionPolicy
 {
-   Consumer select(ServerMessage message, boolean redeliver);
+   HandleStatus distribute(final MessageReference reference);
 
    void addConsumer(Consumer consumer);
 
@@ -40,6 +40,4 @@
    int getConsumerCount();
 
    boolean hasConsumers();
-
-   int getCurrentPosition();
 }

Modified: trunk/src/main/org/jboss/messaging/core/server/impl/DistributionPolicyImpl.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/server/impl/DistributionPolicyImpl.java	2008-10-04 16:22:15 UTC (rev 5072)
+++ trunk/src/main/org/jboss/messaging/core/server/impl/DistributionPolicyImpl.java	2008-10-06 09:36:10 UTC (rev 5073)
@@ -35,8 +35,6 @@
 {
    protected final List<Consumer> consumers = new ArrayList<Consumer>();
 
-   public abstract Consumer select(ServerMessage message, boolean redeliver);
-
    public void addConsumer(Consumer consumer)
    {
       consumers.add(consumer);

Modified: trunk/src/main/org/jboss/messaging/core/server/impl/GroupingRoundRobinDistributionPolicy.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/server/impl/GroupingRoundRobinDistributionPolicy.java	2008-10-04 16:22:15 UTC (rev 5072)
+++ trunk/src/main/org/jboss/messaging/core/server/impl/GroupingRoundRobinDistributionPolicy.java	2008-10-06 09:36:10 UTC (rev 5073)
@@ -24,6 +24,8 @@
 import org.jboss.messaging.core.message.impl.MessageImpl;
 import org.jboss.messaging.core.server.Consumer;
 import org.jboss.messaging.core.server.ServerMessage;
+import org.jboss.messaging.core.server.HandleStatus;
+import org.jboss.messaging.core.server.MessageReference;
 import org.jboss.messaging.util.SimpleString;
 
 import java.util.ArrayList;
@@ -38,6 +40,7 @@
  * The Initial consumer is the first consumer found, using the round robin policy, that hasn't been bound to a group, If
  * there are no consumers left that have not been bound to a group then the next consumer will be bound to 2 groups and
  * so on.
+ *
  * @author <a href="mailto:andy.taylor at jboss.org">Andy Taylor</a>
  */
 public class GroupingRoundRobinDistributionPolicy extends RoundRobinDistributionPolicy
@@ -48,199 +51,90 @@
 
    // Attributes ----------------------------------------------------
 
-   // Map with GroupID as a key and a Consumer as value.
-   private final Map<SimpleString, ConsumerState> consumers = new ConcurrentHashMap<SimpleString, ConsumerState>();
+   private ConcurrentHashMap<SimpleString, Consumer> cons = new ConcurrentHashMap<SimpleString, Consumer>();
 
-   // we hold the state of each consumer, i.e., is it bound etc
-   private final Map<Consumer, ConsumerState> consumerStateMap = new ConcurrentHashMap<Consumer, ConsumerState>();
 
    // Distributor implementation ------------------------------------
 
-   @Override
-   public Consumer select(final ServerMessage message, final boolean redeliver)
+   public HandleStatus distribute(MessageReference reference)
    {
-      if (message.getProperty(MessageImpl.GROUP_ID) != null)
+      final SimpleString groupId = (SimpleString) reference.getMessage().getProperty(MessageImpl.GROUP_ID);
+      if (groupId != null)
       {
-         final SimpleString groupId = (SimpleString)message.getProperty(MessageImpl.GROUP_ID);
-         final ConsumerState consumerState = consumers.get(groupId);
-         if (consumerState != null)
+         boolean bound;
+         int startPos = pos;
+         boolean filterRejected = false;
+
+         while (true)
          {
-            // if this is a redelivery and the group is bound we wait.
-            if (redeliver && consumerState.isBound())
+            Consumer consumer = cons.putIfAbsent(groupId, consumers.get(pos));
+            if (consumer == null)
             {
-               return null;
+               incrementPosition();
+               consumer = cons.get(groupId);
+               bound = false;
             }
-            // if this is a redelivery and it was its first attempt we can look for another consumer and use that
-            else if (redeliver && !consumerState.isBound())
+            else
             {
-               removeBinding(groupId, consumerState);
-               return getNextPositionAndBind(message, redeliver, groupId).getConsumer();
+               bound = true;
             }
-            // we bind after we know that the first message has been successfully consumed
-            else if (!consumerState.isBound())
+            HandleStatus status = handle(reference, consumer);
+            if (status == HandleStatus.HANDLED)
             {
-               consumerState.setBound(true);
+               return HandleStatus.HANDLED;
             }
-            consumerState.setAvailable(false);
-
-            return consumerState.getConsumer();
+            else if (status == HandleStatus.NO_MATCH)
+            {
+               filterRejected = true;
+            }
+            else if (status == HandleStatus.BUSY)
+            {
+               //if the group was already bound we must wait for its consumer, otherwise we drop the tentative binding and try the next consumer
+               if (bound)
+               {
+                  return HandleStatus.BUSY;
+               }
+               else
+               {
+                  cons.remove(groupId);
+               }
+            }
+            //if we've tried all of them
+            if (startPos == pos)
+            {
+               // Tried all of them
+               if (filterRejected)
+               {
+                  return HandleStatus.NO_MATCH;
+               }
+               else
+               {
+                  // Give up - all consumers busy
+                  return HandleStatus.BUSY;
+               }
+            }
          }
-         else
-         {
-            return getNextPositionAndBind(message, redeliver, groupId).getConsumer();
-         }
       }
       else
       {
-         return super.select(message, redeliver);
+         return super.distribute(reference);
       }
    }
 
-   @Override
-   public synchronized void addConsumer(final Consumer consumer)
+   public synchronized boolean removeConsumer(Consumer consumer)
    {
-      super.addConsumer(consumer);
-      consumerStateMap.put(consumer, new ConsumerState(consumer));
-   }
-
-   @Override
-   public synchronized boolean removeConsumer(final Consumer consumer)
-   {
-      final boolean removed = super.removeConsumer(consumer);
+      boolean removed = super.removeConsumer(consumer);
       if (removed)
       {
-         final ConsumerState cs = consumerStateMap.remove(consumer);
-         for (final SimpleString ss : cs.getGroupIds())
+         for (SimpleString group : cons.keySet())
          {
-            consumers.remove(ss);
-         }
-
-      }
-      return removed;
-   }
-
-   /**
-    * we need to find the next available consumer that doesn't have a binding. If there are no free we use the next
-    * available in the normal Round Robin fashion.
-    * @param message
-    * @param redeliver
-    * @param groupId
-    * @return
-    */
-   private ConsumerState getNextPositionAndBind(final ServerMessage message,
-                                                final boolean redeliver,
-                                                final SimpleString groupId)
-   {
-      Consumer consumer = super.select(message, redeliver);
-      final ConsumerState cs = consumerStateMap.get(consumer);
-      // if there is only one return it
-      if (getConsumerCount() == 1 || cs.isAvailable())
-      {
-         consumers.put(groupId, cs);
-         cs.getGroupIds().add(groupId);
-         return cs;
-      }
-      else
-      {
-         consumer = super.select(message, redeliver);
-         ConsumerState ncs = consumerStateMap.get(consumer);
-         while (!ncs.isAvailable())
-         {
-            consumer = super.select(message, redeliver);
-            ncs = consumerStateMap.get(consumer);
-            if (ncs == cs)
+            if (consumer == cons.get(group))
             {
-               cs.getGroupIds().add(groupId);
-               return cs;
+               cons.remove(group);
+               break;
             }
          }
-         ncs.getGroupIds().add(groupId);
-         return ncs;
       }
+      return removed;
    }
-
-   private void removeBinding(final SimpleString groupId, final ConsumerState consumerState)
-   {
-      consumerState.setAvailable(true);
-      consumerState.getGroupIds().remove(groupId);
-      consumers.remove(groupId);
-   }
-
-   /**
-    * holds the current state of a consumer, is it available, what groups it is bound to etc.
-    */
-   class ConsumerState
-   {
-      private final Consumer consumer;
-
-      private volatile boolean isBound = false;
-
-      private volatile boolean available = true;
-
-      private final List<SimpleString> groupIds = new ArrayList<SimpleString>();
-
-      public ConsumerState(final Consumer consumer)
-      {
-         this.consumer = consumer;
-      }
-
-      public boolean isBound()
-      {
-         return isBound;
-      }
-
-      public void setBound(final boolean bound)
-      {
-         isBound = bound;
-      }
-
-      public boolean isAvailable()
-      {
-         return available;
-      }
-
-      public void setAvailable(final boolean available)
-      {
-         this.available = available;
-      }
-
-      public Consumer getConsumer()
-      {
-         return consumer;
-      }
-
-      public List<SimpleString> getGroupIds()
-      {
-         return groupIds;
-      }
-
-      @Override
-      public boolean equals(final Object o)
-      {
-         if (this == o)
-         {
-            return true;
-         }
-         if (o == null || getClass() != o.getClass())
-         {
-            return false;
-         }
-
-         final ConsumerState that = (ConsumerState)o;
-
-         if (!consumer.equals(that.consumer))
-         {
-            return false;
-         }
-
-         return true;
-      }
-
-      @Override
-      public int hashCode()
-      {
-         return consumer.hashCode();
-      }
-
-   }
 }

Modified: trunk/src/main/org/jboss/messaging/core/server/impl/QueueImpl.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/server/impl/QueueImpl.java	2008-10-04 16:22:15 UTC (rev 5072)
+++ trunk/src/main/org/jboss/messaging/core/server/impl/QueueImpl.java	2008-10-06 09:36:10 UTC (rev 5073)
@@ -51,7 +51,7 @@
 
 /**
  * Implementation of a Queue TODO use Java 5 concurrent queue
- * 
+ *
  * @author <a href="mailto:tim.fox at jboss.com">Tim Fox</a>
  * @author <a href="ataylor at redhat.com">Andy Taylor</a>
  * @author <a href="jmesnil at redhat.com">Jeff Mesnil</a>
@@ -175,7 +175,7 @@
    {
       return add(ref, true);
    }
-   
+
    public synchronized void addListFirst(final LinkedList<MessageReference> list)
    {
       ListIterator<MessageReference> iter = list.listIterator(list.size());
@@ -183,11 +183,11 @@
       while (iter.hasPrevious())
       {
          MessageReference ref = iter.previous();
-         
+
          ServerMessage msg = ref.getMessage();
 
          messageReferences.addFirst(ref, msg.getPriority());
-         
+
          checkWaiting(msg.getMessageID());
       }
 
@@ -289,7 +289,7 @@
    public synchronized boolean removeConsumer(final Consumer consumer) throws Exception
    {
       boolean removed = distributionPolicy.removeConsumer(consumer);
-      
+
       if (removed)
       {
          distributionPolicy.removeConsumer(consumer);
@@ -700,7 +700,7 @@
          return true;
       }
 
-      QueueImpl qother = (QueueImpl)other;
+      QueueImpl qother = (QueueImpl) other;
 
       return name.equals(qother.name);
    }
@@ -822,94 +822,20 @@
 
    private HandleStatus deliver(final MessageReference reference)
    {
-      if (distributionPolicy.getConsumerCount() == 0)
+      HandleStatus status = distributionPolicy.distribute(reference);
+      if (status == HandleStatus.HANDLED)
       {
-         return HandleStatus.BUSY;
-      }
+         deliveringCount.incrementAndGet();
 
-      int startPos = distributionPolicy.getCurrentPosition();
-
-      boolean filterRejected = false;
-
-      HandleStatus status = null;
-      int pos;
-      while (true)
+         return HandleStatus.HANDLED;
+      }
+      else if (status == HandleStatus.NO_MATCH)
       {
-         Consumer consumer = distributionPolicy.select(reference.getMessage(), status != null);
-         pos = distributionPolicy.getCurrentPosition();
-         if (consumer == null)
-         {
-            if (filterRejected)
-            {
-               return HandleStatus.NO_MATCH;
-            }
-            else
-            {
-               // Give up - all consumers busy
-               return HandleStatus.BUSY;
-            }
-         }
-         try
-         {
-            status = consumer.handle(reference);
-         }
-         catch (Throwable t)
-         {
-            log.warn("removing consumer which did not handle a message, " + "consumer=" +
-                     consumer +
-                     ", message=" +
-                     reference, t);
-
-            // If the consumer throws an exception we remove the consumer
-            try
-            {
-               removeConsumer(consumer);
-            }
-            catch (Exception e)
-            {
-               log.error("Failed to remove consumer", e);
-            }
-
-            return HandleStatus.BUSY;
-         }
-
-         if (status == null)
-         {
-            throw new IllegalStateException("ClientConsumer.handle() should never return null");
-         }
-
-         if (status == HandleStatus.HANDLED)
-         {
-            deliveringCount.incrementAndGet();
-
-            return HandleStatus.HANDLED;
-         }
-         else if (status == HandleStatus.NO_MATCH)
-         {
-            promptDelivery = true;
-
-            filterRejected = true;
-         }
-         if (startPos > distributionPolicy.getConsumerCount() - 1)
-         {
-            startPos = distributionPolicy.getConsumerCount() - 1;
-         }
-         if (startPos == pos)
-         {
-            // Tried all of them
-            if (filterRejected)
-            {
-               return HandleStatus.NO_MATCH;
-            }
-            else
-            {
-               // Give up - all consumers busy
-               return HandleStatus.BUSY;
-            }
-         }
+         promptDelivery = true;
       }
+      return status;
    }
-   
+
    private void checkWaiting(final long messageID)
    {
       CountDownLatch latch = waitingIDMap.remove(messageID);

Modified: trunk/src/main/org/jboss/messaging/core/server/impl/RoundRobinDistributionPolicy.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/server/impl/RoundRobinDistributionPolicy.java	2008-10-04 16:22:15 UTC (rev 5072)
+++ trunk/src/main/org/jboss/messaging/core/server/impl/RoundRobinDistributionPolicy.java	2008-10-06 09:36:10 UTC (rev 5073)
@@ -18,38 +18,28 @@
  * License along with this software; if not, write to the Free
  * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
  * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
- */ 
+ */
 
 package org.jboss.messaging.core.server.impl;
 
 import org.jboss.messaging.core.server.Consumer;
 import org.jboss.messaging.core.server.ServerMessage;
+import org.jboss.messaging.core.server.HandleStatus;
+import org.jboss.messaging.core.server.MessageReference;
+import org.jboss.messaging.core.logging.Logger;
 
 /**
- * 
  * A RoundRobinDistributionPolicy
- * 
+ *
  * @author <a href="mailto:tim.fox at jboss.com">Tim Fox</a>
- *
+ * @author <a href="mailto:andy.taylor at jboss.org">Andy Taylor</a>
  */
 public class RoundRobinDistributionPolicy extends DistributionPolicyImpl
 {
-   int pos = 0;
+   Logger log = Logger.getLogger(RoundRobinDistributionPolicy.class);
 
-   public Consumer select(ServerMessage message, boolean redeliver)
-   {     
-      if(consumers.isEmpty())
-      {
-         return null;
-      }
-      int startPos = pos++;
+   protected int pos = 0;
 
-      if (pos == consumers.size())
-      {
-         pos = 0;
-      }
-      return consumers.get(startPos);
-   }
 
    public synchronized void addConsumer(Consumer consumer)
    {
@@ -64,8 +54,90 @@
       return super.removeConsumer(consumer);
    }
 
-   public int getCurrentPosition()
+   public HandleStatus distribute(final MessageReference reference)
    {
-      return pos;
+      if (getConsumerCount() == 0)
+      {
+         return HandleStatus.BUSY;
+      }
+      int startPos = pos;
+      boolean filterRejected = false;
+      HandleStatus status;
+      while (true)
+      {
+         status = handle(reference, getNextConsumer());
+
+         if (status == HandleStatus.HANDLED)
+         {
+            return HandleStatus.HANDLED;
+         }
+         else if (status == HandleStatus.NO_MATCH)
+         {
+            filterRejected = true;
+         }
+         if (startPos == pos)
+         {
+            // Tried all of them
+            if (filterRejected)
+            {
+               return HandleStatus.NO_MATCH;
+            }
+            else
+            {
+               // Give up - all consumers busy
+               return HandleStatus.BUSY;
+            }
+         }
+      }
    }
+
+   protected Consumer getNextConsumer()
+   {
+      Consumer consumer = consumers.get(pos);
+      incrementPosition();
+      return consumer;
+   }
+
+   protected void incrementPosition()
+   {
+      pos++;
+      if (pos == consumers.size())
+      {
+         pos = 0;
+      }
+   }
+
+   protected HandleStatus handle(MessageReference reference, Consumer consumer)
+   {
+      HandleStatus status;
+      try
+      {
+         status = consumer.handle(reference);
+      }
+      catch (Throwable t)
+      {
+         log.warn("removing consumer which did not handle a message, " + "consumer=" +
+                  consumer +
+                  ", message=" +
+                  reference, t);
+
+         // If the consumer throws an exception we remove the consumer
+         try
+         {
+            removeConsumer(consumer);
+         }
+         catch (Exception e)
+         {
+            log.error("Failed to remove consumer", e);
+         }
+
+         return HandleStatus.BUSY;
+      }
+
+      if (status == null)
+      {
+         throw new IllegalStateException("ClientConsumer.handle() should never return null");
+      }
+      return status;
+   }
 }

Modified: trunk/tests/src/org/jboss/messaging/tests/unit/core/server/impl/GroupingRoundRobinDistributionPolicyTest.java
===================================================================
--- trunk/tests/src/org/jboss/messaging/tests/unit/core/server/impl/GroupingRoundRobinDistributionPolicyTest.java	2008-10-04 16:22:15 UTC (rev 5072)
+++ trunk/tests/src/org/jboss/messaging/tests/unit/core/server/impl/GroupingRoundRobinDistributionPolicyTest.java	2008-10-06 09:36:10 UTC (rev 5073)
@@ -25,7 +25,11 @@
 import org.jboss.messaging.core.message.impl.MessageImpl;
 import org.jboss.messaging.core.server.Consumer;
 import org.jboss.messaging.core.server.ServerMessage;
+import org.jboss.messaging.core.server.MessageReference;
+import org.jboss.messaging.core.server.HandleStatus;
+import org.jboss.messaging.core.server.DistributionPolicy;
 import org.jboss.messaging.core.server.impl.GroupingRoundRobinDistributionPolicy;
+import org.jboss.messaging.core.server.impl.RoundRobinDistributionPolicy;
 import org.jboss.messaging.tests.util.UnitTestCase;
 import org.jboss.messaging.util.SimpleString;
 
@@ -46,20 +50,71 @@
       policy = null;
    }
 
-   public void testSingleConsumerSingleGroup()
+   public void testSingleConsumerSingleGroup() throws Exception
    {
+      MessageReference reference = EasyMock.createStrictMock(MessageReference.class);
       Consumer consumer = EasyMock.createStrictMock(Consumer.class);
       policy.addConsumer(consumer);
       ServerMessage serverMessage = EasyMock.createStrictMock(ServerMessage.class);
+      EasyMock.expect(reference.getMessage()).andStubReturn(serverMessage);
       EasyMock.expect(serverMessage.getProperty(MessageImpl.GROUP_ID)).andStubReturn(new SimpleString("gid1"));
-      EasyMock.replay(consumer, serverMessage);
-      assertEquals(consumer, policy.select( serverMessage, false));
-      assertEquals(consumer, policy.select(serverMessage, false));
-      EasyMock.verify(consumer, serverMessage);
+      EasyMock.expect(consumer.handle(reference)).andReturn(HandleStatus.HANDLED);
+      EasyMock.expect(consumer.handle(reference)).andReturn(HandleStatus.HANDLED);
+      EasyMock.replay(consumer, serverMessage, reference);
+      assertEquals(HandleStatus.HANDLED, policy.distribute(reference));
+      assertEquals(HandleStatus.HANDLED, policy.distribute(reference));
+      EasyMock.verify(consumer, serverMessage, reference);
    }
 
-   public void testMultipleConsumersSingleGroup()
+   public void testRunOutOfConsumers() throws Exception
    {
+      MessageReference reference = EasyMock.createStrictMock(MessageReference.class);
+      Consumer c1 = EasyMock.createStrictMock(Consumer.class);
+      Consumer c2 = EasyMock.createStrictMock(Consumer.class);
+      Consumer c3 = EasyMock.createStrictMock(Consumer.class);
+      ServerMessage serverMessage = EasyMock.createStrictMock(ServerMessage.class);
+      EasyMock.expect(reference.getMessage()).andStubReturn(serverMessage);
+      EasyMock.expect(serverMessage.getProperty(MessageImpl.GROUP_ID)).andStubReturn(new SimpleString("gid1"));
+      DistributionPolicy dp = new RoundRobinDistributionPolicy();
+      dp.addConsumer(c1);
+      dp.addConsumer(c2);
+      dp.addConsumer(c3);
+      EasyMock.expect(c1.handle(reference)).andReturn(HandleStatus.BUSY);
+      EasyMock.expect(c2.handle(reference)).andReturn(HandleStatus.BUSY);
+      EasyMock.expect(c3.handle(reference)).andReturn(HandleStatus.BUSY);
+      EasyMock.replay(reference, c1, c2, c3, serverMessage);
+
+      HandleStatus status = dp.distribute(reference);
+      assertEquals(status, HandleStatus.BUSY);
+      EasyMock.verify(reference, c1, c2, c3, serverMessage);
+   }
+
+   public void testRunOutOfConsumersNoMatch() throws Exception
+   {
+      MessageReference reference = EasyMock.createStrictMock(MessageReference.class);
+      Consumer c1 = EasyMock.createStrictMock(Consumer.class);
+      Consumer c2 = EasyMock.createStrictMock(Consumer.class);
+      Consumer c3 = EasyMock.createStrictMock(Consumer.class);
+      ServerMessage serverMessage = EasyMock.createStrictMock(ServerMessage.class);
+      EasyMock.expect(reference.getMessage()).andStubReturn(serverMessage);
+      EasyMock.expect(serverMessage.getProperty(MessageImpl.GROUP_ID)).andStubReturn(new SimpleString("gid1"));
+      DistributionPolicy dp = new RoundRobinDistributionPolicy();
+      dp.addConsumer(c1);
+      dp.addConsumer(c2);
+      dp.addConsumer(c3);
+      EasyMock.expect(c1.handle(reference)).andReturn(HandleStatus.NO_MATCH);
+      EasyMock.expect(c2.handle(reference)).andReturn(HandleStatus.NO_MATCH);
+      EasyMock.expect(c3.handle(reference)).andReturn(HandleStatus.NO_MATCH);
+      EasyMock.replay(reference, c1, c2, c3, serverMessage);
+
+      HandleStatus status = dp.distribute(reference);
+      assertEquals(status, HandleStatus.NO_MATCH);
+      EasyMock.verify(reference, c1, c2, c3, serverMessage);
+   }
+
+   public void testMultipleConsumersSingleGroup() throws Exception
+   {
+      MessageReference reference = EasyMock.createStrictMock(MessageReference.class);
       Consumer consumer = EasyMock.createStrictMock(Consumer.class);
       Consumer consumer2 = EasyMock.createStrictMock(Consumer.class);
       Consumer consumer3 = EasyMock.createStrictMock(Consumer.class);
@@ -67,29 +122,40 @@
       policy.addConsumer(consumer2);
       policy.addConsumer(consumer3);
       ServerMessage serverMessage = EasyMock.createStrictMock(ServerMessage.class);
+      EasyMock.expect(reference.getMessage()).andStubReturn(serverMessage);
       EasyMock.expect(serverMessage.getProperty(MessageImpl.GROUP_ID)).andStubReturn(new SimpleString("gid1"));
-      EasyMock.replay(consumer, consumer2, consumer3, serverMessage);
-      assertEquals(consumer, policy.select(serverMessage, false));
-      assertEquals(consumer, policy.select(serverMessage, false));
-      EasyMock.verify(consumer, consumer2, consumer3, serverMessage);
+      EasyMock.expect(consumer.handle(reference)).andReturn(HandleStatus.HANDLED);
+      EasyMock.expect(consumer.handle(reference)).andReturn(HandleStatus.HANDLED);
+      EasyMock.replay(consumer, consumer2, consumer3, serverMessage, reference);
+      assertEquals(HandleStatus.HANDLED, policy.distribute(reference));
+      assertEquals(HandleStatus.HANDLED, policy.distribute(reference));
+      EasyMock.verify(consumer, consumer2, consumer3, serverMessage, reference);
    }
 
-   public void testSingleConsumerTwoGroups()
+   public void testSingleConsumerTwoGroups() throws Exception
    {
+      MessageReference reference = EasyMock.createStrictMock(MessageReference.class);
+      MessageReference reference2 = EasyMock.createStrictMock(MessageReference.class);
       Consumer consumer = EasyMock.createStrictMock(Consumer.class);
       policy.addConsumer(consumer);
       ServerMessage serverMessage = EasyMock.createStrictMock(ServerMessage.class);
+      EasyMock.expect(reference.getMessage()).andStubReturn(serverMessage);
       EasyMock.expect(serverMessage.getProperty(MessageImpl.GROUP_ID)).andStubReturn(new SimpleString("gid1"));
       ServerMessage serverMessage2 = EasyMock.createStrictMock(ServerMessage.class);
+      EasyMock.expect(reference2.getMessage()).andStubReturn(serverMessage2);
       EasyMock.expect(serverMessage2.getProperty(MessageImpl.GROUP_ID)).andStubReturn(new SimpleString("gid2"));
-      EasyMock.replay(consumer, serverMessage, serverMessage2);
-      assertEquals(consumer, policy.select(serverMessage, false));
-      assertEquals(consumer, policy.select(serverMessage2, false));
-      EasyMock.verify(consumer, serverMessage2);
+      EasyMock.expect(consumer.handle(reference)).andReturn(HandleStatus.HANDLED);
+      EasyMock.expect(consumer.handle(reference)).andReturn(HandleStatus.HANDLED);
+      EasyMock.replay(consumer, serverMessage, serverMessage2, reference, reference2);
+      assertEquals(HandleStatus.HANDLED, policy.distribute(reference));
+      assertEquals(HandleStatus.HANDLED, policy.distribute(reference));
+      EasyMock.verify(consumer, serverMessage2, reference, reference2);
    }
 
-   public void testMultipleConsumersTwoGroups()
+   public void testMultipleConsumersTwoGroups() throws Exception
    {
+      MessageReference reference = EasyMock.createStrictMock(MessageReference.class);
+      MessageReference reference2 = EasyMock.createStrictMock(MessageReference.class);
       Consumer consumer = EasyMock.createStrictMock(Consumer.class);
       Consumer consumer2 = EasyMock.createStrictMock(Consumer.class);
       Consumer consumer3 = EasyMock.createStrictMock(Consumer.class);
@@ -97,19 +163,26 @@
       policy.addConsumer(consumer2);
       policy.addConsumer(consumer3);
       ServerMessage serverMessage = EasyMock.createStrictMock(ServerMessage.class);
+      EasyMock.expect(reference.getMessage()).andStubReturn(serverMessage);
       EasyMock.expect(serverMessage.getProperty(MessageImpl.GROUP_ID)).andStubReturn(new SimpleString("gid1"));
       ServerMessage serverMessage2 = EasyMock.createStrictMock(ServerMessage.class);
+      EasyMock.expect(reference2.getMessage()).andStubReturn(serverMessage2);
       EasyMock.expect(serverMessage2.getProperty(MessageImpl.GROUP_ID)).andStubReturn(new SimpleString("gid2"));
-      EasyMock.replay(consumer, consumer2, consumer3, serverMessage, serverMessage2);
-      assertEquals(consumer, policy.select(serverMessage, false));
-      assertEquals(consumer2, policy.select(serverMessage2, false));
-      assertEquals(consumer, policy.select(serverMessage, false));
-      assertEquals(consumer2, policy.select(serverMessage2, false));
-      EasyMock.verify(consumer, consumer2, consumer3, serverMessage, serverMessage2);
+      EasyMock.expect(consumer.handle(reference)).andReturn(HandleStatus.HANDLED);
+      EasyMock.expect(consumer2.handle(reference2)).andReturn(HandleStatus.HANDLED);
+      EasyMock.expect(consumer.handle(reference)).andReturn(HandleStatus.HANDLED);
+      EasyMock.expect(consumer2.handle(reference2)).andReturn(HandleStatus.HANDLED);
+      EasyMock.replay(consumer, consumer2, consumer3, serverMessage, serverMessage2, reference, reference2);
+      assertEquals(HandleStatus.HANDLED, policy.distribute(reference));
+      assertEquals(HandleStatus.HANDLED, policy.distribute(reference2));
+      assertEquals(HandleStatus.HANDLED, policy.distribute(reference));
+      assertEquals(HandleStatus.HANDLED, policy.distribute(reference2));
+      EasyMock.verify(consumer, consumer2, consumer3, serverMessage, serverMessage2, reference, reference2);
    }
 
-   public void testMultipleConsumersSingleGroupFirstDeliveryFailed()
+   public void testMultipleConsumersSingleGroupFirstDeliveryFailed() throws Exception
    {
+      MessageReference reference = EasyMock.createStrictMock(MessageReference.class);
       Consumer consumer = EasyMock.createStrictMock(Consumer.class);
       Consumer consumer2 = EasyMock.createStrictMock(Consumer.class);
       Consumer consumer3 = EasyMock.createStrictMock(Consumer.class);
@@ -117,15 +190,18 @@
       policy.addConsumer(consumer2);
       policy.addConsumer(consumer3);
       ServerMessage serverMessage = EasyMock.createStrictMock(ServerMessage.class);
+      EasyMock.expect(reference.getMessage()).andStubReturn(serverMessage);
       EasyMock.expect(serverMessage.getProperty(MessageImpl.GROUP_ID)).andStubReturn(new SimpleString("gid1"));
-      EasyMock.replay(consumer, consumer2, consumer3, serverMessage);
-      assertEquals(consumer, policy.select(serverMessage, false));
-      assertEquals(consumer2, policy.select(serverMessage, true));
-      EasyMock.verify(consumer, consumer2, consumer3, serverMessage);
+      EasyMock.expect(consumer.handle(reference)).andReturn(HandleStatus.BUSY);
+      EasyMock.expect(consumer2.handle(reference)).andReturn(HandleStatus.HANDLED);
+      EasyMock.replay(consumer, consumer2, consumer3, serverMessage, reference);
+      assertEquals(HandleStatus.HANDLED, policy.distribute(reference));
+      EasyMock.verify(consumer, consumer2, consumer3, serverMessage, reference);
    }
 
-   public void testMultipleConsumersSingleGroupSecondDeliveryFailed()
+   public void testMultipleConsumersSingleGroupSecondDeliveryFailed() throws Exception
    {
+      MessageReference reference = EasyMock.createStrictMock(MessageReference.class);
       Consumer consumer = EasyMock.createStrictMock(Consumer.class);
       Consumer consumer2 = EasyMock.createStrictMock(Consumer.class);
       Consumer consumer3 = EasyMock.createStrictMock(Consumer.class);
@@ -133,16 +209,27 @@
       policy.addConsumer(consumer2);
       policy.addConsumer(consumer3);
       ServerMessage serverMessage = EasyMock.createStrictMock(ServerMessage.class);
+      EasyMock.expect(reference.getMessage()).andStubReturn(serverMessage);
       EasyMock.expect(serverMessage.getProperty(MessageImpl.GROUP_ID)).andStubReturn(new SimpleString("gid1"));
-      EasyMock.replay(consumer, consumer2, consumer3, serverMessage);
-      assertEquals(consumer, policy.select(serverMessage, false));
-      assertEquals(consumer, policy.select(serverMessage, false));
-      assertEquals(null, policy.select(serverMessage, true));
-      EasyMock.verify(consumer, consumer2, consumer3, serverMessage);
+      EasyMock.expect(consumer.handle(reference)).andReturn(HandleStatus.HANDLED);
+      EasyMock.expect(consumer.handle(reference)).andReturn(HandleStatus.BUSY);
+      EasyMock.replay(consumer, consumer2, consumer3, serverMessage, reference);
+      assertEquals(HandleStatus.HANDLED, policy.distribute(reference));
+      assertEquals(HandleStatus.BUSY, policy.distribute(reference));
+      EasyMock.verify(consumer, consumer2, consumer3, serverMessage, reference);
    }
 
-   public void testMultipleConsumersMultipleGroupMultipleGroupsEach()
+   public void testMultipleConsumersMultipleGroupMultipleGroupsEach() throws Exception
    {
+      MessageReference reference = EasyMock.createStrictMock(MessageReference.class);
+      MessageReference reference2 = EasyMock.createStrictMock(MessageReference.class);
+      MessageReference reference3 = EasyMock.createStrictMock(MessageReference.class);
+      MessageReference reference4 = EasyMock.createStrictMock(MessageReference.class);
+      MessageReference reference5 = EasyMock.createStrictMock(MessageReference.class);
+      MessageReference reference6 = EasyMock.createStrictMock(MessageReference.class);
+      MessageReference reference7 = EasyMock.createStrictMock(MessageReference.class);
+      MessageReference reference8 = EasyMock.createStrictMock(MessageReference.class);
+      MessageReference reference9 = EasyMock.createStrictMock(MessageReference.class);
       Consumer consumer = EasyMock.createStrictMock(Consumer.class);
       Consumer consumer2 = EasyMock.createStrictMock(Consumer.class);
       Consumer consumer3 = EasyMock.createStrictMock(Consumer.class);
@@ -150,41 +237,64 @@
       policy.addConsumer(consumer2);
       policy.addConsumer(consumer3);
       ServerMessage serverMessage = EasyMock.createStrictMock(ServerMessage.class);
+      EasyMock.expect(reference.getMessage()).andStubReturn(serverMessage);
       EasyMock.expect(serverMessage.getProperty(MessageImpl.GROUP_ID)).andStubReturn(new SimpleString("gid1"));
       ServerMessage serverMessage2 = EasyMock.createStrictMock(ServerMessage.class);
+      EasyMock.expect(reference2.getMessage()).andStubReturn(serverMessage2);
       EasyMock.expect(serverMessage2.getProperty(MessageImpl.GROUP_ID)).andStubReturn(new SimpleString("gid2"));
       ServerMessage serverMessage3 = EasyMock.createStrictMock(ServerMessage.class);
+      EasyMock.expect(reference3.getMessage()).andStubReturn(serverMessage3);
       EasyMock.expect(serverMessage3.getProperty(MessageImpl.GROUP_ID)).andStubReturn(new SimpleString("gid3"));
       ServerMessage serverMessage4 = EasyMock.createStrictMock(ServerMessage.class);
+      EasyMock.expect(reference4.getMessage()).andStubReturn(serverMessage4);
       EasyMock.expect(serverMessage4.getProperty(MessageImpl.GROUP_ID)).andStubReturn(new SimpleString("gid4"));
       ServerMessage serverMessage5 = EasyMock.createStrictMock(ServerMessage.class);
+      EasyMock.expect(reference5.getMessage()).andStubReturn(serverMessage5);
       EasyMock.expect(serverMessage5.getProperty(MessageImpl.GROUP_ID)).andStubReturn(new SimpleString("gid5"));
       ServerMessage serverMessage6 = EasyMock.createStrictMock(ServerMessage.class);
+      EasyMock.expect(reference6.getMessage()).andStubReturn(serverMessage6);
       EasyMock.expect(serverMessage6.getProperty(MessageImpl.GROUP_ID)).andStubReturn(new SimpleString("gid6"));
       ServerMessage serverMessage7 = EasyMock.createStrictMock(ServerMessage.class);
+      EasyMock.expect(reference7.getMessage()).andStubReturn(serverMessage7);
       EasyMock.expect(serverMessage7.getProperty(MessageImpl.GROUP_ID)).andStubReturn(new SimpleString("gid7"));
       ServerMessage serverMessage8 = EasyMock.createStrictMock(ServerMessage.class);
+      EasyMock.expect(reference8.getMessage()).andStubReturn(serverMessage8);
       EasyMock.expect(serverMessage8.getProperty(MessageImpl.GROUP_ID)).andStubReturn(new SimpleString("gid8"));
       ServerMessage serverMessage9 = EasyMock.createStrictMock(ServerMessage.class);
+      EasyMock.expect(reference9.getMessage()).andStubReturn(serverMessage9);
+      EasyMock.expect(consumer.handle(reference)).andReturn(HandleStatus.HANDLED);
+      EasyMock.expect(consumer2.handle(reference2)).andReturn(HandleStatus.HANDLED);
+      EasyMock.expect(consumer3.handle(reference3)).andReturn(HandleStatus.HANDLED);
+      EasyMock.expect(consumer.handle(reference4)).andReturn(HandleStatus.HANDLED);
+      EasyMock.expect(consumer2.handle(reference5)).andReturn(HandleStatus.HANDLED);
+      EasyMock.expect(consumer3.handle(reference6)).andReturn(HandleStatus.HANDLED);
+      EasyMock.expect(consumer.handle(reference7)).andReturn(HandleStatus.HANDLED);
+      EasyMock.expect(consumer2.handle(reference8)).andReturn(HandleStatus.HANDLED);
+      EasyMock.expect(consumer3.handle(reference9)).andReturn(HandleStatus.HANDLED);
       EasyMock.expect(serverMessage9.getProperty(MessageImpl.GROUP_ID)).andStubReturn(new SimpleString("gid9"));
       EasyMock.replay(consumer, consumer2, consumer3, serverMessage, serverMessage2, serverMessage3, serverMessage4,
-                      serverMessage5, serverMessage6, serverMessage7, serverMessage8, serverMessage9);
-      assertEquals(consumer, policy.select(serverMessage, false));
-      assertEquals(consumer2, policy.select(serverMessage2, false));
-      assertEquals(consumer3, policy.select(serverMessage3, false));
-      assertEquals(consumer, policy.select(serverMessage4, false));
-      assertEquals(consumer2, policy.select(serverMessage5, false));
-      assertEquals(consumer3, policy.select(serverMessage6, false));
-      assertEquals(consumer, policy.select(serverMessage7, false));
-      assertEquals(consumer2, policy.select(serverMessage8, false));
-      assertEquals(consumer3, policy.select(serverMessage9, false));
+                      serverMessage5, serverMessage6, serverMessage7, serverMessage8, serverMessage9, reference,
+                      reference2, reference3, reference4, reference5, reference6, reference7, reference8, reference9);
+      assertEquals(HandleStatus.HANDLED, policy.distribute(reference));
+      assertEquals(HandleStatus.HANDLED, policy.distribute(reference2));
+      assertEquals(HandleStatus.HANDLED, policy.distribute(reference3));
+      assertEquals(HandleStatus.HANDLED, policy.distribute(reference4));
+      assertEquals(HandleStatus.HANDLED, policy.distribute(reference5));
+      assertEquals(HandleStatus.HANDLED, policy.distribute(reference6));
+      assertEquals(HandleStatus.HANDLED, policy.distribute(reference7));
+      assertEquals(HandleStatus.HANDLED, policy.distribute(reference8));
+      assertEquals(HandleStatus.HANDLED, policy.distribute(reference9));
 
       EasyMock.verify(consumer, consumer2, consumer3, serverMessage, serverMessage2, serverMessage3, serverMessage4,
-                      serverMessage5, serverMessage6, serverMessage7, serverMessage8, serverMessage9);
+                      serverMessage5, serverMessage6, serverMessage7, serverMessage8, serverMessage9, reference,
+                      reference2, reference3, reference4, reference5, reference6, reference7, reference8, reference9);
    }
 
-   public void testMultipleConsumersConsumerRemoved()
+   public void testMultipleConsumersConsumerRemoved() throws Exception
    {
+      MessageReference reference = EasyMock.createStrictMock(MessageReference.class);
+      MessageReference reference2 = EasyMock.createStrictMock(MessageReference.class);
+      MessageReference reference3 = EasyMock.createStrictMock(MessageReference.class);
       Consumer consumer = EasyMock.createStrictMock(Consumer.class);
       Consumer consumer2 = EasyMock.createStrictMock(Consumer.class);
       Consumer consumer3 = EasyMock.createStrictMock(Consumer.class);
@@ -194,18 +304,27 @@
       policy.addConsumer(consumer3);
       policy.addConsumer(consumer4);
       ServerMessage serverMessage = EasyMock.createStrictMock(ServerMessage.class);
+      EasyMock.expect(reference.getMessage()).andStubReturn(serverMessage);
       EasyMock.expect(serverMessage.getProperty(MessageImpl.GROUP_ID)).andStubReturn(new SimpleString("gid1"));
       ServerMessage serverMessage2 = EasyMock.createStrictMock(ServerMessage.class);
+      EasyMock.expect(reference2.getMessage()).andStubReturn(serverMessage2);
       EasyMock.expect(serverMessage2.getProperty(MessageImpl.GROUP_ID)).andStubReturn(new SimpleString("gid2"));
       ServerMessage serverMessage3 = EasyMock.createStrictMock(ServerMessage.class);
+      EasyMock.expect(reference3.getMessage()).andStubReturn(serverMessage3);
       EasyMock.expect(serverMessage3.getProperty(MessageImpl.GROUP_ID)).andStubReturn(new SimpleString("gid3"));
-      EasyMock.replay(consumer, consumer2, consumer3, consumer4, serverMessage, serverMessage2, serverMessage3);
-      assertEquals(consumer, policy.select(serverMessage, false));
-      assertEquals(consumer2, policy.select(serverMessage2, false));
-      assertEquals(consumer3, policy.select(serverMessage3, false));
+      EasyMock.expect(consumer.handle(reference)).andReturn(HandleStatus.HANDLED);
+      EasyMock.expect(consumer2.handle(reference2)).andReturn(HandleStatus.HANDLED);
+      EasyMock.expect(consumer3.handle(reference3)).andReturn(HandleStatus.HANDLED);
+      EasyMock.expect(consumer.handle(reference2)).andReturn(HandleStatus.HANDLED);
+      EasyMock.replay(consumer, consumer2, consumer3, consumer4, serverMessage, serverMessage2, serverMessage3,
+                      reference, reference2, reference3);
+      assertEquals(HandleStatus.HANDLED, policy.distribute(reference));
+      assertEquals(HandleStatus.HANDLED, policy.distribute(reference2));
+      assertEquals(HandleStatus.HANDLED, policy.distribute(reference3));
       policy.removeConsumer(consumer2);
-      assertEquals(consumer, policy.select(serverMessage2, false));
-      EasyMock.verify(consumer, consumer2, consumer3, consumer4, serverMessage, serverMessage2, serverMessage3);
+      assertEquals(HandleStatus.HANDLED, policy.distribute(reference2));
+      EasyMock.verify(consumer, consumer2, consumer3, consumer4, serverMessage, serverMessage2, serverMessage3,
+                      reference, reference2, reference3);
    }
 
 

Modified: trunk/tests/src/org/jboss/messaging/tests/unit/core/server/impl/QueueImplTest.java
===================================================================
--- trunk/tests/src/org/jboss/messaging/tests/unit/core/server/impl/QueueImplTest.java	2008-10-04 16:22:15 UTC (rev 5072)
+++ trunk/tests/src/org/jboss/messaging/tests/unit/core/server/impl/QueueImplTest.java	2008-10-06 09:36:10 UTC (rev 5073)
@@ -1444,6 +1444,18 @@
          return null;
       }
 
+      public HandleStatus distribute(MessageReference reference)
+      {
+         try
+         {
+            return consumer.handle(reference);
+         }
+         catch (Exception e)
+         {
+            return HandleStatus.BUSY;
+         }
+      }
+
       public void addConsumer(Consumer consumer)
       {
          this.consumer = consumer;

Modified: trunk/tests/src/org/jboss/messaging/tests/unit/core/server/impl/RoundRobinDistributionPolicyTest.java
===================================================================
--- trunk/tests/src/org/jboss/messaging/tests/unit/core/server/impl/RoundRobinDistributionPolicyTest.java	2008-10-04 16:22:15 UTC (rev 5072)
+++ trunk/tests/src/org/jboss/messaging/tests/unit/core/server/impl/RoundRobinDistributionPolicyTest.java	2008-10-06 09:36:10 UTC (rev 5073)
@@ -24,9 +24,12 @@
 
 import org.jboss.messaging.core.server.Consumer;
 import org.jboss.messaging.core.server.DistributionPolicy;
+import org.jboss.messaging.core.server.MessageReference;
+import org.jboss.messaging.core.server.HandleStatus;
 import org.jboss.messaging.core.server.impl.RoundRobinDistributionPolicy;
 import org.jboss.messaging.tests.unit.core.server.impl.fakes.FakeConsumer;
 import org.jboss.messaging.tests.util.UnitTestCase;
+import org.easymock.EasyMock;
 
 import java.util.ArrayList;
 import java.util.List;
@@ -43,55 +46,86 @@
 
    public void testNoConsumers()
    {
-      List<Consumer> consumers = new ArrayList<Consumer>();
+      MessageReference messageReference = EasyMock.createStrictMock(MessageReference.class);
       
       DistributionPolicy dp = new RoundRobinDistributionPolicy();
-      
-      Consumer c = dp.select(null, false);
-      
-      assertEquals(null, c);
+
+      EasyMock.replay(messageReference);
+      HandleStatus status = dp.distribute(messageReference);
+      EasyMock.verify(messageReference);
+      assertEquals(status, HandleStatus.BUSY);
    }
    
-   public void testConsumers()
+   public void testConsumers() throws Exception
    {
-      FakeConsumer c1 = new FakeConsumer();
-      FakeConsumer c2 = new FakeConsumer();
-      FakeConsumer c3 = new FakeConsumer();
+      MessageReference messageReference = EasyMock.createStrictMock(MessageReference.class);
+      Consumer c1 = EasyMock.createStrictMock(Consumer.class);
+      Consumer c2 = EasyMock.createStrictMock(Consumer.class);
+      Consumer c3 = EasyMock.createStrictMock(Consumer.class);
       
       DistributionPolicy dp = new RoundRobinDistributionPolicy();
       dp.addConsumer(c1);
       dp.addConsumer(c2);
       dp.addConsumer(c3);
-            
-      Consumer c = null;
+      EasyMock.expect(c1.handle(messageReference)).andReturn(HandleStatus.HANDLED);
+      EasyMock.expect(c2.handle(messageReference)).andReturn(HandleStatus.HANDLED);
+      EasyMock.expect(c3.handle(messageReference)).andReturn(HandleStatus.HANDLED);
+      EasyMock.expect(c1.handle(messageReference)).andReturn(HandleStatus.HANDLED);
+      EasyMock.expect(c2.handle(messageReference)).andReturn(HandleStatus.HANDLED);
+      EasyMock.expect(c3.handle(messageReference)).andReturn(HandleStatus.HANDLED);
+      EasyMock.expect(c1.handle(messageReference)).andReturn(HandleStatus.HANDLED);
+      EasyMock.replay(messageReference, c1, c2, c3);
       
-      c = dp.select( null, false);
-      
-      assertEquals(c1, c);
-      
-      c = dp.select(null, false);
-      
-      assertEquals(c2, c);
-      
-      c = dp.select(null, false);
-      
-      assertEquals(c3, c);
-      
-      c = dp.select( null, false);
-      
-      assertEquals(c1, c);
-      
-      c = dp.select( null, false);
-      
-      assertEquals(c2, c);
-      
-      c = dp.select( null, false);
-      
-      assertEquals(c3, c);
-      
-      c = dp.select(null, false);
-      
-      assertEquals(c1, c);
+      dp.distribute(messageReference);
+      dp.distribute(messageReference);
+      dp.distribute(messageReference);
+      dp.distribute(messageReference);
+      dp.distribute(messageReference);
+      dp.distribute(messageReference);
+      dp.distribute(messageReference);
+      EasyMock.verify(messageReference, c1, c2, c3);
    }
+
+   public void testRunOutOfConsumers() throws Exception
+   {
+      MessageReference messageReference = EasyMock.createStrictMock(MessageReference.class);
+      Consumer c1 = EasyMock.createStrictMock(Consumer.class);
+      Consumer c2 = EasyMock.createStrictMock(Consumer.class);
+      Consumer c3 = EasyMock.createStrictMock(Consumer.class);
+
+      DistributionPolicy dp = new RoundRobinDistributionPolicy();
+      dp.addConsumer(c1);
+      dp.addConsumer(c2);
+      dp.addConsumer(c3);
+      EasyMock.expect(c1.handle(messageReference)).andReturn(HandleStatus.BUSY);
+      EasyMock.expect(c2.handle(messageReference)).andReturn(HandleStatus.BUSY);
+      EasyMock.expect(c3.handle(messageReference)).andReturn(HandleStatus.BUSY);
+      EasyMock.replay(messageReference, c1, c2, c3);
+
+      HandleStatus status = dp.distribute(messageReference);
+      assertEquals(status, HandleStatus.BUSY);
+      EasyMock.verify(messageReference, c1, c2, c3);
+   }
+   public void testRunOutOfConsumersNoMatch() throws Exception
+   {
+      MessageReference messageReference = EasyMock.createStrictMock(MessageReference.class);
+      Consumer c1 = EasyMock.createStrictMock(Consumer.class);
+      Consumer c2 = EasyMock.createStrictMock(Consumer.class);
+      Consumer c3 = EasyMock.createStrictMock(Consumer.class);
+
+      DistributionPolicy dp = new RoundRobinDistributionPolicy();
+      dp.addConsumer(c1);
+      dp.addConsumer(c2);
+      dp.addConsumer(c3);
+      EasyMock.expect(c1.handle(messageReference)).andReturn(HandleStatus.NO_MATCH);
+      EasyMock.expect(c2.handle(messageReference)).andReturn(HandleStatus.NO_MATCH);
+      EasyMock.expect(c3.handle(messageReference)).andReturn(HandleStatus.NO_MATCH);
+      EasyMock.replay(messageReference, c1, c2, c3);
+
+      HandleStatus status = dp.distribute(messageReference);
+      assertEquals(status, HandleStatus.NO_MATCH);
+      EasyMock.verify(messageReference, c1, c2, c3);
+   }
+
    
 }




More information about the jboss-cvs-commits mailing list