[jboss-cvs] JBoss Messaging SVN: r5035 - in trunk: examples/jms and 6 other directories.
jboss-cvs-commits at lists.jboss.org
jboss-cvs-commits at lists.jboss.org
Mon Sep 29 04:55:29 EDT 2008
Author: ataylor
Date: 2008-09-29 04:55:28 -0400 (Mon, 29 Sep 2008)
New Revision: 5035
Added:
trunk/examples/jms/src/org/jboss/jms/example/MessageGroupingExample.java
trunk/src/main/org/jboss/messaging/core/server/impl/DistributionPolicyImpl.java
trunk/src/main/org/jboss/messaging/core/server/impl/GroupingRoundRobinDistributionPolicy.java
trunk/tests/src/org/jboss/messaging/tests/unit/core/server/impl/GroupingRoundRobinDistributionPolicyTest.java
Modified:
trunk/build-messaging.xml
trunk/build.xml
trunk/examples/jms/build.xml
trunk/src/config/jbm-jndi.xml
trunk/src/config/queues.xml
trunk/src/main/org/jboss/messaging/core/server/DistributionPolicy.java
trunk/src/main/org/jboss/messaging/core/server/impl/QueueImpl.java
trunk/src/main/org/jboss/messaging/core/server/impl/RoundRobinDistributionPolicy.java
trunk/tests/src/org/jboss/messaging/tests/unit/core/server/impl/QueueImplTest.java
trunk/tests/src/org/jboss/messaging/tests/unit/core/server/impl/RoundRobinDistributionPolicyTest.java
trunk/tests/src/org/jboss/messaging/tests/unit/core/server/impl/ServerSessionImplTest.java
trunk/tests/src/org/jboss/messaging/tests/unit/core/transaction/impl/TransactionImplTest.java
Log:
https://jira.jboss.org/jira/browse/JBMESSAGING-375 - implemented message grouping functionality
Modified: trunk/build-messaging.xml
===================================================================
--- trunk/build-messaging.xml 2008-09-26 21:32:15 UTC (rev 5034)
+++ trunk/build-messaging.xml 2008-09-29 08:55:28 UTC (rev 5035)
@@ -964,6 +964,10 @@
<ant dir="${examples.dir}/jms" antfile="build.xml" target="wildcardExample"/>
</target>
+ <target name="messageGroupingExample" depends="client-jar">
+ <ant dir="${examples.dir}/jms" antfile="build.xml" target="messageGroupingExample"/>
+ </target>
+
<target name="SimpleClient" depends="client-jar">
<ant dir="${examples.dir}/messaging" antfile="build.xml" target="SimpleClient"/>
</target>
Modified: trunk/build.xml
===================================================================
--- trunk/build.xml 2008-09-26 21:32:15 UTC (rev 5034)
+++ trunk/build.xml 2008-09-29 08:55:28 UTC (rev 5035)
@@ -194,6 +194,10 @@
<ant antfile="build-messaging.xml" target="wildcardExample"/>
</target>
+ <target name="messageGroupingExample" depends="createthirdparty">
+ <ant antfile="build-messaging.xml" target="messageGroupingExample"/>
+ </target>
+
<target name="perfListener" depends="createthirdparty">
<ant antfile="build-messaging.xml" target="perfListener"/>
</target>
Modified: trunk/examples/jms/build.xml
===================================================================
--- trunk/examples/jms/build.xml 2008-09-26 21:32:15 UTC (rev 5034)
+++ trunk/examples/jms/build.xml 2008-09-29 08:55:28 UTC (rev 5035)
@@ -127,6 +127,13 @@
</java>
</target>
+ <target name="messageGroupingExample" depends="compile"
+ description="-> publish/subscribe example using a topic and a durable subscriber">
+ <java classname="org.jboss.jms.example.MessageGroupingExample" fork="true">
+ <classpath refid="runtime.classpath"/>
+ </java>
+ </target>
+
<target name="echo-params">
<echo>
***********************************************************************************
Added: trunk/examples/jms/src/org/jboss/jms/example/MessageGroupingExample.java
===================================================================
--- trunk/examples/jms/src/org/jboss/jms/example/MessageGroupingExample.java (rev 0)
+++ trunk/examples/jms/src/org/jboss/jms/example/MessageGroupingExample.java 2008-09-29 08:55:28 UTC (rev 5035)
@@ -0,0 +1,153 @@
+/*
+ * JBoss, Home of Professional Open Source
+ * Copyright 2005-2008, Red Hat Middleware LLC, and individual contributors
+ * by the @authors tag. See the copyright.txt in the distribution for a
+ * full listing of individual contributors.
+ *
+ * This is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU Lesser General Public License as
+ * published by the Free Software Foundation; either version 2.1 of
+ * the License, or (at your option) any later version.
+ *
+ * This software is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this software; if not, write to the Free
+ * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
+ * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
+ */
+package org.jboss.jms.example;
+
+import org.jboss.messaging.core.logging.Logger;
+
+import javax.jms.*;
+import javax.naming.InitialContext;
+import java.util.concurrent.CountDownLatch;
+
+/**
+ * This example sends 20 messages setting the groupid so that a specific consumer will receive each message.
+ * setting the property 'JMSXGroupID' will bind a consumer to the value given, from this point on the same consumer will
+ * receive any message that has the same JMSXGroupID value. setting the property 'JMSXGroupSeq' to 0 will release the
+ * binding after that message has been delivered.
+ * @author <a href="mailto:andy.taylor at jboss.org">Andy Taylor</a>
+ */
+public class MessageGroupingExample
+{
+ final static Logger log = Logger.getLogger(MessageGroupingExample.class);
+
+ public static void main(final String[] args)
+ {
+ Connection connection = null;
+
+ try
+ {
+ //create an initial context, env will be picked up from jndi.properties
+ InitialContext initialContext = new InitialContext();
+ Queue queue = (Queue) initialContext.lookup("/queue/testGroupQueue");
+ ConnectionFactory cf = (ConnectionFactory) initialContext.lookup("/ConnectionFactory");
+
+ connection = cf.createConnection();
+ Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+ MessageProducer producer = session.createProducer(queue);
+ Message[] messages = new Message[20];
+ for (int i = 0; i < messages.length; i++)
+ {
+ if (i < messages.length/2)
+ {
+ messages[i] = session.createTextMessage("This is a text message from groupid1!");
+ messages[i].setStringProperty("JMSXGroupID", "groupid1");
+ if(i == 4)
+ {
+ messages[i].setIntProperty("JMSXGroupSeq", 0);
+ }
+ }
+ else
+ {
+ messages[i] = session.createTextMessage("This is a text message from groupid2!");
+ messages[i].setStringProperty("JMSXGroupID", "groupid2");
+ }
+ }
+
+ final CountDownLatch latch = new CountDownLatch(20);
+ MessageConsumer messageConsumer = session.createConsumer(queue);
+ messageConsumer.setMessageListener(new MessageListener()
+ {
+ public void onMessage(Message message)
+ {
+ TextMessage m = (TextMessage) message;
+ try
+ {
+ log.info("message received for consumer 1 = " + m.getText());
+ }
+ catch (JMSException e)
+ {
+ e.printStackTrace();
+ }
+ latch.countDown();
+ }
+ });
+ MessageConsumer messageConsumer2 = session.createConsumer(queue);
+ messageConsumer2.setMessageListener(new MessageListener()
+ {
+ public void onMessage(Message message)
+ {
+ TextMessage m = (TextMessage) message;
+ try
+ {
+ log.info("message received for consumer 2 = " + m.getText());
+ }
+ catch (JMSException e)
+ {
+ e.printStackTrace();
+ }
+ latch.countDown();
+ }
+ });
+ MessageConsumer messageConsumer3 = session.createConsumer(queue);
+ messageConsumer3.setMessageListener(new MessageListener()
+ {
+ public void onMessage(Message message)
+ {
+ TextMessage m = (TextMessage) message;
+ try
+ {
+ log.info("message received for consumer 3 = " + m.getText());
+ }
+ catch (JMSException e)
+ {
+ e.printStackTrace();
+ }
+ latch.countDown();
+ }
+ });
+ connection.start();
+
+ log.info("sending messages to queue");
+ for (Message message : messages)
+ {
+ producer.send(message);
+ }
+ latch.await();
+ }
+ catch (Exception e)
+ {
+ e.printStackTrace();
+ }
+ finally
+ {
+ if(connection != null)
+ {
+ try
+ {
+ connection.close();
+ }
+ catch (JMSException e)
+ {
+ }
+ }
+ }
+ }
+}
\ No newline at end of file
Modified: trunk/src/config/jbm-jndi.xml
===================================================================
--- trunk/src/config/jbm-jndi.xml 2008-09-26 21:32:15 UTC (rev 5034)
+++ trunk/src/config/jbm-jndi.xml 2008-09-29 08:55:28 UTC (rev 5035)
@@ -118,6 +118,9 @@
<queue name="testQueue">
<entry name="/queue/testQueue"/>
</queue>
+ <queue name="testGroupQueue">
+ <entry name="/queue/testGroupQueue"/>
+ </queue>
<queue name="testPerfQueue">
<entry name="/queue/testPerfQueue"/>
</queue>
Modified: trunk/src/config/queues.xml
===================================================================
--- trunk/src/config/queues.xml 2008-09-26 21:32:15 UTC (rev 5034)
+++ trunk/src/config/queues.xml 2008-09-29 08:55:28 UTC (rev 5035)
@@ -99,6 +99,13 @@
<drop-messages-when-full>false</drop-messages-when-full>
</queue-settings>
+ <queue-settings match="queuejms.testGroupQueue">
+ <max-size-bytes>-1</max-size-bytes>
+ <page-size-bytes>10485760</page-size-bytes>
+ <drop-messages-when-full>false</drop-messages-when-full>
+ <distribution-policy-class>org.jboss.messaging.core.server.impl.GroupingRoundRobinDistributionPolicy</distribution-policy-class>
+ </queue-settings>
+
<!--default for catch all-->
<queue-settings match="*">
<clustered>false</clustered>
Modified: trunk/src/main/org/jboss/messaging/core/server/DistributionPolicy.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/server/DistributionPolicy.java 2008-09-26 21:32:15 UTC (rev 5034)
+++ trunk/src/main/org/jboss/messaging/core/server/DistributionPolicy.java 2008-09-29 08:55:28 UTC (rev 5035)
@@ -22,8 +22,6 @@
package org.jboss.messaging.core.server;
-import java.util.List;
-
/**
*
* A DistributionPolicy
@@ -33,5 +31,13 @@
*/
public interface DistributionPolicy
{
- int select(List<Consumer> consumers, int lastPos);
+ Consumer select(ServerMessage message, boolean redeliver);
+
+ void addConsumer(Consumer consumer);
+
+ boolean removeConsumer(Consumer consumer);
+
+ int getConsumerCount();
+
+ boolean hasConsumers();
}
Added: trunk/src/main/org/jboss/messaging/core/server/impl/DistributionPolicyImpl.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/server/impl/DistributionPolicyImpl.java (rev 0)
+++ trunk/src/main/org/jboss/messaging/core/server/impl/DistributionPolicyImpl.java 2008-09-29 08:55:28 UTC (rev 5035)
@@ -0,0 +1,59 @@
+/*
+ * JBoss, Home of Professional Open Source
+ * Copyright 2005-2008, Red Hat Middleware LLC, and individual contributors
+ * by the @authors tag. See the copyright.txt in the distribution for a
+ * full listing of individual contributors.
+ *
+ * This is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU Lesser General Public License as
+ * published by the Free Software Foundation; either version 2.1 of
+ * the License, or (at your option) any later version.
+ *
+ * This software is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this software; if not, write to the Free
+ * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
+ * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
+ */
+package org.jboss.messaging.core.server.impl;
+
+import org.jboss.messaging.core.server.Consumer;
+import org.jboss.messaging.core.server.DistributionPolicy;
+import org.jboss.messaging.core.server.ServerMessage;
+
+import java.util.ArrayList;
+import java.util.List;
+
+/**
+ * @author <a href="mailto:andy.taylor at jboss.org">Andy Taylor</a>
+ */
+public abstract class DistributionPolicyImpl implements DistributionPolicy
+{
+ protected final List<Consumer> consumers = new ArrayList<Consumer>();
+
+ public abstract Consumer select(ServerMessage message, boolean redeliver);
+
+ public void addConsumer(Consumer consumer)
+ {
+ consumers.add(consumer);
+ }
+
+ public boolean removeConsumer(Consumer consumer)
+ {
+ return consumers.remove(consumer);
+ }
+
+ public int getConsumerCount()
+ {
+ return consumers.size();
+ }
+
+ public boolean hasConsumers()
+ {
+ return !consumers.isEmpty();
+ }
+}
Added: trunk/src/main/org/jboss/messaging/core/server/impl/GroupingRoundRobinDistributionPolicy.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/server/impl/GroupingRoundRobinDistributionPolicy.java (rev 0)
+++ trunk/src/main/org/jboss/messaging/core/server/impl/GroupingRoundRobinDistributionPolicy.java 2008-09-29 08:55:28 UTC (rev 5035)
@@ -0,0 +1,255 @@
+/*
+ * JBoss, Home of Professional Open Source
+ * Copyright 2005-2008, Red Hat Middleware LLC, and individual contributors
+ * by the @authors tag. See the copyright.txt in the distribution for a
+ * full listing of individual contributors.
+ *
+ * This is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU Lesser General Public License as
+ * published by the Free Software Foundation; either version 2.1 of
+ * the License, or (at your option) any later version.
+ *
+ * This software is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this software; if not, write to the Free
+ * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
+ * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
+ */
+package org.jboss.messaging.core.server.impl;
+
+import org.jboss.messaging.core.logging.Logger;
+import org.jboss.messaging.core.server.Consumer;
+import org.jboss.messaging.core.server.ServerMessage;
+import org.jboss.messaging.util.SimpleString;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+
+/**
+ * Distributes message based on the message property 'JMSXGroupID'. Once a message has been successfully delivered to a
+ * consumer that consumer is then bound to that group. Any message that has the same group id set will always be
+ * delivered to the same consumer. This sequence is broken only when a message with the same group id has the property
+ * 'JMSXGroupSeq' set to 0 or if the consumer is removed, that is it is not passed down in the select consumers list.
+ * The Initial consumer is th efirst consumer found, using the round robin policy, that hasn't been bound to a group, If
+ * there are no consumers left that have not been bound to a group then the next consumer will be bound to 2 groups and
+ * so on.
+ * @author <a href="mailto:andy.taylor at jboss.org">Andy Taylor</a>
+ */
+public class GroupingRoundRobinDistributionPolicy extends RoundRobinDistributionPolicy
+{
+ // Constants -----------------------------------------------------
+
+ private static final Logger log = Logger.getLogger(GroupingRoundRobinDistributionPolicy.class);
+
+
+ // for convenience, the Group ID is directly mapped to the
+ // JMS JMSXGroupID & JMSXGroupSeq header names.
+ // It does not imply any dependency on JMS whatsoever
+ public static final SimpleString GROUP_ID = new SimpleString("JMSXGroupID");
+
+ public static final SimpleString GROUP_SEQ = new SimpleString("JMSXGroupSeq");
+
+ // Static --------------------------------------------------------
+
+ // Attributes ----------------------------------------------------
+
+ // Map with GroupID as a key and a Consumer as value.
+ private Map<SimpleString, ConsumerState> consumers = new ConcurrentHashMap<SimpleString, ConsumerState>();
+
+ //we hold the state of each consumer, i.e., is it bound etc
+ private Map<Consumer, ConsumerState> consumerStateMap = new ConcurrentHashMap<Consumer, ConsumerState>();
+ // Distributor implementation ------------------------------------
+
+ public Consumer select(ServerMessage message, boolean redeliver)
+ {
+ if (message.getProperty(GROUP_ID) != null)
+ {
+ SimpleString groupId = (SimpleString) message.getProperty(GROUP_ID);
+ Integer groupSeq = (Integer) message.getProperty(GROUP_SEQ);
+ if (consumers.get(groupId) != null)
+ {
+ ConsumerState consumerState = consumers.get(groupId);
+ //if this is a redelivery and the group is bound we wait.
+ if(redeliver && consumerState.isBound())
+ {
+ return null;
+ }
+ //if we need to reset which consumer to use, this will take play from the next invocation with the same groupid.
+ if (groupSeq != null && groupSeq.equals(0))
+ {
+ removeBinding(groupId, consumerState);
+ }
+ //if this is a redelivery and it was its first attempt we can look for another consumer and use that
+ else if(redeliver && !consumerState.isBound())
+ {
+ removeBinding(groupId, consumerState);
+ return getNextPositionAndBind(message, redeliver, groupId).getConsumer();
+ }
+ //we bind after we know that the first message has been successfully consumed
+ else if(!consumerState.isBound())
+ {
+ consumerState.setBound(true);
+ }
+ consumerState.setAvailable(false);
+
+ return consumerState.getConsumer();
+ }
+ else
+ {
+ return getNextPositionAndBind(message, redeliver, groupId).getConsumer();
+ }
+ }
+ else
+ {
+ return super.select(message, redeliver);
+ }
+ }
+
+ public synchronized void addConsumer(Consumer consumer)
+ {
+ super.addConsumer(consumer);
+ ConsumerState cs = new ConsumerState(consumer);
+ consumerStateMap.put(consumer, cs);
+ }
+
+ public synchronized boolean removeConsumer(Consumer consumer)
+ {
+ boolean removed = super.removeConsumer(consumer);
+ if(removed)
+ {
+ ConsumerState cs = consumerStateMap.remove(consumer);
+ for (SimpleString ss : cs.getGroupIds())
+ {
+ consumers.remove(ss);
+ }
+
+ }
+ return removed;
+ }
+
+ /**
+ * we need to find the next available consumer that doesn't have a binding. If there are no free we use the next
+ * available in the normal Round Robin fashion.
+ * @param message
+ * @param redeliver
+ * @param groupId
+ * @return
+ */
+ private ConsumerState getNextPositionAndBind(ServerMessage message, boolean redeliver, SimpleString groupId)
+ {
+ Consumer consumer = super.select(message, redeliver);
+ ConsumerState cs = consumerStateMap.get(consumer);
+ //if there is only one return it
+ if(getConsumerCount() == 1 || cs.isAvailable())
+ {
+ consumers.put(groupId, cs);
+ cs.getGroupIds().add(groupId);
+ return cs;
+ }
+ else
+ {
+ consumer = super.select(message, redeliver);
+ ConsumerState ncs = consumerStateMap.get(consumer);
+ while(!ncs.isAvailable())
+ {
+ consumer = super.select(message, redeliver);
+ ncs = consumerStateMap.get(consumer);
+ if(ncs == cs)
+ {
+ cs.getGroupIds().add(groupId);
+ return cs;
+ }
+ }
+ ncs.getGroupIds().add(groupId);
+ return ncs;
+ }
+ }
+
+ private void removeBinding(SimpleString groupId, ConsumerState consumerState)
+ {
+ consumerState.setAvailable(true);
+ consumerState.getGroupIds().remove(groupId);
+ consumers.remove(groupId);
+ }
+
+ /**
+ * holds the current state of a consumer, is it available, what groups it is bound to etc.
+ */
+ class ConsumerState
+ {
+ private final Consumer consumer;
+ private volatile boolean isBound = false;
+ private volatile boolean available = true;
+ private List<SimpleString> groupIds = new ArrayList<SimpleString>();
+
+ public ConsumerState(Consumer consumer)
+ {
+ this.consumer = consumer;
+ }
+
+ public boolean isBound()
+ {
+ return isBound;
+ }
+
+ public void setBound(boolean bound)
+ {
+ isBound = bound;
+ }
+
+
+ public boolean isAvailable()
+ {
+ return available;
+ }
+
+ public void setAvailable(boolean available)
+ {
+ this.available = available;
+ }
+
+ public Consumer getConsumer()
+ {
+ return consumer;
+ }
+
+ public List<SimpleString> getGroupIds()
+ {
+ return groupIds;
+ }
+
+ public boolean equals(Object o)
+ {
+ if (this == o)
+ {
+ return true;
+ }
+ if (o == null || getClass() != o.getClass())
+ {
+ return false;
+ }
+
+ ConsumerState that = (ConsumerState) o;
+
+ if (!consumer.equals(that.consumer))
+ {
+ return false;
+ }
+
+ return true;
+ }
+
+ public int hashCode()
+ {
+ return consumer.hashCode();
+ }
+
+
+ }
+}
Modified: trunk/src/main/org/jboss/messaging/core/server/impl/QueueImpl.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/server/impl/QueueImpl.java 2008-09-26 21:32:15 UTC (rev 5034)
+++ trunk/src/main/org/jboss/messaging/core/server/impl/QueueImpl.java 2008-09-29 08:55:28 UTC (rev 5035)
@@ -12,20 +12,6 @@
package org.jboss.messaging.core.server.impl;
-import java.util.ArrayList;
-import java.util.Iterator;
-import java.util.LinkedHashSet;
-import java.util.LinkedList;
-import java.util.List;
-import java.util.ListIterator;
-import java.util.Set;
-import java.util.concurrent.Executor;
-import java.util.concurrent.Future;
-import java.util.concurrent.ScheduledExecutorService;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicBoolean;
-import java.util.concurrent.atomic.AtomicInteger;
-
import org.jboss.messaging.core.filter.Filter;
import org.jboss.messaging.core.list.PriorityLinkedList;
import org.jboss.messaging.core.list.impl.PriorityLinkedListImpl;
@@ -34,18 +20,22 @@
import org.jboss.messaging.core.postoffice.Binding;
import org.jboss.messaging.core.postoffice.FlowController;
import org.jboss.messaging.core.postoffice.PostOffice;
-import org.jboss.messaging.core.server.Consumer;
-import org.jboss.messaging.core.server.DistributionPolicy;
-import org.jboss.messaging.core.server.HandleStatus;
-import org.jboss.messaging.core.server.MessageReference;
+import org.jboss.messaging.core.server.*;
import org.jboss.messaging.core.server.Queue;
-import org.jboss.messaging.core.server.ServerMessage;
import org.jboss.messaging.core.settings.HierarchicalRepository;
import org.jboss.messaging.core.settings.impl.QueueSettings;
import org.jboss.messaging.core.transaction.Transaction;
import org.jboss.messaging.core.transaction.impl.TransactionImpl;
import org.jboss.messaging.util.SimpleString;
+import java.util.*;
+import java.util.concurrent.Executor;
+import java.util.concurrent.Future;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicInteger;
+
/**
* Implementation of a Queue TODO use Java 5 concurrent queue
*
@@ -80,7 +70,7 @@
private final PriorityLinkedList<MessageReference> messageReferences = new PriorityLinkedListImpl<MessageReference>(NUM_PRIORITIES);
- private final List<Consumer> consumers = new ArrayList<Consumer>();
+ //private final List<Consumer> consumers = new ArrayList<Consumer>();
private final Set<ScheduledDeliveryRunnable> scheduledRunnables = new LinkedHashSet<ScheduledDeliveryRunnable>();
@@ -90,8 +80,10 @@
private boolean promptDelivery;
- private int pos;
+ //private int pos = -1;
+ private Consumer currentConsumer = null;
+
private AtomicInteger sizeBytes = new AtomicInteger(0);
private AtomicInteger messagesAdded = new AtomicInteger(0);
@@ -270,19 +262,18 @@
public synchronized void addConsumer(final Consumer consumer)
{
- consumers.add(consumer);
+ distributionPolicy.addConsumer(consumer);
}
public synchronized boolean removeConsumer(final Consumer consumer) throws Exception
{
- boolean removed = consumers.remove(consumer);
-
- if (pos == consumers.size())
+ boolean removed = distributionPolicy.removeConsumer(consumer);
+ if(removed)
{
- pos = 0;
+ distributionPolicy.removeConsumer(consumer);
}
- if (consumers.isEmpty())
+ if (!distributionPolicy.hasConsumers())
{
promptDelivery = false;
}
@@ -292,7 +283,7 @@
public synchronized int getConsumerCount()
{
- return consumers.size();
+ return distributionPolicy.getConsumerCount();
}
public synchronized List<MessageReference> list(final Filter filter)
@@ -747,23 +738,30 @@
private HandleStatus deliver(final MessageReference reference)
{
- if (consumers.isEmpty())
+ if (!distributionPolicy.hasConsumers())
{
return HandleStatus.BUSY;
}
- int startPos = pos;
-
boolean filterRejected = false;
- while (true)
+ HandleStatus status = null;
+ int pos = 0;
+ while (pos <= distributionPolicy.getConsumerCount())
{
- Consumer consumer = consumers.get(pos);
-
- pos = distributionPolicy.select(consumers, pos);
-
- HandleStatus status;
-
+ Consumer consumer = distributionPolicy.select(reference.getMessage(), status != null);
+ if(consumer == null)
+ {
+ if (filterRejected)
+ {
+ return HandleStatus.NO_MATCH;
+ }
+ else
+ {
+ // Give up - all consumers busy
+ return HandleStatus.BUSY;
+ }
+ }
try
{
status = consumer.handle(reference);
@@ -805,21 +803,18 @@
filterRejected = true;
}
-
- if (pos == startPos)
- {
- // Tried all of them
- if (filterRejected)
- {
- return HandleStatus.NO_MATCH;
- }
- else
- {
- // Give up - all consumers busy
- return HandleStatus.BUSY;
- }
- }
+ pos++;
}
+ // Tried all of them
+ if (filterRejected)
+ {
+ return HandleStatus.NO_MATCH;
+ }
+ else
+ {
+ // Give up - all consumers busy
+ return HandleStatus.BUSY;
+ }
}
// Inner classes
Modified: trunk/src/main/org/jboss/messaging/core/server/impl/RoundRobinDistributionPolicy.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/server/impl/RoundRobinDistributionPolicy.java 2008-09-26 21:32:15 UTC (rev 5034)
+++ trunk/src/main/org/jboss/messaging/core/server/impl/RoundRobinDistributionPolicy.java 2008-09-29 08:55:28 UTC (rev 5035)
@@ -22,10 +22,8 @@
package org.jboss.messaging.core.server.impl;
-import java.util.List;
-
import org.jboss.messaging.core.server.Consumer;
-import org.jboss.messaging.core.server.DistributionPolicy;
+import org.jboss.messaging.core.server.ServerMessage;
/**
*
@@ -34,10 +32,16 @@
* @author <a href="mailto:tim.fox at jboss.com">Tim Fox</a>
*
*/
-public class RoundRobinDistributionPolicy implements DistributionPolicy
-{
- public int select(final List<Consumer> consumers, int pos)
+public class RoundRobinDistributionPolicy extends DistributionPolicyImpl
+{
+ int pos = -1;
+
+ public Consumer select(ServerMessage message, boolean redeliver)
{
+ if(consumers.isEmpty())
+ {
+ return null;
+ }
if (pos == -1)
{
//First time
@@ -52,7 +56,7 @@
pos = 0;
}
}
-
- return pos;
+
+ return consumers.get(pos);
}
}
Added: trunk/tests/src/org/jboss/messaging/tests/unit/core/server/impl/GroupingRoundRobinDistributionPolicyTest.java
===================================================================
--- trunk/tests/src/org/jboss/messaging/tests/unit/core/server/impl/GroupingRoundRobinDistributionPolicyTest.java (rev 0)
+++ trunk/tests/src/org/jboss/messaging/tests/unit/core/server/impl/GroupingRoundRobinDistributionPolicyTest.java 2008-09-29 08:55:28 UTC (rev 5035)
@@ -0,0 +1,260 @@
+/*
+ * JBoss, Home of Professional Open Source
+ * Copyright 2005-2008, Red Hat Middleware LLC, and individual contributors
+ * by the @authors tag. See the copyright.txt in the distribution for a
+ * full listing of individual contributors.
+ *
+ * This is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU Lesser General Public License as
+ * published by the Free Software Foundation; either version 2.1 of
+ * the License, or (at your option) any later version.
+ *
+ * This software is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this software; if not, write to the Free
+ * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
+ * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
+ */
+package org.jboss.messaging.tests.unit.core.server.impl;
+
+import org.easymock.EasyMock;
+import org.jboss.messaging.core.server.Consumer;
+import org.jboss.messaging.core.server.ServerMessage;
+import org.jboss.messaging.core.server.impl.GroupingRoundRobinDistributionPolicy;
+import org.jboss.messaging.tests.util.UnitTestCase;
+import org.jboss.messaging.util.SimpleString;
+
+/**
+ * @author <a href="mailto:andy.taylor at jboss.org">Andy Taylor</a>
+ */
+public class GroupingRoundRobinDistributionPolicyTest extends UnitTestCase
+{
+ GroupingRoundRobinDistributionPolicy policy = null;
+
+ protected void setUp() throws Exception
+ {
+ policy = new GroupingRoundRobinDistributionPolicy();
+ }
+
+ protected void tearDown() throws Exception
+ {
+ policy = null;
+ }
+
+ public void testSingleConsumerSingleGroup()
+ {
+ Consumer consumer = EasyMock.createStrictMock(Consumer.class);
+ policy.addConsumer(consumer);
+ ServerMessage serverMessage = EasyMock.createStrictMock(ServerMessage.class);
+ EasyMock.expect(serverMessage.getProperty(GroupingRoundRobinDistributionPolicy.GROUP_ID)).andStubReturn(new SimpleString("gid1"));
+ EasyMock.expect(serverMessage.getProperty(GroupingRoundRobinDistributionPolicy.GROUP_SEQ)).andStubReturn(null);
+ EasyMock.replay(consumer, serverMessage);
+ assertEquals(consumer, policy.select( serverMessage, false));
+ assertEquals(consumer, policy.select(serverMessage, false));
+ EasyMock.verify(consumer, serverMessage);
+ }
+
+ public void testMultipleConsumersSingleGroup()
+ {
+ Consumer consumer = EasyMock.createStrictMock(Consumer.class);
+ Consumer consumer2 = EasyMock.createStrictMock(Consumer.class);
+ Consumer consumer3 = EasyMock.createStrictMock(Consumer.class);
+ policy.addConsumer(consumer);
+ policy.addConsumer(consumer2);
+ policy.addConsumer(consumer3);
+ ServerMessage serverMessage = EasyMock.createStrictMock(ServerMessage.class);
+ EasyMock.expect(serverMessage.getProperty(GroupingRoundRobinDistributionPolicy.GROUP_ID)).andStubReturn(new SimpleString("gid1"));
+ EasyMock.expect(serverMessage.getProperty(GroupingRoundRobinDistributionPolicy.GROUP_SEQ)).andStubReturn(null);
+ EasyMock.replay(consumer, consumer2, consumer3, serverMessage);
+ assertEquals(consumer, policy.select(serverMessage, false));
+ assertEquals(consumer, policy.select(serverMessage, false));
+ EasyMock.verify(consumer, consumer2, consumer3, serverMessage);
+ }
+
+ public void testSingleConsumerTwoGroups()
+ {
+ Consumer consumer = EasyMock.createStrictMock(Consumer.class);
+ policy.addConsumer(consumer);
+ ServerMessage serverMessage = EasyMock.createStrictMock(ServerMessage.class);
+ EasyMock.expect(serverMessage.getProperty(GroupingRoundRobinDistributionPolicy.GROUP_ID)).andStubReturn(new SimpleString("gid1"));
+ EasyMock.expect(serverMessage.getProperty(GroupingRoundRobinDistributionPolicy.GROUP_SEQ)).andStubReturn(null);
+ ServerMessage serverMessage2 = EasyMock.createStrictMock(ServerMessage.class);
+ EasyMock.expect(serverMessage2.getProperty(GroupingRoundRobinDistributionPolicy.GROUP_ID)).andStubReturn(new SimpleString("gid2"));
+ EasyMock.expect(serverMessage2.getProperty(GroupingRoundRobinDistributionPolicy.GROUP_SEQ)).andStubReturn(null);
+ EasyMock.replay(consumer, serverMessage, serverMessage2);
+ assertEquals(consumer, policy.select(serverMessage, false));
+ assertEquals(consumer, policy.select(serverMessage2, false));
+ EasyMock.verify(consumer, serverMessage2);
+ }
+
+ public void testMultipleConsumersTwoGroups()
+ {
+ Consumer consumer = EasyMock.createStrictMock(Consumer.class);
+ Consumer consumer2 = EasyMock.createStrictMock(Consumer.class);
+ Consumer consumer3 = EasyMock.createStrictMock(Consumer.class);
+ policy.addConsumer(consumer);
+ policy.addConsumer(consumer2);
+ policy.addConsumer(consumer3);
+ ServerMessage serverMessage = EasyMock.createStrictMock(ServerMessage.class);
+ EasyMock.expect(serverMessage.getProperty(GroupingRoundRobinDistributionPolicy.GROUP_ID)).andStubReturn(new SimpleString("gid1"));
+ EasyMock.expect(serverMessage.getProperty(GroupingRoundRobinDistributionPolicy.GROUP_SEQ)).andStubReturn(null);
+ ServerMessage serverMessage2 = EasyMock.createStrictMock(ServerMessage.class);
+ EasyMock.expect(serverMessage2.getProperty(GroupingRoundRobinDistributionPolicy.GROUP_ID)).andStubReturn(new SimpleString("gid2"));
+ EasyMock.expect(serverMessage2.getProperty(GroupingRoundRobinDistributionPolicy.GROUP_SEQ)).andStubReturn(null);
+ EasyMock.replay(consumer, consumer2, consumer3, serverMessage, serverMessage2);
+ assertEquals(consumer, policy.select(serverMessage, false));
+ assertEquals(consumer2, policy.select(serverMessage2, false));
+ assertEquals(consumer, policy.select(serverMessage, false));
+ assertEquals(consumer2, policy.select(serverMessage2, false));
+ EasyMock.verify(consumer, consumer2, consumer3, serverMessage, serverMessage2);
+ }
+
+ public void testMultipleConsumersSingleGroupFirstDeliveryFailed()
+ {
+ Consumer consumer = EasyMock.createStrictMock(Consumer.class);
+ Consumer consumer2 = EasyMock.createStrictMock(Consumer.class);
+ Consumer consumer3 = EasyMock.createStrictMock(Consumer.class);
+ policy.addConsumer(consumer);
+ policy.addConsumer(consumer2);
+ policy.addConsumer(consumer3);
+ ServerMessage serverMessage = EasyMock.createStrictMock(ServerMessage.class);
+ EasyMock.expect(serverMessage.getProperty(GroupingRoundRobinDistributionPolicy.GROUP_ID)).andStubReturn(new SimpleString("gid1"));
+ EasyMock.expect(serverMessage.getProperty(GroupingRoundRobinDistributionPolicy.GROUP_SEQ)).andStubReturn(null);
+ EasyMock.replay(consumer, consumer2, consumer3, serverMessage);
+ assertEquals(consumer, policy.select(serverMessage, false));
+ assertEquals(consumer2, policy.select(serverMessage, true));
+ EasyMock.verify(consumer, consumer2, consumer3, serverMessage);
+ }
+
+ public void testMultipleConsumersSingleGroupSecondDeliveryFailed()
+ {
+ Consumer consumer = EasyMock.createStrictMock(Consumer.class);
+ Consumer consumer2 = EasyMock.createStrictMock(Consumer.class);
+ Consumer consumer3 = EasyMock.createStrictMock(Consumer.class);
+ policy.addConsumer(consumer);
+ policy.addConsumer(consumer2);
+ policy.addConsumer(consumer3);
+ ServerMessage serverMessage = EasyMock.createStrictMock(ServerMessage.class);
+ EasyMock.expect(serverMessage.getProperty(GroupingRoundRobinDistributionPolicy.GROUP_ID)).andStubReturn(new SimpleString("gid1"));
+ EasyMock.expect(serverMessage.getProperty(GroupingRoundRobinDistributionPolicy.GROUP_SEQ)).andStubReturn(null);
+ EasyMock.replay(consumer, consumer2, consumer3, serverMessage);
+ assertEquals(consumer, policy.select(serverMessage, false));
+ assertEquals(consumer, policy.select(serverMessage, false));
+ assertEquals(null, policy.select(serverMessage, true));
+ EasyMock.verify(consumer, consumer2, consumer3, serverMessage);
+ }
+
+ public void testMultipleConsumersMultipleGroupMultipleGroupsEach()
+ {
+ Consumer consumer = EasyMock.createStrictMock(Consumer.class);
+ Consumer consumer2 = EasyMock.createStrictMock(Consumer.class);
+ Consumer consumer3 = EasyMock.createStrictMock(Consumer.class);
+ policy.addConsumer(consumer);
+ policy.addConsumer(consumer2);
+ policy.addConsumer(consumer3);
+ ServerMessage serverMessage = EasyMock.createStrictMock(ServerMessage.class);
+ EasyMock.expect(serverMessage.getProperty(GroupingRoundRobinDistributionPolicy.GROUP_ID)).andStubReturn(new SimpleString("gid1"));
+ EasyMock.expect(serverMessage.getProperty(GroupingRoundRobinDistributionPolicy.GROUP_SEQ)).andStubReturn(null);
+ ServerMessage serverMessage2 = EasyMock.createStrictMock(ServerMessage.class);
+ EasyMock.expect(serverMessage2.getProperty(GroupingRoundRobinDistributionPolicy.GROUP_ID)).andStubReturn(new SimpleString("gid2"));
+ EasyMock.expect(serverMessage2.getProperty(GroupingRoundRobinDistributionPolicy.GROUP_SEQ)).andStubReturn(null);
+ ServerMessage serverMessage3 = EasyMock.createStrictMock(ServerMessage.class);
+ EasyMock.expect(serverMessage3.getProperty(GroupingRoundRobinDistributionPolicy.GROUP_ID)).andStubReturn(new SimpleString("gid3"));
+ EasyMock.expect(serverMessage3.getProperty(GroupingRoundRobinDistributionPolicy.GROUP_SEQ)).andStubReturn(null);
+ ServerMessage serverMessage4 = EasyMock.createStrictMock(ServerMessage.class);
+ EasyMock.expect(serverMessage4.getProperty(GroupingRoundRobinDistributionPolicy.GROUP_ID)).andStubReturn(new SimpleString("gid4"));
+ EasyMock.expect(serverMessage4.getProperty(GroupingRoundRobinDistributionPolicy.GROUP_SEQ)).andStubReturn(null);
+ ServerMessage serverMessage5 = EasyMock.createStrictMock(ServerMessage.class);
+ EasyMock.expect(serverMessage5.getProperty(GroupingRoundRobinDistributionPolicy.GROUP_ID)).andStubReturn(new SimpleString("gid5"));
+ EasyMock.expect(serverMessage5.getProperty(GroupingRoundRobinDistributionPolicy.GROUP_SEQ)).andStubReturn(null);
+ ServerMessage serverMessage6 = EasyMock.createStrictMock(ServerMessage.class);
+ EasyMock.expect(serverMessage6.getProperty(GroupingRoundRobinDistributionPolicy.GROUP_ID)).andStubReturn(new SimpleString("gid6"));
+ EasyMock.expect(serverMessage6.getProperty(GroupingRoundRobinDistributionPolicy.GROUP_SEQ)).andStubReturn(null);
+ ServerMessage serverMessage7 = EasyMock.createStrictMock(ServerMessage.class);
+ EasyMock.expect(serverMessage7.getProperty(GroupingRoundRobinDistributionPolicy.GROUP_ID)).andStubReturn(new SimpleString("gid7"));
+ EasyMock.expect(serverMessage7.getProperty(GroupingRoundRobinDistributionPolicy.GROUP_SEQ)).andStubReturn(null);
+ ServerMessage serverMessage8 = EasyMock.createStrictMock(ServerMessage.class);
+ EasyMock.expect(serverMessage8.getProperty(GroupingRoundRobinDistributionPolicy.GROUP_ID)).andStubReturn(new SimpleString("gid8"));
+ EasyMock.expect(serverMessage8.getProperty(GroupingRoundRobinDistributionPolicy.GROUP_SEQ)).andStubReturn(null);
+ ServerMessage serverMessage9 = EasyMock.createStrictMock(ServerMessage.class);
+ EasyMock.expect(serverMessage9.getProperty(GroupingRoundRobinDistributionPolicy.GROUP_ID)).andStubReturn(new SimpleString("gid9"));
+ EasyMock.expect(serverMessage9.getProperty(GroupingRoundRobinDistributionPolicy.GROUP_SEQ)).andStubReturn(null);
+ EasyMock.replay(consumer, consumer2, consumer3, serverMessage, serverMessage2, serverMessage3, serverMessage4,
+ serverMessage5, serverMessage6, serverMessage7, serverMessage8, serverMessage9);
+ assertEquals(consumer, policy.select(serverMessage, false));
+ assertEquals(consumer2, policy.select(serverMessage2, false));
+ assertEquals(consumer3, policy.select(serverMessage3, false));
+ assertEquals(consumer, policy.select(serverMessage4, false));
+ assertEquals(consumer2, policy.select(serverMessage5, false));
+ assertEquals(consumer3, policy.select(serverMessage6, false));
+ assertEquals(consumer, policy.select(serverMessage7, false));
+ assertEquals(consumer2, policy.select(serverMessage8, false));
+ assertEquals(consumer3, policy.select(serverMessage9, false));
+
+ EasyMock.verify(consumer, consumer2, consumer3, serverMessage, serverMessage2, serverMessage3, serverMessage4,
+ serverMessage5, serverMessage6, serverMessage7, serverMessage8, serverMessage9);
+ }
+
+ public void testMultipleConsumersConsumerRemoved()
+ {
+ Consumer consumer = EasyMock.createStrictMock(Consumer.class);
+ Consumer consumer2 = EasyMock.createStrictMock(Consumer.class);
+ Consumer consumer3 = EasyMock.createStrictMock(Consumer.class);
+ Consumer consumer4 = EasyMock.createStrictMock(Consumer.class);
+ policy.addConsumer(consumer);
+ policy.addConsumer(consumer2);
+ policy.addConsumer(consumer3);
+ policy.addConsumer(consumer4);
+ ServerMessage serverMessage = EasyMock.createStrictMock(ServerMessage.class);
+ EasyMock.expect(serverMessage.getProperty(GroupingRoundRobinDistributionPolicy.GROUP_ID)).andStubReturn(new SimpleString("gid1"));
+ EasyMock.expect(serverMessage.getProperty(GroupingRoundRobinDistributionPolicy.GROUP_SEQ)).andStubReturn(null);
+ ServerMessage serverMessage2 = EasyMock.createStrictMock(ServerMessage.class);
+ EasyMock.expect(serverMessage2.getProperty(GroupingRoundRobinDistributionPolicy.GROUP_ID)).andStubReturn(new SimpleString("gid2"));
+ EasyMock.expect(serverMessage2.getProperty(GroupingRoundRobinDistributionPolicy.GROUP_SEQ)).andStubReturn(null);
+ ServerMessage serverMessage3 = EasyMock.createStrictMock(ServerMessage.class);
+ EasyMock.expect(serverMessage3.getProperty(GroupingRoundRobinDistributionPolicy.GROUP_ID)).andStubReturn(new SimpleString("gid3"));
+ EasyMock.expect(serverMessage3.getProperty(GroupingRoundRobinDistributionPolicy.GROUP_SEQ)).andStubReturn(null);
+ EasyMock.replay(consumer, consumer2, consumer3, consumer4, serverMessage, serverMessage2, serverMessage3);
+ assertEquals(consumer, policy.select(serverMessage, false));
+ assertEquals(consumer2, policy.select(serverMessage2, false));
+ assertEquals(consumer3, policy.select(serverMessage3, false));
+ policy.removeConsumer(consumer2);
+ assertEquals(consumer, policy.select(serverMessage2, false));
+ EasyMock.verify(consumer, consumer2, consumer3, consumer4, serverMessage, serverMessage2, serverMessage3);
+ }
+
+ public void testMultipleConsumersResetReceived()
+ {
+ Consumer consumer = EasyMock.createStrictMock(Consumer.class);
+ Consumer consumer2 = EasyMock.createStrictMock(Consumer.class);
+ Consumer consumer3 = EasyMock.createStrictMock(Consumer.class);
+ Consumer consumer4 = EasyMock.createStrictMock(Consumer.class);
+ policy.addConsumer(consumer);
+ policy.addConsumer(consumer2);
+ policy.addConsumer(consumer3);
+ policy.addConsumer(consumer4);
+ ServerMessage serverMessage = EasyMock.createStrictMock(ServerMessage.class);
+ EasyMock.expect(serverMessage.getProperty(GroupingRoundRobinDistributionPolicy.GROUP_ID)).andStubReturn(new SimpleString("gid1"));
+ EasyMock.expect(serverMessage.getProperty(GroupingRoundRobinDistributionPolicy.GROUP_SEQ)).andStubReturn(null);
+ ServerMessage serverMessage2 = EasyMock.createStrictMock(ServerMessage.class);
+ EasyMock.expect(serverMessage2.getProperty(GroupingRoundRobinDistributionPolicy.GROUP_ID)).andStubReturn(new SimpleString("gid2"));
+ EasyMock.expect(serverMessage2.getProperty(GroupingRoundRobinDistributionPolicy.GROUP_SEQ)).andReturn(null);
+ EasyMock.expect(serverMessage2.getProperty(GroupingRoundRobinDistributionPolicy.GROUP_SEQ)).andReturn(0);
+ EasyMock.expect(serverMessage2.getProperty(GroupingRoundRobinDistributionPolicy.GROUP_SEQ)).andReturn(null);
+ ServerMessage serverMessage3 = EasyMock.createStrictMock(ServerMessage.class);
+ EasyMock.expect(serverMessage3.getProperty(GroupingRoundRobinDistributionPolicy.GROUP_ID)).andStubReturn(new SimpleString("gid3"));
+ EasyMock.expect(serverMessage3.getProperty(GroupingRoundRobinDistributionPolicy.GROUP_SEQ)).andStubReturn(null);
+ EasyMock.replay(consumer, consumer2, consumer3, consumer4, serverMessage, serverMessage2, serverMessage3);
+ assertEquals(consumer, policy.select(serverMessage, false));
+ assertEquals(consumer2, policy.select(serverMessage2, false));
+ assertEquals(consumer3, policy.select(serverMessage3, false));
+ assertEquals(consumer2, policy.select(serverMessage2, false));
+ assertEquals(consumer4, policy.select(serverMessage2, false));
+ EasyMock.verify(consumer, consumer2, consumer3, consumer4, serverMessage, serverMessage2, serverMessage3);
+ }
+
+}
Modified: trunk/tests/src/org/jboss/messaging/tests/unit/core/server/impl/QueueImplTest.java
===================================================================
--- trunk/tests/src/org/jboss/messaging/tests/unit/core/server/impl/QueueImplTest.java 2008-09-26 21:32:15 UTC (rev 5034)
+++ trunk/tests/src/org/jboss/messaging/tests/unit/core/server/impl/QueueImplTest.java 2008-09-29 08:55:28 UTC (rev 5035)
@@ -21,45 +21,33 @@
*/
package org.jboss.messaging.tests.unit.core.server.impl;
-
-import static org.easymock.EasyMock.anyLong;
-import static org.easymock.EasyMock.createMock;
-import static org.easymock.EasyMock.eq;
-import static org.easymock.EasyMock.expect;
-import static org.easymock.EasyMock.isA;
-import static org.easymock.EasyMock.replay;
-import static org.easymock.EasyMock.verify;
-import static org.jboss.messaging.tests.util.RandomUtil.randomLong;
-import java.util.ArrayList;
-import java.util.LinkedList;
-import java.util.List;
-import java.util.concurrent.CountDownLatch;
-import java.util.concurrent.Executor;
-import java.util.concurrent.Executors;
-import java.util.concurrent.ScheduledExecutorService;
-
import org.easymock.EasyMock;
+import static org.easymock.EasyMock.*;
import org.jboss.messaging.core.filter.Filter;
import org.jboss.messaging.core.paging.PagingManager;
import org.jboss.messaging.core.persistence.StorageManager;
import org.jboss.messaging.core.postoffice.Binding;
import org.jboss.messaging.core.postoffice.PostOffice;
-import org.jboss.messaging.core.server.Consumer;
-import org.jboss.messaging.core.server.DistributionPolicy;
-import org.jboss.messaging.core.server.HandleStatus;
-import org.jboss.messaging.core.server.MessageReference;
-import org.jboss.messaging.core.server.Queue;
-import org.jboss.messaging.core.server.ServerMessage;
+import org.jboss.messaging.core.server.*;
import org.jboss.messaging.core.server.impl.QueueImpl;
import org.jboss.messaging.core.server.impl.RoundRobinDistributionPolicy;
import org.jboss.messaging.core.settings.HierarchicalRepository;
import org.jboss.messaging.core.settings.impl.QueueSettings;
import org.jboss.messaging.tests.unit.core.server.impl.fakes.FakeConsumer;
import org.jboss.messaging.tests.unit.core.server.impl.fakes.FakeFilter;
+import static org.jboss.messaging.tests.util.RandomUtil.randomLong;
import org.jboss.messaging.tests.util.UnitTestCase;
import org.jboss.messaging.util.SimpleString;
+import java.util.ArrayList;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.Executor;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+
/**
* A QueueTest
*
@@ -1450,10 +1438,31 @@
class DummyDistributionPolicy implements DistributionPolicy
{
- public int select(List<Consumer> consumers, int lastPos)
+ Consumer consumer;
+ public Consumer select(ServerMessage message, boolean redeliver)
{
- return 0;
+ return null;
}
+
+ public void addConsumer(Consumer consumer)
+ {
+ this.consumer = consumer;
+ }
+
+ public boolean removeConsumer(Consumer consumer)
+ {
+ return false; //To change body of implemented methods use File | Settings | File Templates.
+ }
+
+ public int getConsumerCount()
+ {
+ return 0; //To change body of implemented methods use File | Settings | File Templates.
+ }
+
+ public boolean hasConsumers()
+ {
+ return false; //To change body of implemented methods use File | Settings | File Templates.
+ }
}
}
Modified: trunk/tests/src/org/jboss/messaging/tests/unit/core/server/impl/RoundRobinDistributionPolicyTest.java
===================================================================
--- trunk/tests/src/org/jboss/messaging/tests/unit/core/server/impl/RoundRobinDistributionPolicyTest.java 2008-09-26 21:32:15 UTC (rev 5034)
+++ trunk/tests/src/org/jboss/messaging/tests/unit/core/server/impl/RoundRobinDistributionPolicyTest.java 2008-09-29 08:55:28 UTC (rev 5035)
@@ -22,15 +22,15 @@
package org.jboss.messaging.tests.unit.core.server.impl;
-import java.util.ArrayList;
-import java.util.List;
-
import org.jboss.messaging.core.server.Consumer;
import org.jboss.messaging.core.server.DistributionPolicy;
import org.jboss.messaging.core.server.impl.RoundRobinDistributionPolicy;
import org.jboss.messaging.tests.unit.core.server.impl.fakes.FakeConsumer;
import org.jboss.messaging.tests.util.UnitTestCase;
+import java.util.ArrayList;
+import java.util.List;
+
/**
*
* A RoundRobinDistributionPolicyTest
@@ -47,50 +47,51 @@
DistributionPolicy dp = new RoundRobinDistributionPolicy();
- int pos = dp.select(consumers, -1);
+ Consumer c = dp.select(null, false);
- assertEquals(0, pos);
+ assertEquals(null, c);
}
public void testConsumers()
{
- List<Consumer> consumers = new ArrayList<Consumer>();
+ FakeConsumer c1 = new FakeConsumer();
+ FakeConsumer c2 = new FakeConsumer();
+ FakeConsumer c3 = new FakeConsumer();
- consumers.add(new FakeConsumer());
- consumers.add(new FakeConsumer());
- consumers.add(new FakeConsumer());
-
DistributionPolicy dp = new RoundRobinDistributionPolicy();
+ dp.addConsumer(c1);
+ dp.addConsumer(c2);
+ dp.addConsumer(c3);
- int pos = -1;
+ Consumer c = null;
- pos = dp.select(consumers, pos);
+ c = dp.select( null, false);
- assertEquals(0, pos);
+ assertEquals(c1, c);
- pos = dp.select(consumers, pos);
+ c = dp.select(null, false);
- assertEquals(1, pos);
+ assertEquals(c2, c);
- pos = dp.select(consumers, pos);
+ c = dp.select(null, false);
- assertEquals(2, pos);
+ assertEquals(c3, c);
- pos = dp.select(consumers, pos);
+ c = dp.select( null, false);
- assertEquals(0, pos);
+ assertEquals(c1, c);
- pos = dp.select(consumers, pos);
+ c = dp.select( null, false);
- assertEquals(1, pos);
+ assertEquals(c2, c);
- pos = dp.select(consumers, pos);
+ c = dp.select( null, false);
- assertEquals(2, pos);
+ assertEquals(c3, c);
- pos = dp.select(consumers, pos);
+ c = dp.select(null, false);
- assertEquals(0, pos);
+ assertEquals(c1, c);
}
}
Modified: trunk/tests/src/org/jboss/messaging/tests/unit/core/server/impl/ServerSessionImplTest.java
===================================================================
--- trunk/tests/src/org/jboss/messaging/tests/unit/core/server/impl/ServerSessionImplTest.java 2008-09-26 21:32:15 UTC (rev 5034)
+++ trunk/tests/src/org/jboss/messaging/tests/unit/core/server/impl/ServerSessionImplTest.java 2008-09-29 08:55:28 UTC (rev 5035)
@@ -948,7 +948,7 @@
// Xid xid = createStrictMock(Xid.class);
// expect(rm.getTransaction(xid)).andReturn(tx);
// expect(tx.getState()).andReturn(Transaction.State.ACTIVE);
-// expect(tx.isEmpty()).andReturn(false);
+// expect(tx.hasConsumers()).andReturn(false);
// tx.prepare();
// replay(rc, sm, po, qs, rm, ss, pd, cm, server, executor, xid, tx);
// SessionXAResponseMessage message = session.XAPrepare(xid);
@@ -963,7 +963,7 @@
// Xid xid = createStrictMock(Xid.class);
// expect(rm.getTransaction(xid)).andReturn(tx);
// expect(tx.getState()).andReturn(Transaction.State.ACTIVE);
-// expect(tx.isEmpty()).andReturn(true);
+// expect(tx.hasConsumers()).andReturn(true);
// expect(rm.removeTransaction(xid)).andReturn(true);
// replay(rc, sm, po, qs, rm, ss, pd, cm, server, executor, xid, tx);
// SessionXAResponseMessage message = session.XAPrepare(xid);
@@ -978,7 +978,7 @@
// Xid xid = createStrictMock(Xid.class);
// expect(rm.getTransaction(xid)).andReturn(tx);
// expect(tx.getState()).andReturn(Transaction.State.ACTIVE);
-// expect(tx.isEmpty()).andReturn(true);
+// expect(tx.hasConsumers()).andReturn(true);
// expect(rm.removeTransaction(xid)).andReturn(false);
// replay(rc, sm, po, qs, rm, ss, pd, cm, server, executor, xid, tx);
// SessionXAResponseMessage message = session.XAPrepare(xid);
Modified: trunk/tests/src/org/jboss/messaging/tests/unit/core/transaction/impl/TransactionImplTest.java
===================================================================
--- trunk/tests/src/org/jboss/messaging/tests/unit/core/transaction/impl/TransactionImplTest.java 2008-09-26 21:32:15 UTC (rev 5034)
+++ trunk/tests/src/org/jboss/messaging/tests/unit/core/transaction/impl/TransactionImplTest.java 2008-09-29 08:55:28 UTC (rev 5035)
@@ -22,13 +22,6 @@
package org.jboss.messaging.tests.unit.core.transaction.impl;
-import static org.jboss.messaging.tests.util.RandomUtil.randomXid;
-
-import java.util.concurrent.Executors;
-import java.util.concurrent.ScheduledExecutorService;
-
-import javax.transaction.xa.Xid;
-
import org.easymock.EasyMock;
import org.jboss.messaging.core.persistence.StorageManager;
import org.jboss.messaging.core.postoffice.PostOffice;
@@ -41,9 +34,14 @@
import org.jboss.messaging.core.settings.impl.QueueSettings;
import org.jboss.messaging.core.transaction.Transaction;
import org.jboss.messaging.core.transaction.impl.TransactionImpl;
+import static org.jboss.messaging.tests.util.RandomUtil.randomXid;
import org.jboss.messaging.tests.util.UnitTestCase;
import org.jboss.messaging.util.SimpleString;
+import javax.transaction.xa.Xid;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+
/**
*
* A TransactionImplTest
@@ -453,7 +451,7 @@
//
// Transaction tx = new TransactionImpl(sm, po);
//
-// assertTrue(tx.isEmpty());
+// assertTrue(tx.hasConsumers());
// assertFalse(tx.isContainsPersistent());
//
// EasyMock.verify(sm);
@@ -480,7 +478,7 @@
//
// tx.addMessage(address1, message1);
//
-// assertFalse(tx.isEmpty());
+// assertFalse(tx.hasConsumers());
// assertTrue(tx.isContainsPersistent());
//
//
More information about the jboss-cvs-commits
mailing list