[jboss-cvs] JBoss Messaging SVN: r5035 - in trunk: examples/jms and 6 other directories.

jboss-cvs-commits at lists.jboss.org jboss-cvs-commits at lists.jboss.org
Mon Sep 29 04:55:29 EDT 2008


Author: ataylor
Date: 2008-09-29 04:55:28 -0400 (Mon, 29 Sep 2008)
New Revision: 5035

Added:
   trunk/examples/jms/src/org/jboss/jms/example/MessageGroupingExample.java
   trunk/src/main/org/jboss/messaging/core/server/impl/DistributionPolicyImpl.java
   trunk/src/main/org/jboss/messaging/core/server/impl/GroupingRoundRobinDistributionPolicy.java
   trunk/tests/src/org/jboss/messaging/tests/unit/core/server/impl/GroupingRoundRobinDistributionPolicyTest.java
Modified:
   trunk/build-messaging.xml
   trunk/build.xml
   trunk/examples/jms/build.xml
   trunk/src/config/jbm-jndi.xml
   trunk/src/config/queues.xml
   trunk/src/main/org/jboss/messaging/core/server/DistributionPolicy.java
   trunk/src/main/org/jboss/messaging/core/server/impl/QueueImpl.java
   trunk/src/main/org/jboss/messaging/core/server/impl/RoundRobinDistributionPolicy.java
   trunk/tests/src/org/jboss/messaging/tests/unit/core/server/impl/QueueImplTest.java
   trunk/tests/src/org/jboss/messaging/tests/unit/core/server/impl/RoundRobinDistributionPolicyTest.java
   trunk/tests/src/org/jboss/messaging/tests/unit/core/server/impl/ServerSessionImplTest.java
   trunk/tests/src/org/jboss/messaging/tests/unit/core/transaction/impl/TransactionImplTest.java
Log:
https://jira.jboss.org/jira/browse/JBMESSAGING-375 - implemented message grouping functionality

Modified: trunk/build-messaging.xml
===================================================================
--- trunk/build-messaging.xml	2008-09-26 21:32:15 UTC (rev 5034)
+++ trunk/build-messaging.xml	2008-09-29 08:55:28 UTC (rev 5035)
@@ -964,6 +964,10 @@
       <ant dir="${examples.dir}/jms" antfile="build.xml" target="wildcardExample"/>
    </target>
 
+    <target name="messageGroupingExample" depends="client-jar">
+      <ant dir="${examples.dir}/jms" antfile="build.xml" target="messageGroupingExample"/>
+   </target>
+
    <target name="SimpleClient" depends="client-jar">
       <ant dir="${examples.dir}/messaging" antfile="build.xml" target="SimpleClient"/>
    </target>

Modified: trunk/build.xml
===================================================================
--- trunk/build.xml	2008-09-26 21:32:15 UTC (rev 5034)
+++ trunk/build.xml	2008-09-29 08:55:28 UTC (rev 5035)
@@ -194,6 +194,10 @@
       <ant antfile="build-messaging.xml" target="wildcardExample"/>
    </target>
 
+   <target name="messageGroupingExample" depends="createthirdparty">
+      <ant antfile="build-messaging.xml" target="messageGroupingExample"/>
+   </target>
+
    <target name="perfListener" depends="createthirdparty">
       <ant antfile="build-messaging.xml" target="perfListener"/>
    </target>

Modified: trunk/examples/jms/build.xml
===================================================================
--- trunk/examples/jms/build.xml	2008-09-26 21:32:15 UTC (rev 5034)
+++ trunk/examples/jms/build.xml	2008-09-29 08:55:28 UTC (rev 5035)
@@ -127,6 +127,13 @@
       </java>
    </target>
 
+   <target name="messageGroupingExample" depends="compile"
+           description="-> publish/subscribe example using a topic and a durable subscriber">
+      <java classname="org.jboss.jms.example.MessageGroupingExample" fork="true">
+         <classpath refid="runtime.classpath"/>
+      </java>
+   </target>
+
    <target name="echo-params">
       <echo>
 ***********************************************************************************

Added: trunk/examples/jms/src/org/jboss/jms/example/MessageGroupingExample.java
===================================================================
--- trunk/examples/jms/src/org/jboss/jms/example/MessageGroupingExample.java	                        (rev 0)
+++ trunk/examples/jms/src/org/jboss/jms/example/MessageGroupingExample.java	2008-09-29 08:55:28 UTC (rev 5035)
@@ -0,0 +1,153 @@
+/*
+ * JBoss, Home of Professional Open Source
+ * Copyright 2005-2008, Red Hat Middleware LLC, and individual contributors
+ * by the @authors tag. See the copyright.txt in the distribution for a
+ * full listing of individual contributors.
+ *
+ * This is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU Lesser General Public License as
+ * published by the Free Software Foundation; either version 2.1 of
+ * the License, or (at your option) any later version.
+ *
+ * This software is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this software; if not, write to the Free
+ * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
+ * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
+ */
+package org.jboss.jms.example;
+
+import org.jboss.messaging.core.logging.Logger;
+
+import javax.jms.*;
+import javax.naming.InitialContext;
+import java.util.concurrent.CountDownLatch;
+
+/**
+ * This example sends 20 messages, setting the group id so that a specific consumer will receive each message.
+ * Setting the property 'JMSXGroupID' binds a consumer to the given value; from then on the same consumer will
+ * receive any message that has the same JMSXGroupID value. Setting the property 'JMSXGroupSeq' to 0 releases the
+ * binding after that message has been delivered.
+ * @author <a href="mailto:andy.taylor at jboss.org">Andy Taylor</a>
+ */
+public class MessageGroupingExample
+{
+   final static Logger log = Logger.getLogger(MessageGroupingExample.class);
+
+   public static void main(final String[] args)
+   {
+      Connection connection = null;
+
+      try
+      {
+         //create an initial context, env will be picked up from jndi.properties
+         InitialContext initialContext = new InitialContext();
+         Queue queue = (Queue) initialContext.lookup("/queue/testGroupQueue");
+         ConnectionFactory cf = (ConnectionFactory) initialContext.lookup("/ConnectionFactory");
+
+         connection = cf.createConnection();
+         Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+         MessageProducer producer = session.createProducer(queue);
+         Message[] messages = new Message[20];
+         for (int i = 0; i < messages.length; i++)
+         {
+            if (i < messages.length/2)
+            {
+               messages[i] = session.createTextMessage("This is a text message from groupid1!");
+               messages[i].setStringProperty("JMSXGroupID", "groupid1");
+               if(i == 4)
+               {
+                  messages[i].setIntProperty("JMSXGroupSeq", 0);  
+               }
+            }
+            else
+            {
+               messages[i] = session.createTextMessage("This is a text message from groupid2!");
+               messages[i].setStringProperty("JMSXGroupID", "groupid2");
+            }
+         }
+
+         final CountDownLatch latch = new CountDownLatch(20);
+         MessageConsumer messageConsumer = session.createConsumer(queue);
+         messageConsumer.setMessageListener(new MessageListener()
+         {
+            public void onMessage(Message message)
+            {
+               TextMessage m = (TextMessage) message;
+               try
+               {
+                  log.info("message received for consumer 1 = " + m.getText());
+               }
+               catch (JMSException e)
+               {
+                  e.printStackTrace();
+               }
+               latch.countDown();
+            }
+         });
+         MessageConsumer messageConsumer2 = session.createConsumer(queue);
+         messageConsumer2.setMessageListener(new MessageListener()
+         {
+            public void onMessage(Message message)
+            {
+              TextMessage m = (TextMessage) message;
+               try
+               {
+                  log.info("message received for consumer 2 = " + m.getText());
+               }
+               catch (JMSException e)
+               {
+                  e.printStackTrace();
+               }
+               latch.countDown();
+            }
+         });
+         MessageConsumer messageConsumer3 = session.createConsumer(queue);
+         messageConsumer3.setMessageListener(new MessageListener()
+         {
+            public void onMessage(Message message)
+            {
+              TextMessage m = (TextMessage) message;
+               try
+               {
+                  log.info("message received for consumer 3 = " + m.getText());
+               }
+               catch (JMSException e)
+               {
+                  e.printStackTrace();
+               }
+               latch.countDown();
+            }
+         });
+         connection.start();
+
+         log.info("sending messages to queue");
+         for (Message message : messages)
+         {
+            producer.send(message);
+         }
+         latch.await();
+      }
+      catch (Exception e)
+      {
+         e.printStackTrace();
+      }
+      finally
+      {
+         if(connection != null)
+         {
+            try
+            {
+               connection.close();
+            }
+            catch (JMSException e)
+            {
+            }
+         }
+      }
+   }
+}
\ No newline at end of file

Modified: trunk/src/config/jbm-jndi.xml
===================================================================
--- trunk/src/config/jbm-jndi.xml	2008-09-26 21:32:15 UTC (rev 5034)
+++ trunk/src/config/jbm-jndi.xml	2008-09-29 08:55:28 UTC (rev 5035)
@@ -118,6 +118,9 @@
    <queue name="testQueue">
       <entry name="/queue/testQueue"/>
    </queue>
+   <queue name="testGroupQueue">
+      <entry name="/queue/testGroupQueue"/>
+   </queue>
    <queue name="testPerfQueue">
       <entry name="/queue/testPerfQueue"/>
    </queue>

Modified: trunk/src/config/queues.xml
===================================================================
--- trunk/src/config/queues.xml	2008-09-26 21:32:15 UTC (rev 5034)
+++ trunk/src/config/queues.xml	2008-09-29 08:55:28 UTC (rev 5035)
@@ -99,6 +99,13 @@
       <drop-messages-when-full>false</drop-messages-when-full>
    </queue-settings>
 
+   <queue-settings match="queuejms.testGroupQueue">
+      <max-size-bytes>-1</max-size-bytes>
+      <page-size-bytes>10485760</page-size-bytes>
+      <drop-messages-when-full>false</drop-messages-when-full>
+      <distribution-policy-class>org.jboss.messaging.core.server.impl.GroupingRoundRobinDistributionPolicy</distribution-policy-class>
+   </queue-settings>
+
    <!--default for catch all-->
    <queue-settings match="*">
       <clustered>false</clustered>

Modified: trunk/src/main/org/jboss/messaging/core/server/DistributionPolicy.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/server/DistributionPolicy.java	2008-09-26 21:32:15 UTC (rev 5034)
+++ trunk/src/main/org/jboss/messaging/core/server/DistributionPolicy.java	2008-09-29 08:55:28 UTC (rev 5035)
@@ -22,8 +22,6 @@
 
 package org.jboss.messaging.core.server;
 
-import java.util.List;
-
 /**
  * 
  * A DistributionPolicy
@@ -33,5 +31,13 @@
  */
 public interface DistributionPolicy
 {
-   int select(List<Consumer> consumers, int lastPos);
+   Consumer select(ServerMessage message, boolean redeliver);
+
+   void addConsumer(Consumer consumer);
+
+   boolean removeConsumer(Consumer consumer);
+
+   int getConsumerCount();
+
+   boolean hasConsumers();
 }

Added: trunk/src/main/org/jboss/messaging/core/server/impl/DistributionPolicyImpl.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/server/impl/DistributionPolicyImpl.java	                        (rev 0)
+++ trunk/src/main/org/jboss/messaging/core/server/impl/DistributionPolicyImpl.java	2008-09-29 08:55:28 UTC (rev 5035)
@@ -0,0 +1,59 @@
+/*
+ * JBoss, Home of Professional Open Source
+ * Copyright 2005-2008, Red Hat Middleware LLC, and individual contributors
+ * by the @authors tag. See the copyright.txt in the distribution for a
+ * full listing of individual contributors.
+ *
+ * This is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU Lesser General Public License as
+ * published by the Free Software Foundation; either version 2.1 of
+ * the License, or (at your option) any later version.
+ *
+ * This software is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this software; if not, write to the Free
+ * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
+ * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
+ */
+package org.jboss.messaging.core.server.impl;
+
+import org.jboss.messaging.core.server.Consumer;
+import org.jboss.messaging.core.server.DistributionPolicy;
+import org.jboss.messaging.core.server.ServerMessage;
+
+import java.util.ArrayList;
+import java.util.List;
+
+/**
+ * @author <a href="mailto:andy.taylor at jboss.org">Andy Taylor</a>
+ */
+public abstract class DistributionPolicyImpl implements DistributionPolicy
+{
+   protected final List<Consumer> consumers = new ArrayList<Consumer>();
+
+   public abstract Consumer select(ServerMessage message, boolean redeliver);
+
+   public void addConsumer(Consumer consumer)
+   {
+      consumers.add(consumer);
+   }
+
+   public boolean removeConsumer(Consumer consumer)
+   {
+      return consumers.remove(consumer);
+   }
+
+   public int getConsumerCount()
+   {
+      return consumers.size();
+   }
+
+   public boolean hasConsumers()
+   {
+      return !consumers.isEmpty();
+   }
+}

Added: trunk/src/main/org/jboss/messaging/core/server/impl/GroupingRoundRobinDistributionPolicy.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/server/impl/GroupingRoundRobinDistributionPolicy.java	                        (rev 0)
+++ trunk/src/main/org/jboss/messaging/core/server/impl/GroupingRoundRobinDistributionPolicy.java	2008-09-29 08:55:28 UTC (rev 5035)
@@ -0,0 +1,255 @@
+/*
+ * JBoss, Home of Professional Open Source
+ * Copyright 2005-2008, Red Hat Middleware LLC, and individual contributors
+ * by the @authors tag. See the copyright.txt in the distribution for a
+ * full listing of individual contributors.
+ *
+ * This is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU Lesser General Public License as
+ * published by the Free Software Foundation; either version 2.1 of
+ * the License, or (at your option) any later version.
+ *
+ * This software is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this software; if not, write to the Free
+ * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
+ * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
+ */
+package org.jboss.messaging.core.server.impl;
+
+import org.jboss.messaging.core.logging.Logger;
+import org.jboss.messaging.core.server.Consumer;
+import org.jboss.messaging.core.server.ServerMessage;
+import org.jboss.messaging.util.SimpleString;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+
+/**
+ * Distributes messages based on the message property 'JMSXGroupID'. Once a message has been successfully delivered to a
+ * consumer, that consumer is then bound to that group. Any message that has the same group id set will always be
+ * delivered to the same consumer. This sequence is broken only when a message with the same group id has the property
+ * 'JMSXGroupSeq' set to 0 or if the consumer is removed, that is, it is not passed down in the select consumers list.
+ * The initial consumer is the first consumer found, using the round robin policy, that hasn't been bound to a group. If
+ * there are no consumers left that have not been bound to a group, then the next consumer will be bound to 2 groups and
+ * so on.
+ * @author <a href="mailto:andy.taylor at jboss.org">Andy Taylor</a>
+ */
+public class GroupingRoundRobinDistributionPolicy extends RoundRobinDistributionPolicy
+{
+   // Constants -----------------------------------------------------
+
+   private static final Logger log = Logger.getLogger(GroupingRoundRobinDistributionPolicy.class);
+
+
+   // for convenience, the Group ID is directly mapped to the
+   // JMS JMSXGroupID & JMSXGroupSeq header names.
+   // It does not imply any dependency on JMS whatsoever
+   public static final SimpleString GROUP_ID = new SimpleString("JMSXGroupID");
+
+   public static final SimpleString GROUP_SEQ = new SimpleString("JMSXGroupSeq");
+
+   // Static --------------------------------------------------------
+
+   // Attributes ----------------------------------------------------
+
+   // Map with GroupID as a key and a ConsumerState as value.
+   private Map<SimpleString, ConsumerState> consumers = new ConcurrentHashMap<SimpleString, ConsumerState>();
+
+   //we hold the state of each consumer, i.e., is it bound etc
+   private Map<Consumer, ConsumerState> consumerStateMap = new ConcurrentHashMap<Consumer, ConsumerState>();
+   // Distributor implementation ------------------------------------
+
+   public Consumer select(ServerMessage message, boolean redeliver)
+   {
+      if (message.getProperty(GROUP_ID) != null)
+      {
+         SimpleString groupId = (SimpleString) message.getProperty(GROUP_ID);
+         Integer groupSeq = (Integer) message.getProperty(GROUP_SEQ);
+         if (consumers.get(groupId) != null)
+         {
+            ConsumerState consumerState = consumers.get(groupId);
+            //if this is a redelivery and the group is bound we wait.
+            if(redeliver && consumerState.isBound())
+            {
+               return null;
+            }
+            //if we need to reset which consumer to use, this will take place from the next invocation with the same groupid.
+            if (groupSeq != null && groupSeq.equals(0))
+            {
+               removeBinding(groupId, consumerState);
+            }
+            //if this is a redelivery and it was its first attempt we can look for another consumer and use that
+            else if(redeliver && !consumerState.isBound())
+            {
+               removeBinding(groupId, consumerState);
+               return getNextPositionAndBind(message, redeliver, groupId).getConsumer();
+            }
+            //we bind after we know that the first message has been successfully consumed
+            else if(!consumerState.isBound())
+            {
+               consumerState.setBound(true);
+            }
+            consumerState.setAvailable(false);
+
+            return consumerState.getConsumer();
+         }
+         else
+         {
+            return getNextPositionAndBind(message, redeliver, groupId).getConsumer();
+         }
+      }
+      else
+      {
+         return super.select(message, redeliver);
+      }
+   }
+
+   public synchronized void addConsumer(Consumer consumer)
+   {
+      super.addConsumer(consumer);
+      ConsumerState cs = new ConsumerState(consumer);
+      consumerStateMap.put(consumer, cs);
+   }
+
+   public synchronized boolean removeConsumer(Consumer consumer)
+   {
+      boolean removed = super.removeConsumer(consumer);
+      if(removed)
+      {
+         ConsumerState cs = consumerStateMap.remove(consumer);
+         for (SimpleString ss : cs.getGroupIds())
+         {
+            consumers.remove(ss);
+         }
+
+      }
+      return removed;
+   }
+
+   /**
+    * We need to find the next available consumer that doesn't have a binding. If none are free, we use the next
+    * available in the normal round robin fashion.
+    * @param message
+    * @param redeliver
+    * @param groupId
+    * @return
+    */
+   private ConsumerState getNextPositionAndBind(ServerMessage message, boolean redeliver, SimpleString groupId)
+   {
+      Consumer consumer = super.select(message, redeliver);
+      ConsumerState cs = consumerStateMap.get(consumer);
+      //if there is only one return it
+      if(getConsumerCount() == 1 || cs.isAvailable())
+      {
+         consumers.put(groupId, cs);
+         cs.getGroupIds().add(groupId);
+         return cs;
+      }
+      else
+      {
+         consumer = super.select(message, redeliver);
+         ConsumerState ncs = consumerStateMap.get(consumer);
+         while(!ncs.isAvailable())
+         {
+            consumer = super.select(message, redeliver);
+            ncs = consumerStateMap.get(consumer);
+            if(ncs == cs)
+            {
+               cs.getGroupIds().add(groupId);
+               return cs;
+            }
+         }
+         ncs.getGroupIds().add(groupId);
+         return ncs;
+      }
+   }
+
+   private void removeBinding(SimpleString groupId, ConsumerState consumerState)
+   {
+      consumerState.setAvailable(true);
+      consumerState.getGroupIds().remove(groupId);
+      consumers.remove(groupId);
+   }
+
+   /**
+    * holds the current state of a consumer, is it available, what groups it is bound to etc.
+    */
+   class ConsumerState
+   {
+      private final Consumer consumer;
+      private volatile boolean isBound = false;
+      private volatile boolean available = true;
+      private List<SimpleString> groupIds = new ArrayList<SimpleString>();
+
+      public ConsumerState(Consumer consumer)
+      {
+         this.consumer = consumer;
+      }
+
+      public boolean isBound()
+      {
+         return isBound;
+      }
+
+      public void setBound(boolean bound)
+      {
+         isBound = bound;
+      }
+
+
+      public boolean isAvailable()
+      {
+         return available;
+      }
+
+      public void setAvailable(boolean available)
+      {
+         this.available = available;
+      }
+
+      public Consumer getConsumer()
+      {
+         return consumer;
+      }
+
+      public List<SimpleString> getGroupIds()
+      {
+         return groupIds;
+      }
+
+      public boolean equals(Object o)
+      {
+         if (this == o)
+         {
+            return true;
+         }
+         if (o == null || getClass() != o.getClass())
+         {
+            return false;
+         }
+
+         ConsumerState that = (ConsumerState) o;
+
+         if (!consumer.equals(that.consumer))
+         {
+            return false;
+         }
+
+         return true;
+      }
+
+      public int hashCode()
+      {
+         return consumer.hashCode();
+      }
+
+
+   }
+}

Modified: trunk/src/main/org/jboss/messaging/core/server/impl/QueueImpl.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/server/impl/QueueImpl.java	2008-09-26 21:32:15 UTC (rev 5034)
+++ trunk/src/main/org/jboss/messaging/core/server/impl/QueueImpl.java	2008-09-29 08:55:28 UTC (rev 5035)
@@ -12,20 +12,6 @@
 
 package org.jboss.messaging.core.server.impl;
 
-import java.util.ArrayList;
-import java.util.Iterator;
-import java.util.LinkedHashSet;
-import java.util.LinkedList;
-import java.util.List;
-import java.util.ListIterator;
-import java.util.Set;
-import java.util.concurrent.Executor;
-import java.util.concurrent.Future;
-import java.util.concurrent.ScheduledExecutorService;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicBoolean;
-import java.util.concurrent.atomic.AtomicInteger;
-
 import org.jboss.messaging.core.filter.Filter;
 import org.jboss.messaging.core.list.PriorityLinkedList;
 import org.jboss.messaging.core.list.impl.PriorityLinkedListImpl;
@@ -34,18 +20,22 @@
 import org.jboss.messaging.core.postoffice.Binding;
 import org.jboss.messaging.core.postoffice.FlowController;
 import org.jboss.messaging.core.postoffice.PostOffice;
-import org.jboss.messaging.core.server.Consumer;
-import org.jboss.messaging.core.server.DistributionPolicy;
-import org.jboss.messaging.core.server.HandleStatus;
-import org.jboss.messaging.core.server.MessageReference;
+import org.jboss.messaging.core.server.*;
 import org.jboss.messaging.core.server.Queue;
-import org.jboss.messaging.core.server.ServerMessage;
 import org.jboss.messaging.core.settings.HierarchicalRepository;
 import org.jboss.messaging.core.settings.impl.QueueSettings;
 import org.jboss.messaging.core.transaction.Transaction;
 import org.jboss.messaging.core.transaction.impl.TransactionImpl;
 import org.jboss.messaging.util.SimpleString;
 
+import java.util.*;
+import java.util.concurrent.Executor;
+import java.util.concurrent.Future;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicInteger;
+
 /**
  * Implementation of a Queue TODO use Java 5 concurrent queue
  * 
@@ -80,7 +70,7 @@
 
    private final PriorityLinkedList<MessageReference> messageReferences = new PriorityLinkedListImpl<MessageReference>(NUM_PRIORITIES);
 
-   private final List<Consumer> consumers = new ArrayList<Consumer>();
+   //private final List<Consumer> consumers = new ArrayList<Consumer>();
 
    private final Set<ScheduledDeliveryRunnable> scheduledRunnables = new LinkedHashSet<ScheduledDeliveryRunnable>();
 
@@ -90,8 +80,10 @@
 
    private boolean promptDelivery;
 
-   private int pos;
+   //private int pos = -1;
 
+   private Consumer currentConsumer = null;
+
    private AtomicInteger sizeBytes = new AtomicInteger(0);
 
    private AtomicInteger messagesAdded = new AtomicInteger(0);
@@ -270,19 +262,18 @@
 
    public synchronized void addConsumer(final Consumer consumer)
    {
-      consumers.add(consumer);
+      distributionPolicy.addConsumer(consumer);
    }
 
    public synchronized boolean removeConsumer(final Consumer consumer) throws Exception
    {
-      boolean removed = consumers.remove(consumer);
-
-      if (pos == consumers.size())
+      boolean removed = distributionPolicy.removeConsumer(consumer);
+      if(removed)
       {
-         pos = 0;
+         distributionPolicy.removeConsumer(consumer);
       }
 
-      if (consumers.isEmpty())
+      if (!distributionPolicy.hasConsumers())
       {
          promptDelivery = false;
       }
@@ -292,7 +283,7 @@
 
    public synchronized int getConsumerCount()
    {
-      return consumers.size();
+      return distributionPolicy.getConsumerCount();
    }
 
    public synchronized List<MessageReference> list(final Filter filter)
@@ -747,23 +738,30 @@
 
    private HandleStatus deliver(final MessageReference reference)
    {
-      if (consumers.isEmpty())
+      if (!distributionPolicy.hasConsumers())
       {
          return HandleStatus.BUSY;
       }
 
-      int startPos = pos;
-
       boolean filterRejected = false;
 
-      while (true)
+      HandleStatus status = null;
+      int pos = 0;
+      while (pos <= distributionPolicy.getConsumerCount())
       {
-         Consumer consumer = consumers.get(pos);
-
-         pos = distributionPolicy.select(consumers, pos);
-
-         HandleStatus status;
-
+         Consumer consumer = distributionPolicy.select(reference.getMessage(), status != null);
+         if(consumer == null)
+         {
+            if (filterRejected)
+            {
+               return HandleStatus.NO_MATCH;
+            }
+            else
+            {
+               // Give up - all consumers busy
+               return HandleStatus.BUSY;
+            }
+         }
          try
          {
             status = consumer.handle(reference);
@@ -805,21 +803,18 @@
 
             filterRejected = true;
          }
-
-         if (pos == startPos)
-         {
-            // Tried all of them
-            if (filterRejected)
-            {
-               return HandleStatus.NO_MATCH;
-            }
-            else
-            {
-               // Give up - all consumers busy
-               return HandleStatus.BUSY;
-            }
-         }
+         pos++;
       }
+      // Tried all of them
+      if (filterRejected)
+      {
+         return HandleStatus.NO_MATCH;
+      }
+      else
+      {
+         // Give up - all consumers busy
+         return HandleStatus.BUSY;
+      }
    }
 
    // Inner classes

Modified: trunk/src/main/org/jboss/messaging/core/server/impl/RoundRobinDistributionPolicy.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/server/impl/RoundRobinDistributionPolicy.java	2008-09-26 21:32:15 UTC (rev 5034)
+++ trunk/src/main/org/jboss/messaging/core/server/impl/RoundRobinDistributionPolicy.java	2008-09-29 08:55:28 UTC (rev 5035)
@@ -22,10 +22,8 @@
 
 package org.jboss.messaging.core.server.impl;
 
-import java.util.List;
-
 import org.jboss.messaging.core.server.Consumer;
-import org.jboss.messaging.core.server.DistributionPolicy;
+import org.jboss.messaging.core.server.ServerMessage;
 
 /**
  * 
@@ -34,10 +32,16 @@
  * @author <a href="mailto:tim.fox at jboss.com">Tim Fox</a>
  *
  */
-public class RoundRobinDistributionPolicy implements DistributionPolicy
-{   
-   public int select(final List<Consumer> consumers, int pos)
+public class RoundRobinDistributionPolicy extends DistributionPolicyImpl
+{
+   int pos = -1;
+
+   public Consumer select(ServerMessage message, boolean redeliver)
    {     
+      if(consumers.isEmpty())
+      {
+         return null;
+      }
       if (pos == -1)
       {
          //First time
@@ -52,7 +56,7 @@
             pos = 0;
          }
       }
-      
-      return pos;
+
+      return consumers.get(pos);
    }
 }

Added: trunk/tests/src/org/jboss/messaging/tests/unit/core/server/impl/GroupingRoundRobinDistributionPolicyTest.java
===================================================================
--- trunk/tests/src/org/jboss/messaging/tests/unit/core/server/impl/GroupingRoundRobinDistributionPolicyTest.java	                        (rev 0)
+++ trunk/tests/src/org/jboss/messaging/tests/unit/core/server/impl/GroupingRoundRobinDistributionPolicyTest.java	2008-09-29 08:55:28 UTC (rev 5035)
@@ -0,0 +1,257 @@
+/*
+ * JBoss, Home of Professional Open Source
+ * Copyright 2005-2008, Red Hat Middleware LLC, and individual contributors
+ * by the @authors tag. See the copyright.txt in the distribution for a
+ * full listing of individual contributors.
+ *
+ * This is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU Lesser General Public License as
+ * published by the Free Software Foundation; either version 2.1 of
+ * the License, or (at your option) any later version.
+ *
+ * This software is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this software; if not, write to the Free
+ * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
+ * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
+ */
+package org.jboss.messaging.tests.unit.core.server.impl;
+
+import org.easymock.EasyMock;
+import org.jboss.messaging.core.server.Consumer;
+import org.jboss.messaging.core.server.ServerMessage;
+import org.jboss.messaging.core.server.impl.GroupingRoundRobinDistributionPolicy;
+import org.jboss.messaging.tests.util.UnitTestCase;
+import org.jboss.messaging.util.SimpleString;
+
+/**
+ * Unit tests for {@link GroupingRoundRobinDistributionPolicy}.
+ * <p>
+ * Consumers and messages are EasyMock strict mocks. Each test registers consumers with the
+ * policy and asserts which consumer <code>select</code> returns for a sequence of messages.
+ *
+ * @author <a href="mailto:andy.taylor at jboss.org">Andy Taylor</a>
+ */
+public class GroupingRoundRobinDistributionPolicyTest extends UnitTestCase
+{
+   GroupingRoundRobinDistributionPolicy policy = null;
+
+   /** Creates a fresh policy for every test. */
+   protected void setUp() throws Exception
+   {
+      policy = new GroupingRoundRobinDistributionPolicy();
+   }
+
+   /** Drops the reference to the policy used by the finished test. */
+   protected void tearDown() throws Exception
+   {
+      policy = null;
+   }
+
+   /** With one consumer, repeated messages of one group all go to that consumer. */
+   public void testSingleConsumerSingleGroup()
+   {
+      Consumer consumer = EasyMock.createStrictMock(Consumer.class);
+      policy.addConsumer(consumer);
+      ServerMessage serverMessage = createGroupedMessage("gid1");
+      EasyMock.replay(consumer, serverMessage);
+      assertEquals(consumer, policy.select(serverMessage, false));
+      assertEquals(consumer, policy.select(serverMessage, false));
+      EasyMock.verify(consumer, serverMessage);
+   }
+
+   /** With three consumers, a group stays on the consumer that received its first message. */
+   public void testMultipleConsumersSingleGroup()
+   {
+      Consumer consumer = EasyMock.createStrictMock(Consumer.class);
+      Consumer consumer2 = EasyMock.createStrictMock(Consumer.class);
+      Consumer consumer3 = EasyMock.createStrictMock(Consumer.class);
+      policy.addConsumer(consumer);
+      policy.addConsumer(consumer2);
+      policy.addConsumer(consumer3);
+      ServerMessage serverMessage = createGroupedMessage("gid1");
+      EasyMock.replay(consumer, consumer2, consumer3, serverMessage);
+      assertEquals(consumer, policy.select(serverMessage, false));
+      assertEquals(consumer, policy.select(serverMessage, false));
+      EasyMock.verify(consumer, consumer2, consumer3, serverMessage);
+   }
+
+   /** With one consumer, messages of two different groups both go to that consumer. */
+   public void testSingleConsumerTwoGroups()
+   {
+      Consumer consumer = EasyMock.createStrictMock(Consumer.class);
+      policy.addConsumer(consumer);
+      ServerMessage serverMessage = createGroupedMessage("gid1");
+      ServerMessage serverMessage2 = createGroupedMessage("gid2");
+      EasyMock.replay(consumer, serverMessage, serverMessage2);
+      assertEquals(consumer, policy.select(serverMessage, false));
+      assertEquals(consumer, policy.select(serverMessage2, false));
+      // Verify every mock that was replayed, including the first message.
+      EasyMock.verify(consumer, serverMessage, serverMessage2);
+   }
+
+   /** Two groups are bound to successive consumers and each keeps its own consumer afterwards. */
+   public void testMultipleConsumersTwoGroups()
+   {
+      Consumer consumer = EasyMock.createStrictMock(Consumer.class);
+      Consumer consumer2 = EasyMock.createStrictMock(Consumer.class);
+      Consumer consumer3 = EasyMock.createStrictMock(Consumer.class);
+      policy.addConsumer(consumer);
+      policy.addConsumer(consumer2);
+      policy.addConsumer(consumer3);
+      ServerMessage serverMessage = createGroupedMessage("gid1");
+      ServerMessage serverMessage2 = createGroupedMessage("gid2");
+      EasyMock.replay(consumer, consumer2, consumer3, serverMessage, serverMessage2);
+      assertEquals(consumer, policy.select(serverMessage, false));
+      assertEquals(consumer2, policy.select(serverMessage2, false));
+      assertEquals(consumer, policy.select(serverMessage, false));
+      assertEquals(consumer2, policy.select(serverMessage2, false));
+      EasyMock.verify(consumer, consumer2, consumer3, serverMessage, serverMessage2);
+   }
+
+   /**
+    * When the group's message is selected again with <code>redeliver</code> set, the second
+    * consumer is expected instead of the one that was chosen first.
+    */
+   public void testMultipleConsumersSingleGroupFirstDeliveryFailed()
+   {
+      Consumer consumer = EasyMock.createStrictMock(Consumer.class);
+      Consumer consumer2 = EasyMock.createStrictMock(Consumer.class);
+      Consumer consumer3 = EasyMock.createStrictMock(Consumer.class);
+      policy.addConsumer(consumer);
+      policy.addConsumer(consumer2);
+      policy.addConsumer(consumer3);
+      ServerMessage serverMessage = createGroupedMessage("gid1");
+      EasyMock.replay(consumer, consumer2, consumer3, serverMessage);
+      assertEquals(consumer, policy.select(serverMessage, false));
+      assertEquals(consumer2, policy.select(serverMessage, true));
+      EasyMock.verify(consumer, consumer2, consumer3, serverMessage);
+   }
+
+   /**
+    * After two selections have returned the same consumer, a selection with
+    * <code>redeliver</code> set is expected to return null.
+    */
+   public void testMultipleConsumersSingleGroupSecondDeliveryFailed()
+   {
+      Consumer consumer = EasyMock.createStrictMock(Consumer.class);
+      Consumer consumer2 = EasyMock.createStrictMock(Consumer.class);
+      Consumer consumer3 = EasyMock.createStrictMock(Consumer.class);
+      policy.addConsumer(consumer);
+      policy.addConsumer(consumer2);
+      policy.addConsumer(consumer3);
+      ServerMessage serverMessage = createGroupedMessage("gid1");
+      EasyMock.replay(consumer, consumer2, consumer3, serverMessage);
+      assertEquals(consumer, policy.select(serverMessage, false));
+      assertEquals(consumer, policy.select(serverMessage, false));
+      assertEquals(null, policy.select(serverMessage, true));
+      EasyMock.verify(consumer, consumer2, consumer3, serverMessage);
+   }
+
+   /** Nine distinct groups are spread over three consumers in round-robin order. */
+   public void testMultipleConsumersMultipleGroupMultipleGroupsEach()
+   {
+      Consumer consumer = EasyMock.createStrictMock(Consumer.class);
+      Consumer consumer2 = EasyMock.createStrictMock(Consumer.class);
+      Consumer consumer3 = EasyMock.createStrictMock(Consumer.class);
+      policy.addConsumer(consumer);
+      policy.addConsumer(consumer2);
+      policy.addConsumer(consumer3);
+      ServerMessage serverMessage = createGroupedMessage("gid1");
+      ServerMessage serverMessage2 = createGroupedMessage("gid2");
+      ServerMessage serverMessage3 = createGroupedMessage("gid3");
+      ServerMessage serverMessage4 = createGroupedMessage("gid4");
+      ServerMessage serverMessage5 = createGroupedMessage("gid5");
+      ServerMessage serverMessage6 = createGroupedMessage("gid6");
+      ServerMessage serverMessage7 = createGroupedMessage("gid7");
+      ServerMessage serverMessage8 = createGroupedMessage("gid8");
+      ServerMessage serverMessage9 = createGroupedMessage("gid9");
+      EasyMock.replay(consumer, consumer2, consumer3, serverMessage, serverMessage2, serverMessage3, serverMessage4,
+                      serverMessage5, serverMessage6, serverMessage7, serverMessage8, serverMessage9);
+      assertEquals(consumer, policy.select(serverMessage, false));
+      assertEquals(consumer2, policy.select(serverMessage2, false));
+      assertEquals(consumer3, policy.select(serverMessage3, false));
+      assertEquals(consumer, policy.select(serverMessage4, false));
+      assertEquals(consumer2, policy.select(serverMessage5, false));
+      assertEquals(consumer3, policy.select(serverMessage6, false));
+      assertEquals(consumer, policy.select(serverMessage7, false));
+      assertEquals(consumer2, policy.select(serverMessage8, false));
+      assertEquals(consumer3, policy.select(serverMessage9, false));
+
+      EasyMock.verify(consumer, consumer2, consumer3, serverMessage, serverMessage2, serverMessage3, serverMessage4,
+                      serverMessage5, serverMessage6, serverMessage7, serverMessage8, serverMessage9);
+   }
+
+   /** Once a group's consumer is removed, the group's next message is expected on the first consumer. */
+   public void testMultipleConsumersConsumerRemoved()
+   {
+      Consumer consumer = EasyMock.createStrictMock(Consumer.class);
+      Consumer consumer2 = EasyMock.createStrictMock(Consumer.class);
+      Consumer consumer3 = EasyMock.createStrictMock(Consumer.class);
+      Consumer consumer4 = EasyMock.createStrictMock(Consumer.class);
+      policy.addConsumer(consumer);
+      policy.addConsumer(consumer2);
+      policy.addConsumer(consumer3);
+      policy.addConsumer(consumer4);
+      ServerMessage serverMessage = createGroupedMessage("gid1");
+      ServerMessage serverMessage2 = createGroupedMessage("gid2");
+      ServerMessage serverMessage3 = createGroupedMessage("gid3");
+      EasyMock.replay(consumer, consumer2, consumer3, consumer4, serverMessage, serverMessage2, serverMessage3);
+      assertEquals(consumer, policy.select(serverMessage, false));
+      assertEquals(consumer2, policy.select(serverMessage2, false));
+      assertEquals(consumer3, policy.select(serverMessage3, false));
+      policy.removeConsumer(consumer2);
+      assertEquals(consumer, policy.select(serverMessage2, false));
+      EasyMock.verify(consumer, consumer2, consumer3, consumer4, serverMessage, serverMessage2, serverMessage3);
+   }
+
+   /**
+    * The second group's message is selected three times while its GROUP_SEQ property is set up to
+    * return null, then 0, then null. The first two selections are expected on the group's consumer
+    * and the third on the fourth consumer (presumably a GROUP_SEQ of 0 resets the group's binding;
+    * confirm against the policy implementation).
+    */
+   public void testMultipleConsumersResetReceived()
+   {
+      Consumer consumer = EasyMock.createStrictMock(Consumer.class);
+      Consumer consumer2 = EasyMock.createStrictMock(Consumer.class);
+      Consumer consumer3 = EasyMock.createStrictMock(Consumer.class);
+      Consumer consumer4 = EasyMock.createStrictMock(Consumer.class);
+      policy.addConsumer(consumer);
+      policy.addConsumer(consumer2);
+      policy.addConsumer(consumer3);
+      policy.addConsumer(consumer4);
+      ServerMessage serverMessage = createGroupedMessage("gid1");
+      ServerMessage serverMessage2 = EasyMock.createStrictMock(ServerMessage.class);
+      EasyMock.expect(serverMessage2.getProperty(GroupingRoundRobinDistributionPolicy.GROUP_ID)).andStubReturn(new SimpleString("gid2"));
+      EasyMock.expect(serverMessage2.getProperty(GroupingRoundRobinDistributionPolicy.GROUP_SEQ)).andReturn(null);
+      EasyMock.expect(serverMessage2.getProperty(GroupingRoundRobinDistributionPolicy.GROUP_SEQ)).andReturn(0);
+      EasyMock.expect(serverMessage2.getProperty(GroupingRoundRobinDistributionPolicy.GROUP_SEQ)).andReturn(null);
+      ServerMessage serverMessage3 = createGroupedMessage("gid3");
+      EasyMock.replay(consumer, consumer2, consumer3, consumer4, serverMessage, serverMessage2, serverMessage3);
+      assertEquals(consumer, policy.select(serverMessage, false));
+      assertEquals(consumer2, policy.select(serverMessage2, false));
+      assertEquals(consumer3, policy.select(serverMessage3, false));
+      assertEquals(consumer2, policy.select(serverMessage2, false));
+      assertEquals(consumer4, policy.select(serverMessage2, false));
+      EasyMock.verify(consumer, consumer2, consumer3, consumer4, serverMessage, serverMessage2, serverMessage3);
+   }
+
+   /**
+    * Creates a strict mock message whose GROUP_ID property is stubbed to the given id and whose
+    * GROUP_SEQ property is stubbed to null. The mock is returned in record state, so the caller
+    * must pass it to <code>EasyMock.replay</code> before use.
+    */
+   private ServerMessage createGroupedMessage(final String groupId)
+   {
+      ServerMessage message = EasyMock.createStrictMock(ServerMessage.class);
+      EasyMock.expect(message.getProperty(GroupingRoundRobinDistributionPolicy.GROUP_ID)).andStubReturn(new SimpleString(groupId));
+      EasyMock.expect(message.getProperty(GroupingRoundRobinDistributionPolicy.GROUP_SEQ)).andStubReturn(null);
+      return message;
+   }
+
+}

Modified: trunk/tests/src/org/jboss/messaging/tests/unit/core/server/impl/QueueImplTest.java
===================================================================
--- trunk/tests/src/org/jboss/messaging/tests/unit/core/server/impl/QueueImplTest.java	2008-09-26 21:32:15 UTC (rev 5034)
+++ trunk/tests/src/org/jboss/messaging/tests/unit/core/server/impl/QueueImplTest.java	2008-09-29 08:55:28 UTC (rev 5035)
@@ -21,45 +21,33 @@
  */
 
 package org.jboss.messaging.tests.unit.core.server.impl;
- 
-import static org.easymock.EasyMock.anyLong;
-import static org.easymock.EasyMock.createMock;
-import static org.easymock.EasyMock.eq;
-import static org.easymock.EasyMock.expect;
-import static org.easymock.EasyMock.isA;
-import static org.easymock.EasyMock.replay;
-import static org.easymock.EasyMock.verify;
-import static org.jboss.messaging.tests.util.RandomUtil.randomLong;
 
-import java.util.ArrayList;
-import java.util.LinkedList;
-import java.util.List;
-import java.util.concurrent.CountDownLatch;
-import java.util.concurrent.Executor;
-import java.util.concurrent.Executors;
-import java.util.concurrent.ScheduledExecutorService;
-
 import org.easymock.EasyMock;
+import static org.easymock.EasyMock.*;
 import org.jboss.messaging.core.filter.Filter;
 import org.jboss.messaging.core.paging.PagingManager;
 import org.jboss.messaging.core.persistence.StorageManager;
 import org.jboss.messaging.core.postoffice.Binding;
 import org.jboss.messaging.core.postoffice.PostOffice;
-import org.jboss.messaging.core.server.Consumer;
-import org.jboss.messaging.core.server.DistributionPolicy;
-import org.jboss.messaging.core.server.HandleStatus;
-import org.jboss.messaging.core.server.MessageReference;
-import org.jboss.messaging.core.server.Queue;
-import org.jboss.messaging.core.server.ServerMessage;
+import org.jboss.messaging.core.server.*;
 import org.jboss.messaging.core.server.impl.QueueImpl;
 import org.jboss.messaging.core.server.impl.RoundRobinDistributionPolicy;
 import org.jboss.messaging.core.settings.HierarchicalRepository;
 import org.jboss.messaging.core.settings.impl.QueueSettings;
 import org.jboss.messaging.tests.unit.core.server.impl.fakes.FakeConsumer;
 import org.jboss.messaging.tests.unit.core.server.impl.fakes.FakeFilter;
+import static org.jboss.messaging.tests.util.RandomUtil.randomLong;
 import org.jboss.messaging.tests.util.UnitTestCase;
 import org.jboss.messaging.util.SimpleString;
 
+import java.util.ArrayList;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.Executor;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+
 /**
  * A QueueTest
  *
@@ -1450,10 +1438,31 @@
 
    class DummyDistributionPolicy implements DistributionPolicy
    {
-      public int select(List<Consumer> consumers, int lastPos)
+      Consumer consumer; // most recent consumer handed to addConsumer; no other method of this stub reads it
+      public Consumer select(ServerMessage message, boolean redeliver)
       {
-         return 0;
+         return null; // stub: never selects a consumer
       }
+
+      public void addConsumer(Consumer consumer)
+      {
+         this.consumer = consumer;
+      }
+
+      public boolean removeConsumer(Consumer consumer)
+      {
+         return false;  // stub: always returns false, whatever consumer is passed
+      }
+
+      public int getConsumerCount()
+      {
+         return 0;  // stub: always returns zero, even after addConsumer has been called
+      }
+
+      public boolean hasConsumers()
+      {
+         return false;  // stub: always returns false, even after addConsumer has been called
+      }
    }
 
 }

Modified: trunk/tests/src/org/jboss/messaging/tests/unit/core/server/impl/RoundRobinDistributionPolicyTest.java
===================================================================
--- trunk/tests/src/org/jboss/messaging/tests/unit/core/server/impl/RoundRobinDistributionPolicyTest.java	2008-09-26 21:32:15 UTC (rev 5034)
+++ trunk/tests/src/org/jboss/messaging/tests/unit/core/server/impl/RoundRobinDistributionPolicyTest.java	2008-09-29 08:55:28 UTC (rev 5035)
@@ -22,15 +22,15 @@
 
 package org.jboss.messaging.tests.unit.core.server.impl;
 
-import java.util.ArrayList;
-import java.util.List;
-
 import org.jboss.messaging.core.server.Consumer;
 import org.jboss.messaging.core.server.DistributionPolicy;
 import org.jboss.messaging.core.server.impl.RoundRobinDistributionPolicy;
 import org.jboss.messaging.tests.unit.core.server.impl.fakes.FakeConsumer;
 import org.jboss.messaging.tests.util.UnitTestCase;
 
+import java.util.ArrayList;
+import java.util.List;
+
 /**
  * 
  * A RoundRobinDistributionPolicyTest
@@ -47,50 +47,51 @@
       
       DistributionPolicy dp = new RoundRobinDistributionPolicy();
       
-      int pos = dp.select(consumers, -1);
+      Consumer c = dp.select(null, false);
       
-      assertEquals(0, pos);
+      assertEquals(null, c);
    }
    
    public void testConsumers()
    {
-      List<Consumer> consumers = new ArrayList<Consumer>();
+      FakeConsumer c1 = new FakeConsumer();
+      FakeConsumer c2 = new FakeConsumer();
+      FakeConsumer c3 = new FakeConsumer();
       
-      consumers.add(new FakeConsumer());
-      consumers.add(new FakeConsumer());
-      consumers.add(new FakeConsumer());
-      
       DistributionPolicy dp = new RoundRobinDistributionPolicy();
+      dp.addConsumer(c1);
+      dp.addConsumer(c2);
+      dp.addConsumer(c3); // registration order is c1, c2, c3
             
-      int pos = -1;
+      Consumer c = null;
       
-      pos = dp.select(consumers, pos);
+      c = dp.select( null, false); // every call in this test passes null as the message
       
-      assertEquals(0, pos);
+      assertEquals(c1, c); // first selection returns the first consumer registered
       
-      pos = dp.select(consumers, pos);
+      c = dp.select(null, false);
       
-      assertEquals(1, pos);
+      assertEquals(c2, c);
       
-      pos = dp.select(consumers, pos);
+      c = dp.select(null, false);
       
-      assertEquals(2, pos);
+      assertEquals(c3, c);
       
-      pos = dp.select(consumers, pos);
+      c = dp.select( null, false);
       
-      assertEquals(0, pos);
+      assertEquals(c1, c); // after c3 the selection wraps around to c1
       
-      pos = dp.select(consumers, pos);
+      c = dp.select( null, false);
       
-      assertEquals(1, pos);
+      assertEquals(c2, c);
       
-      pos = dp.select(consumers, pos);
+      c = dp.select( null, false);
       
-      assertEquals(2, pos);
+      assertEquals(c3, c);
       
-      pos = dp.select(consumers, pos);
+      c = dp.select(null, false);
       
-      assertEquals(0, pos);
+      assertEquals(c1, c); // wraps around to c1 a second time
    }
    
 }

Modified: trunk/tests/src/org/jboss/messaging/tests/unit/core/server/impl/ServerSessionImplTest.java
===================================================================
--- trunk/tests/src/org/jboss/messaging/tests/unit/core/server/impl/ServerSessionImplTest.java	2008-09-26 21:32:15 UTC (rev 5034)
+++ trunk/tests/src/org/jboss/messaging/tests/unit/core/server/impl/ServerSessionImplTest.java	2008-09-29 08:55:28 UTC (rev 5035)
@@ -948,7 +948,7 @@
 //      Xid xid = createStrictMock(Xid.class);
 //      expect(rm.getTransaction(xid)).andReturn(tx);
 //      expect(tx.getState()).andReturn(Transaction.State.ACTIVE);
-//      expect(tx.isEmpty()).andReturn(false);
+//      expect(tx.hasConsumers()).andReturn(false);
 //      tx.prepare();
 //      replay(rc, sm, po, qs, rm, ss, pd, cm, server, executor, xid, tx);
 //      SessionXAResponseMessage message = session.XAPrepare(xid);
@@ -963,7 +963,7 @@
 //      Xid xid = createStrictMock(Xid.class);
 //      expect(rm.getTransaction(xid)).andReturn(tx);
 //      expect(tx.getState()).andReturn(Transaction.State.ACTIVE);
-//      expect(tx.isEmpty()).andReturn(true);
+//      expect(tx.hasConsumers()).andReturn(true);
 //      expect(rm.removeTransaction(xid)).andReturn(true);
 //      replay(rc, sm, po, qs, rm, ss, pd, cm, server, executor, xid, tx);
 //      SessionXAResponseMessage message = session.XAPrepare(xid);
@@ -978,7 +978,7 @@
 //      Xid xid = createStrictMock(Xid.class);
 //      expect(rm.getTransaction(xid)).andReturn(tx);
 //      expect(tx.getState()).andReturn(Transaction.State.ACTIVE);
-//      expect(tx.isEmpty()).andReturn(true);
+//      expect(tx.hasConsumers()).andReturn(true);
 //      expect(rm.removeTransaction(xid)).andReturn(false);
 //      replay(rc, sm, po, qs, rm, ss, pd, cm, server, executor, xid, tx);
 //      SessionXAResponseMessage message = session.XAPrepare(xid);

Modified: trunk/tests/src/org/jboss/messaging/tests/unit/core/transaction/impl/TransactionImplTest.java
===================================================================
--- trunk/tests/src/org/jboss/messaging/tests/unit/core/transaction/impl/TransactionImplTest.java	2008-09-26 21:32:15 UTC (rev 5034)
+++ trunk/tests/src/org/jboss/messaging/tests/unit/core/transaction/impl/TransactionImplTest.java	2008-09-29 08:55:28 UTC (rev 5035)
@@ -22,13 +22,6 @@
 
 package org.jboss.messaging.tests.unit.core.transaction.impl;
 
-import static org.jboss.messaging.tests.util.RandomUtil.randomXid;
-
-import java.util.concurrent.Executors;
-import java.util.concurrent.ScheduledExecutorService;
-
-import javax.transaction.xa.Xid;
-
 import org.easymock.EasyMock;
 import org.jboss.messaging.core.persistence.StorageManager;
 import org.jboss.messaging.core.postoffice.PostOffice;
@@ -41,9 +34,14 @@
 import org.jboss.messaging.core.settings.impl.QueueSettings;
 import org.jboss.messaging.core.transaction.Transaction;
 import org.jboss.messaging.core.transaction.impl.TransactionImpl;
+import static org.jboss.messaging.tests.util.RandomUtil.randomXid;
 import org.jboss.messaging.tests.util.UnitTestCase;
 import org.jboss.messaging.util.SimpleString;
 
+import javax.transaction.xa.Xid;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+
 /**
  * 
  * A TransactionImplTest
@@ -453,7 +451,7 @@
 //            
 //      Transaction tx = new TransactionImpl(sm, po);
 //      
-//      assertTrue(tx.isEmpty());
+//      assertTrue(tx.hasConsumers());
 //      assertFalse(tx.isContainsPersistent());
 //
 //      EasyMock.verify(sm);
@@ -480,7 +478,7 @@
 //      
 //      tx.addMessage(address1, message1);
 //      
-//      assertFalse(tx.isEmpty());
+//      assertFalse(tx.hasConsumers());
 //      assertTrue(tx.isContainsPersistent());
 //      
 //         




More information about the jboss-cvs-commits mailing list