[jboss-cvs] JBoss Messaging SVN: r5073 - in trunk: src/main/org/jboss/messaging/core/server and 2 other directories.
jboss-cvs-commits at lists.jboss.org
jboss-cvs-commits at lists.jboss.org
Mon Oct 6 05:36:11 EDT 2008
Author: ataylor
Date: 2008-10-06 05:36:10 -0400 (Mon, 06 Oct 2008)
New Revision: 5073
Modified:
trunk/messaging.ipr
trunk/src/main/org/jboss/messaging/core/server/DistributionPolicy.java
trunk/src/main/org/jboss/messaging/core/server/impl/DistributionPolicyImpl.java
trunk/src/main/org/jboss/messaging/core/server/impl/GroupingRoundRobinDistributionPolicy.java
trunk/src/main/org/jboss/messaging/core/server/impl/QueueImpl.java
trunk/src/main/org/jboss/messaging/core/server/impl/RoundRobinDistributionPolicy.java
trunk/tests/src/org/jboss/messaging/tests/unit/core/server/impl/GroupingRoundRobinDistributionPolicyTest.java
trunk/tests/src/org/jboss/messaging/tests/unit/core/server/impl/QueueImplTest.java
trunk/tests/src/org/jboss/messaging/tests/unit/core/server/impl/RoundRobinDistributionPolicyTest.java
Log:
https://jira.jboss.org/jira/browse/JBMESSAGING-375 - changed the distribution policy API to make it simpler
Modified: trunk/messaging.ipr
===================================================================
--- trunk/messaging.ipr 2008-10-04 16:22:15 UTC (rev 5072)
+++ trunk/messaging.ipr 2008-10-06 09:36:10 UTC (rev 5073)
@@ -361,6 +361,9 @@
<option name="GENERATE_IIOP_STUBS" value="false" />
<option name="ADDITIONAL_OPTIONS_STRING" value="" />
</component>
+ <component name="SvnBranchConfigurationManager">
+ <option name="myVersion" value="123" />
+ </component>
<component name="VcsDirectoryMappings">
<mapping directory="" vcs="" />
<mapping directory="$PROJECT_DIR$" vcs="svn" />
@@ -453,13 +456,14 @@
<root url="file://$PROJECT_DIR$/tests/config" />
<root url="file://$PROJECT_DIR$/tests/jms-tests/config" />
<root url="file://$PROJECT_DIR$/src/config" />
- <root url="jar://$PROJECT_DIR$/thirdparty/apache-mina/lib/mina-core-2.0.0-M2-20080520.004618-19.jar!/" />
<root url="jar://$PROJECT_DIR$/thirdparty/easymock/lib/easymock.jar!/" />
<root url="jar://$PROJECT_DIR$/thirdparty/easymock-classextension/lib/easymockclassextension.jar!/" />
<root url="jar://$PROJECT_DIR$/thirdparty/cglib/lib/cglib.jar!/" />
<root url="jar://$PROJECT_DIR$/tools/lib/jbossbuild.jar!/" />
- <root url="jar:///home/andy/devtools/apache-ant-1.7.0/lib/ant-junit.jar!/" />
- <root url="jar:///home/andy/devtools/apache-ant-1.7.0/lib/ant.jar!/" />
+ <root url="jar://$ANT_HOME$/lib/ant-junit.jar!/" />
+ <root url="jar://$ANT_HOME$/lib/ant.jar!/" />
+ <root url="jar://$PROJECT_DIR$/thirdparty/apache-mina/lib/mina-core-2.0.0-M3-20080730.120633-1.jar!/" />
+ <root url="jar://$PROJECT_DIR$/thirdparty/netty/lib/netty-3.0.0.CR4.jar!/" />
</CLASSES>
<JAVADOC />
<SOURCES>
@@ -471,5 +475,8 @@
<jarDirectory url="file://$PROJECT_DIR$/tests/jms-tests/etc" recursive="false" />
</library>
</component>
+ <UsedPathMacros>
+ <macro name="ANT_HOME" />
+ </UsedPathMacros>
</project>
Modified: trunk/src/main/org/jboss/messaging/core/server/DistributionPolicy.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/server/DistributionPolicy.java 2008-10-04 16:22:15 UTC (rev 5072)
+++ trunk/src/main/org/jboss/messaging/core/server/DistributionPolicy.java 2008-10-06 09:36:10 UTC (rev 5073)
@@ -31,7 +31,7 @@
*/
public interface DistributionPolicy
{
- Consumer select(ServerMessage message, boolean redeliver);
+ HandleStatus distribute(final MessageReference reference);
void addConsumer(Consumer consumer);
@@ -40,6 +40,4 @@
int getConsumerCount();
boolean hasConsumers();
-
- int getCurrentPosition();
}
Modified: trunk/src/main/org/jboss/messaging/core/server/impl/DistributionPolicyImpl.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/server/impl/DistributionPolicyImpl.java 2008-10-04 16:22:15 UTC (rev 5072)
+++ trunk/src/main/org/jboss/messaging/core/server/impl/DistributionPolicyImpl.java 2008-10-06 09:36:10 UTC (rev 5073)
@@ -35,8 +35,6 @@
{
protected final List<Consumer> consumers = new ArrayList<Consumer>();
- public abstract Consumer select(ServerMessage message, boolean redeliver);
-
public void addConsumer(Consumer consumer)
{
consumers.add(consumer);
Modified: trunk/src/main/org/jboss/messaging/core/server/impl/GroupingRoundRobinDistributionPolicy.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/server/impl/GroupingRoundRobinDistributionPolicy.java 2008-10-04 16:22:15 UTC (rev 5072)
+++ trunk/src/main/org/jboss/messaging/core/server/impl/GroupingRoundRobinDistributionPolicy.java 2008-10-06 09:36:10 UTC (rev 5073)
@@ -24,6 +24,8 @@
import org.jboss.messaging.core.message.impl.MessageImpl;
import org.jboss.messaging.core.server.Consumer;
import org.jboss.messaging.core.server.ServerMessage;
+import org.jboss.messaging.core.server.HandleStatus;
+import org.jboss.messaging.core.server.MessageReference;
import org.jboss.messaging.util.SimpleString;
import java.util.ArrayList;
@@ -38,6 +40,7 @@
* The initial consumer is the first consumer found, using the round robin policy, that hasn't been bound to a group. If
* there are no consumers left that have not been bound to a group, then the next consumer will be bound to 2 groups, and
* so on.
+ *
* @author <a href="mailto:andy.taylor at jboss.org">Andy Taylor</a>
*/
public class GroupingRoundRobinDistributionPolicy extends RoundRobinDistributionPolicy
@@ -48,199 +51,90 @@
// Attributes ----------------------------------------------------
- // Map with GroupID as a key and a Consumer as value.
- private final Map<SimpleString, ConsumerState> consumers = new ConcurrentHashMap<SimpleString, ConsumerState>();
+ private ConcurrentHashMap<SimpleString, Consumer> cons = new ConcurrentHashMap<SimpleString, Consumer>();
- // we hold the state of each consumer, i.e., is it bound etc
- private final Map<Consumer, ConsumerState> consumerStateMap = new ConcurrentHashMap<Consumer, ConsumerState>();
// Distributor implementation ------------------------------------
- @Override
- public Consumer select(final ServerMessage message, final boolean redeliver)
+ public HandleStatus distribute(MessageReference reference)
{
- if (message.getProperty(MessageImpl.GROUP_ID) != null)
+ final SimpleString groupId = (SimpleString) reference.getMessage().getProperty(MessageImpl.GROUP_ID);
+ if (groupId != null)
{
- final SimpleString groupId = (SimpleString)message.getProperty(MessageImpl.GROUP_ID);
- final ConsumerState consumerState = consumers.get(groupId);
- if (consumerState != null)
+ boolean bound;
+ int startPos = pos;
+ boolean filterRejected = false;
+
+ while (true)
{
- // if this is a redelivery and the group is bound we wait.
- if (redeliver && consumerState.isBound())
+ Consumer consumer = cons.putIfAbsent(groupId, consumers.get(pos));
+ if (consumer == null)
{
- return null;
+ incrementPosition();
+ consumer = cons.get(groupId);
+ bound = false;
}
- // if this is a redelivery and it was its first attempt we can look for another consumer and use that
- else if (redeliver && !consumerState.isBound())
+ else
{
- removeBinding(groupId, consumerState);
- return getNextPositionAndBind(message, redeliver, groupId).getConsumer();
+ bound = true;
}
- // we bind after we know that the first message has been successfully consumed
- else if (!consumerState.isBound())
+ HandleStatus status = handle(reference, consumer);
+ if (status == HandleStatus.HANDLED)
{
- consumerState.setBound(true);
+ return HandleStatus.HANDLED;
}
- consumerState.setAvailable(false);
-
- return consumerState.getConsumer();
+ else if (status == HandleStatus.NO_MATCH)
+ {
+ filterRejected = true;
+ }
+ else if (status == HandleStatus.BUSY)
+ {
+ //if the group was already bound we must wait for its consumer; otherwise remove the tentative binding and try the next consumer
+ if (bound)
+ {
+ return HandleStatus.BUSY;
+ }
+ else
+ {
+ cons.remove(groupId);
+ }
+ }
+ //if we've tried all of them
+ if (startPos == pos)
+ {
+ // Tried all of them
+ if (filterRejected)
+ {
+ return HandleStatus.NO_MATCH;
+ }
+ else
+ {
+ // Give up - all consumers busy
+ return HandleStatus.BUSY;
+ }
+ }
}
- else
- {
- return getNextPositionAndBind(message, redeliver, groupId).getConsumer();
- }
}
else
{
- return super.select(message, redeliver);
+ return super.distribute(reference);
}
}
- @Override
- public synchronized void addConsumer(final Consumer consumer)
+ public synchronized boolean removeConsumer(Consumer consumer)
{
- super.addConsumer(consumer);
- consumerStateMap.put(consumer, new ConsumerState(consumer));
- }
-
- @Override
- public synchronized boolean removeConsumer(final Consumer consumer)
- {
- final boolean removed = super.removeConsumer(consumer);
+ boolean removed = super.removeConsumer(consumer);
if (removed)
{
- final ConsumerState cs = consumerStateMap.remove(consumer);
- for (final SimpleString ss : cs.getGroupIds())
+ for (SimpleString group : cons.keySet())
{
- consumers.remove(ss);
- }
-
- }
- return removed;
- }
-
- /**
- * we need to find the next available consumer that doesn't have a binding. If there are no free we use the next
- * available in the normal Round Robin fashion.
- * @param message
- * @param redeliver
- * @param groupId
- * @return
- */
- private ConsumerState getNextPositionAndBind(final ServerMessage message,
- final boolean redeliver,
- final SimpleString groupId)
- {
- Consumer consumer = super.select(message, redeliver);
- final ConsumerState cs = consumerStateMap.get(consumer);
- // if there is only one return it
- if (getConsumerCount() == 1 || cs.isAvailable())
- {
- consumers.put(groupId, cs);
- cs.getGroupIds().add(groupId);
- return cs;
- }
- else
- {
- consumer = super.select(message, redeliver);
- ConsumerState ncs = consumerStateMap.get(consumer);
- while (!ncs.isAvailable())
- {
- consumer = super.select(message, redeliver);
- ncs = consumerStateMap.get(consumer);
- if (ncs == cs)
+ if (consumer == cons.get(group))
{
- cs.getGroupIds().add(groupId);
- return cs;
+ cons.remove(group);
+ break;
}
}
- ncs.getGroupIds().add(groupId);
- return ncs;
}
+ return removed;
}
-
- private void removeBinding(final SimpleString groupId, final ConsumerState consumerState)
- {
- consumerState.setAvailable(true);
- consumerState.getGroupIds().remove(groupId);
- consumers.remove(groupId);
- }
-
- /**
- * holds the current state of a consumer, is it available, what groups it is bound to etc.
- */
- class ConsumerState
- {
- private final Consumer consumer;
-
- private volatile boolean isBound = false;
-
- private volatile boolean available = true;
-
- private final List<SimpleString> groupIds = new ArrayList<SimpleString>();
-
- public ConsumerState(final Consumer consumer)
- {
- this.consumer = consumer;
- }
-
- public boolean isBound()
- {
- return isBound;
- }
-
- public void setBound(final boolean bound)
- {
- isBound = bound;
- }
-
- public boolean isAvailable()
- {
- return available;
- }
-
- public void setAvailable(final boolean available)
- {
- this.available = available;
- }
-
- public Consumer getConsumer()
- {
- return consumer;
- }
-
- public List<SimpleString> getGroupIds()
- {
- return groupIds;
- }
-
- @Override
- public boolean equals(final Object o)
- {
- if (this == o)
- {
- return true;
- }
- if (o == null || getClass() != o.getClass())
- {
- return false;
- }
-
- final ConsumerState that = (ConsumerState)o;
-
- if (!consumer.equals(that.consumer))
- {
- return false;
- }
-
- return true;
- }
-
- @Override
- public int hashCode()
- {
- return consumer.hashCode();
- }
-
- }
}
Modified: trunk/src/main/org/jboss/messaging/core/server/impl/QueueImpl.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/server/impl/QueueImpl.java 2008-10-04 16:22:15 UTC (rev 5072)
+++ trunk/src/main/org/jboss/messaging/core/server/impl/QueueImpl.java 2008-10-06 09:36:10 UTC (rev 5073)
@@ -51,7 +51,7 @@
/**
* Implementation of a Queue TODO use Java 5 concurrent queue
- *
+ *
* @author <a href="mailto:tim.fox at jboss.com">Tim Fox</a>
* @author <a href="ataylor at redhat.com">Andy Taylor</a>
* @author <a href="jmesnil at redhat.com">Jeff Mesnil</a>
@@ -175,7 +175,7 @@
{
return add(ref, true);
}
-
+
public synchronized void addListFirst(final LinkedList<MessageReference> list)
{
ListIterator<MessageReference> iter = list.listIterator(list.size());
@@ -183,11 +183,11 @@
while (iter.hasPrevious())
{
MessageReference ref = iter.previous();
-
+
ServerMessage msg = ref.getMessage();
messageReferences.addFirst(ref, msg.getPriority());
-
+
checkWaiting(msg.getMessageID());
}
@@ -289,7 +289,7 @@
public synchronized boolean removeConsumer(final Consumer consumer) throws Exception
{
boolean removed = distributionPolicy.removeConsumer(consumer);
-
+
if (removed)
{
distributionPolicy.removeConsumer(consumer);
@@ -700,7 +700,7 @@
return true;
}
- QueueImpl qother = (QueueImpl)other;
+ QueueImpl qother = (QueueImpl) other;
return name.equals(qother.name);
}
@@ -822,94 +822,20 @@
private HandleStatus deliver(final MessageReference reference)
{
- if (distributionPolicy.getConsumerCount() == 0)
+ HandleStatus status = distributionPolicy.distribute(reference);
+ if (status == HandleStatus.HANDLED)
{
- return HandleStatus.BUSY;
- }
+ deliveringCount.incrementAndGet();
- int startPos = distributionPolicy.getCurrentPosition();
-
- boolean filterRejected = false;
-
- HandleStatus status = null;
- int pos;
- while (true)
+ return HandleStatus.HANDLED;
+ }
+ else if (status == HandleStatus.NO_MATCH)
{
- Consumer consumer = distributionPolicy.select(reference.getMessage(), status != null);
- pos = distributionPolicy.getCurrentPosition();
- if (consumer == null)
- {
- if (filterRejected)
- {
- return HandleStatus.NO_MATCH;
- }
- else
- {
- // Give up - all consumers busy
- return HandleStatus.BUSY;
- }
- }
- try
- {
- status = consumer.handle(reference);
- }
- catch (Throwable t)
- {
- log.warn("removing consumer which did not handle a message, " + "consumer=" +
- consumer +
- ", message=" +
- reference, t);
-
- // If the consumer throws an exception we remove the consumer
- try
- {
- removeConsumer(consumer);
- }
- catch (Exception e)
- {
- log.error("Failed to remove consumer", e);
- }
-
- return HandleStatus.BUSY;
- }
-
- if (status == null)
- {
- throw new IllegalStateException("ClientConsumer.handle() should never return null");
- }
-
- if (status == HandleStatus.HANDLED)
- {
- deliveringCount.incrementAndGet();
-
- return HandleStatus.HANDLED;
- }
- else if (status == HandleStatus.NO_MATCH)
- {
- promptDelivery = true;
-
- filterRejected = true;
- }
- if (startPos > distributionPolicy.getConsumerCount() - 1)
- {
- startPos = distributionPolicy.getConsumerCount() - 1;
- }
- if (startPos == pos)
- {
- // Tried all of them
- if (filterRejected)
- {
- return HandleStatus.NO_MATCH;
- }
- else
- {
- // Give up - all consumers busy
- return HandleStatus.BUSY;
- }
- }
+ promptDelivery = true;
}
+ return status;
}
-
+
private void checkWaiting(final long messageID)
{
CountDownLatch latch = waitingIDMap.remove(messageID);
Modified: trunk/src/main/org/jboss/messaging/core/server/impl/RoundRobinDistributionPolicy.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/server/impl/RoundRobinDistributionPolicy.java 2008-10-04 16:22:15 UTC (rev 5072)
+++ trunk/src/main/org/jboss/messaging/core/server/impl/RoundRobinDistributionPolicy.java 2008-10-06 09:36:10 UTC (rev 5073)
@@ -18,38 +18,28 @@
* License along with this software; if not, write to the Free
* Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
* 02110-1301 USA, or see the FSF site: http://www.fsf.org.
- */
+ */
package org.jboss.messaging.core.server.impl;
import org.jboss.messaging.core.server.Consumer;
import org.jboss.messaging.core.server.ServerMessage;
+import org.jboss.messaging.core.server.HandleStatus;
+import org.jboss.messaging.core.server.MessageReference;
+import org.jboss.messaging.core.logging.Logger;
/**
- *
* A RoundRobinDistributionPolicy
- *
+ *
* @author <a href="mailto:tim.fox at jboss.com">Tim Fox</a>
- *
+ * @author <a href="mailto:andy.taylor at jboss.org">Andy Taylor</a>
*/
public class RoundRobinDistributionPolicy extends DistributionPolicyImpl
{
- int pos = 0;
+ Logger log = Logger.getLogger(RoundRobinDistributionPolicy.class);
- public Consumer select(ServerMessage message, boolean redeliver)
- {
- if(consumers.isEmpty())
- {
- return null;
- }
- int startPos = pos++;
+ protected int pos = 0;
- if (pos == consumers.size())
- {
- pos = 0;
- }
- return consumers.get(startPos);
- }
public synchronized void addConsumer(Consumer consumer)
{
@@ -64,8 +54,90 @@
return super.removeConsumer(consumer);
}
- public int getCurrentPosition()
+ public HandleStatus distribute(final MessageReference reference)
{
- return pos;
+ if (getConsumerCount() == 0)
+ {
+ return HandleStatus.BUSY;
+ }
+ int startPos = pos;
+ boolean filterRejected = false;
+ HandleStatus status;
+ while (true)
+ {
+ status = handle(reference, getNextConsumer());
+
+ if (status == HandleStatus.HANDLED)
+ {
+ return HandleStatus.HANDLED;
+ }
+ else if (status == HandleStatus.NO_MATCH)
+ {
+ filterRejected = true;
+ }
+ if (startPos == pos)
+ {
+ // Tried all of them
+ if (filterRejected)
+ {
+ return HandleStatus.NO_MATCH;
+ }
+ else
+ {
+ // Give up - all consumers busy
+ return HandleStatus.BUSY;
+ }
+ }
+ }
}
+
+ protected Consumer getNextConsumer()
+ {
+ Consumer consumer = consumers.get(pos);
+ incrementPosition();
+ return consumer;
+ }
+
+ protected void incrementPosition()
+ {
+ pos++;
+ if (pos == consumers.size())
+ {
+ pos = 0;
+ }
+ }
+
+ protected HandleStatus handle(MessageReference reference, Consumer consumer)
+ {
+ HandleStatus status;
+ try
+ {
+ status = consumer.handle(reference);
+ }
+ catch (Throwable t)
+ {
+ log.warn("removing consumer which did not handle a message, " + "consumer=" +
+ consumer +
+ ", message=" +
+ reference, t);
+
+ // If the consumer throws an exception we remove the consumer
+ try
+ {
+ removeConsumer(consumer);
+ }
+ catch (Exception e)
+ {
+ log.error("Failed to remove consumer", e);
+ }
+
+ return HandleStatus.BUSY;
+ }
+
+ if (status == null)
+ {
+ throw new IllegalStateException("ClientConsumer.handle() should never return null");
+ }
+ return status;
+ }
}
Modified: trunk/tests/src/org/jboss/messaging/tests/unit/core/server/impl/GroupingRoundRobinDistributionPolicyTest.java
===================================================================
--- trunk/tests/src/org/jboss/messaging/tests/unit/core/server/impl/GroupingRoundRobinDistributionPolicyTest.java 2008-10-04 16:22:15 UTC (rev 5072)
+++ trunk/tests/src/org/jboss/messaging/tests/unit/core/server/impl/GroupingRoundRobinDistributionPolicyTest.java 2008-10-06 09:36:10 UTC (rev 5073)
@@ -25,7 +25,11 @@
import org.jboss.messaging.core.message.impl.MessageImpl;
import org.jboss.messaging.core.server.Consumer;
import org.jboss.messaging.core.server.ServerMessage;
+import org.jboss.messaging.core.server.MessageReference;
+import org.jboss.messaging.core.server.HandleStatus;
+import org.jboss.messaging.core.server.DistributionPolicy;
import org.jboss.messaging.core.server.impl.GroupingRoundRobinDistributionPolicy;
+import org.jboss.messaging.core.server.impl.RoundRobinDistributionPolicy;
import org.jboss.messaging.tests.util.UnitTestCase;
import org.jboss.messaging.util.SimpleString;
@@ -46,20 +50,71 @@
policy = null;
}
- public void testSingleConsumerSingleGroup()
+ public void testSingleConsumerSingleGroup() throws Exception
{
+ MessageReference reference = EasyMock.createStrictMock(MessageReference.class);
Consumer consumer = EasyMock.createStrictMock(Consumer.class);
policy.addConsumer(consumer);
ServerMessage serverMessage = EasyMock.createStrictMock(ServerMessage.class);
+ EasyMock.expect(reference.getMessage()).andStubReturn(serverMessage);
EasyMock.expect(serverMessage.getProperty(MessageImpl.GROUP_ID)).andStubReturn(new SimpleString("gid1"));
- EasyMock.replay(consumer, serverMessage);
- assertEquals(consumer, policy.select( serverMessage, false));
- assertEquals(consumer, policy.select(serverMessage, false));
- EasyMock.verify(consumer, serverMessage);
+ EasyMock.expect(consumer.handle(reference)).andReturn(HandleStatus.HANDLED);
+ EasyMock.expect(consumer.handle(reference)).andReturn(HandleStatus.HANDLED);
+ EasyMock.replay(consumer, serverMessage, reference);
+ assertEquals(HandleStatus.HANDLED, policy.distribute(reference));
+ assertEquals(HandleStatus.HANDLED, policy.distribute(reference));
+ EasyMock.verify(consumer, serverMessage, reference);
}
- public void testMultipleConsumersSingleGroup()
+ public void testRunOutOfConsumers() throws Exception
{
+ MessageReference reference = EasyMock.createStrictMock(MessageReference.class);
+ Consumer c1 = EasyMock.createStrictMock(Consumer.class);
+ Consumer c2 = EasyMock.createStrictMock(Consumer.class);
+ Consumer c3 = EasyMock.createStrictMock(Consumer.class);
+ ServerMessage serverMessage = EasyMock.createStrictMock(ServerMessage.class);
+ EasyMock.expect(reference.getMessage()).andStubReturn(serverMessage);
+ EasyMock.expect(serverMessage.getProperty(MessageImpl.GROUP_ID)).andStubReturn(new SimpleString("gid1"));
+ DistributionPolicy dp = new RoundRobinDistributionPolicy();
+ dp.addConsumer(c1);
+ dp.addConsumer(c2);
+ dp.addConsumer(c3);
+ EasyMock.expect(c1.handle(reference)).andReturn(HandleStatus.BUSY);
+ EasyMock.expect(c2.handle(reference)).andReturn(HandleStatus.BUSY);
+ EasyMock.expect(c3.handle(reference)).andReturn(HandleStatus.BUSY);
+ EasyMock.replay(reference, c1, c2, c3, serverMessage);
+
+ HandleStatus status = dp.distribute(reference);
+ assertEquals(status, HandleStatus.BUSY);
+ EasyMock.verify(reference, c1, c2, c3, serverMessage);
+ }
+
+ public void testRunOutOfConsumersNoMatch() throws Exception
+ {
+ MessageReference reference = EasyMock.createStrictMock(MessageReference.class);
+ Consumer c1 = EasyMock.createStrictMock(Consumer.class);
+ Consumer c2 = EasyMock.createStrictMock(Consumer.class);
+ Consumer c3 = EasyMock.createStrictMock(Consumer.class);
+ ServerMessage serverMessage = EasyMock.createStrictMock(ServerMessage.class);
+ EasyMock.expect(reference.getMessage()).andStubReturn(serverMessage);
+ EasyMock.expect(serverMessage.getProperty(MessageImpl.GROUP_ID)).andStubReturn(new SimpleString("gid1"));
+ DistributionPolicy dp = new RoundRobinDistributionPolicy();
+ dp.addConsumer(c1);
+ dp.addConsumer(c2);
+ dp.addConsumer(c3);
+ EasyMock.expect(c1.handle(reference)).andReturn(HandleStatus.NO_MATCH);
+ EasyMock.expect(c2.handle(reference)).andReturn(HandleStatus.NO_MATCH);
+ EasyMock.expect(c3.handle(reference)).andReturn(HandleStatus.NO_MATCH);
+ EasyMock.replay(reference, c1, c2, c3, serverMessage);
+
+ HandleStatus status = dp.distribute(reference);
+ assertEquals(status, HandleStatus.NO_MATCH);
+ EasyMock.verify(reference, c1, c2, c3, serverMessage);
+ }
+
+ public void testMultipleConsumersSingleGroup() throws Exception
+ {
+ MessageReference reference = EasyMock.createStrictMock(MessageReference.class);
Consumer consumer = EasyMock.createStrictMock(Consumer.class);
Consumer consumer2 = EasyMock.createStrictMock(Consumer.class);
Consumer consumer3 = EasyMock.createStrictMock(Consumer.class);
@@ -67,29 +122,40 @@
policy.addConsumer(consumer2);
policy.addConsumer(consumer3);
ServerMessage serverMessage = EasyMock.createStrictMock(ServerMessage.class);
+ EasyMock.expect(reference.getMessage()).andStubReturn(serverMessage);
EasyMock.expect(serverMessage.getProperty(MessageImpl.GROUP_ID)).andStubReturn(new SimpleString("gid1"));
- EasyMock.replay(consumer, consumer2, consumer3, serverMessage);
- assertEquals(consumer, policy.select(serverMessage, false));
- assertEquals(consumer, policy.select(serverMessage, false));
- EasyMock.verify(consumer, consumer2, consumer3, serverMessage);
+ EasyMock.expect(consumer.handle(reference)).andReturn(HandleStatus.HANDLED);
+ EasyMock.expect(consumer.handle(reference)).andReturn(HandleStatus.HANDLED);
+ EasyMock.replay(consumer, consumer2, consumer3, serverMessage, reference);
+ assertEquals(HandleStatus.HANDLED, policy.distribute(reference));
+ assertEquals(HandleStatus.HANDLED, policy.distribute(reference));
+ EasyMock.verify(consumer, consumer2, consumer3, serverMessage, reference);
}
- public void testSingleConsumerTwoGroups()
+ public void testSingleConsumerTwoGroups() throws Exception
{
+ MessageReference reference = EasyMock.createStrictMock(MessageReference.class);
+ MessageReference reference2 = EasyMock.createStrictMock(MessageReference.class);
Consumer consumer = EasyMock.createStrictMock(Consumer.class);
policy.addConsumer(consumer);
ServerMessage serverMessage = EasyMock.createStrictMock(ServerMessage.class);
+ EasyMock.expect(reference.getMessage()).andStubReturn(serverMessage);
EasyMock.expect(serverMessage.getProperty(MessageImpl.GROUP_ID)).andStubReturn(new SimpleString("gid1"));
ServerMessage serverMessage2 = EasyMock.createStrictMock(ServerMessage.class);
+ EasyMock.expect(reference2.getMessage()).andStubReturn(serverMessage2);
EasyMock.expect(serverMessage2.getProperty(MessageImpl.GROUP_ID)).andStubReturn(new SimpleString("gid2"));
- EasyMock.replay(consumer, serverMessage, serverMessage2);
- assertEquals(consumer, policy.select(serverMessage, false));
- assertEquals(consumer, policy.select(serverMessage2, false));
- EasyMock.verify(consumer, serverMessage2);
+ EasyMock.expect(consumer.handle(reference)).andReturn(HandleStatus.HANDLED);
+ EasyMock.expect(consumer.handle(reference)).andReturn(HandleStatus.HANDLED);
+ EasyMock.replay(consumer, serverMessage, serverMessage2, reference, reference2);
+ assertEquals(HandleStatus.HANDLED, policy.distribute(reference));
+ assertEquals(HandleStatus.HANDLED, policy.distribute(reference));
+ EasyMock.verify(consumer, serverMessage2, reference, reference2);
}
- public void testMultipleConsumersTwoGroups()
+ public void testMultipleConsumersTwoGroups() throws Exception
{
+ MessageReference reference = EasyMock.createStrictMock(MessageReference.class);
+ MessageReference reference2 = EasyMock.createStrictMock(MessageReference.class);
Consumer consumer = EasyMock.createStrictMock(Consumer.class);
Consumer consumer2 = EasyMock.createStrictMock(Consumer.class);
Consumer consumer3 = EasyMock.createStrictMock(Consumer.class);
@@ -97,19 +163,26 @@
policy.addConsumer(consumer2);
policy.addConsumer(consumer3);
ServerMessage serverMessage = EasyMock.createStrictMock(ServerMessage.class);
+ EasyMock.expect(reference.getMessage()).andStubReturn(serverMessage);
EasyMock.expect(serverMessage.getProperty(MessageImpl.GROUP_ID)).andStubReturn(new SimpleString("gid1"));
ServerMessage serverMessage2 = EasyMock.createStrictMock(ServerMessage.class);
+ EasyMock.expect(reference2.getMessage()).andStubReturn(serverMessage2);
EasyMock.expect(serverMessage2.getProperty(MessageImpl.GROUP_ID)).andStubReturn(new SimpleString("gid2"));
- EasyMock.replay(consumer, consumer2, consumer3, serverMessage, serverMessage2);
- assertEquals(consumer, policy.select(serverMessage, false));
- assertEquals(consumer2, policy.select(serverMessage2, false));
- assertEquals(consumer, policy.select(serverMessage, false));
- assertEquals(consumer2, policy.select(serverMessage2, false));
- EasyMock.verify(consumer, consumer2, consumer3, serverMessage, serverMessage2);
+ EasyMock.expect(consumer.handle(reference)).andReturn(HandleStatus.HANDLED);
+ EasyMock.expect(consumer2.handle(reference2)).andReturn(HandleStatus.HANDLED);
+ EasyMock.expect(consumer.handle(reference)).andReturn(HandleStatus.HANDLED);
+ EasyMock.expect(consumer2.handle(reference2)).andReturn(HandleStatus.HANDLED);
+ EasyMock.replay(consumer, consumer2, consumer3, serverMessage, serverMessage2, reference, reference2);
+ assertEquals(HandleStatus.HANDLED, policy.distribute(reference));
+ assertEquals(HandleStatus.HANDLED, policy.distribute(reference2));
+ assertEquals(HandleStatus.HANDLED, policy.distribute(reference));
+ assertEquals(HandleStatus.HANDLED, policy.distribute(reference2));
+ EasyMock.verify(consumer, consumer2, consumer3, serverMessage, serverMessage2, reference, reference2);
}
- public void testMultipleConsumersSingleGroupFirstDeliveryFailed()
+ public void testMultipleConsumersSingleGroupFirstDeliveryFailed() throws Exception
{
+ MessageReference reference = EasyMock.createStrictMock(MessageReference.class);
Consumer consumer = EasyMock.createStrictMock(Consumer.class);
Consumer consumer2 = EasyMock.createStrictMock(Consumer.class);
Consumer consumer3 = EasyMock.createStrictMock(Consumer.class);
@@ -117,15 +190,18 @@
policy.addConsumer(consumer2);
policy.addConsumer(consumer3);
ServerMessage serverMessage = EasyMock.createStrictMock(ServerMessage.class);
+ EasyMock.expect(reference.getMessage()).andStubReturn(serverMessage);
EasyMock.expect(serverMessage.getProperty(MessageImpl.GROUP_ID)).andStubReturn(new SimpleString("gid1"));
- EasyMock.replay(consumer, consumer2, consumer3, serverMessage);
- assertEquals(consumer, policy.select(serverMessage, false));
- assertEquals(consumer2, policy.select(serverMessage, true));
- EasyMock.verify(consumer, consumer2, consumer3, serverMessage);
+ EasyMock.expect(consumer.handle(reference)).andReturn(HandleStatus.BUSY);
+ EasyMock.expect(consumer2.handle(reference)).andReturn(HandleStatus.HANDLED);
+ EasyMock.replay(consumer, consumer2, consumer3, serverMessage, reference);
+ assertEquals(HandleStatus.HANDLED, policy.distribute(reference));
+ EasyMock.verify(consumer, consumer2, consumer3, serverMessage, reference);
}
- public void testMultipleConsumersSingleGroupSecondDeliveryFailed()
+ public void testMultipleConsumersSingleGroupSecondDeliveryFailed() throws Exception
{
+ MessageReference reference = EasyMock.createStrictMock(MessageReference.class);
Consumer consumer = EasyMock.createStrictMock(Consumer.class);
Consumer consumer2 = EasyMock.createStrictMock(Consumer.class);
Consumer consumer3 = EasyMock.createStrictMock(Consumer.class);
@@ -133,16 +209,27 @@
policy.addConsumer(consumer2);
policy.addConsumer(consumer3);
ServerMessage serverMessage = EasyMock.createStrictMock(ServerMessage.class);
+ EasyMock.expect(reference.getMessage()).andStubReturn(serverMessage);
EasyMock.expect(serverMessage.getProperty(MessageImpl.GROUP_ID)).andStubReturn(new SimpleString("gid1"));
- EasyMock.replay(consumer, consumer2, consumer3, serverMessage);
- assertEquals(consumer, policy.select(serverMessage, false));
- assertEquals(consumer, policy.select(serverMessage, false));
- assertEquals(null, policy.select(serverMessage, true));
- EasyMock.verify(consumer, consumer2, consumer3, serverMessage);
+ EasyMock.expect(consumer.handle(reference)).andReturn(HandleStatus.HANDLED);
+ EasyMock.expect(consumer.handle(reference)).andReturn(HandleStatus.BUSY);
+ EasyMock.replay(consumer, consumer2, consumer3, serverMessage, reference);
+ assertEquals(HandleStatus.HANDLED, policy.distribute(reference));
+ assertEquals(HandleStatus.BUSY, policy.distribute(reference));
+ EasyMock.verify(consumer, consumer2, consumer3, serverMessage, reference);
}
- public void testMultipleConsumersMultipleGroupMultipleGroupsEach()
+ public void testMultipleConsumersMultipleGroupMultipleGroupsEach() throws Exception
{
+ MessageReference reference = EasyMock.createStrictMock(MessageReference.class);
+ MessageReference reference2 = EasyMock.createStrictMock(MessageReference.class);
+ MessageReference reference3 = EasyMock.createStrictMock(MessageReference.class);
+ MessageReference reference4 = EasyMock.createStrictMock(MessageReference.class);
+ MessageReference reference5 = EasyMock.createStrictMock(MessageReference.class);
+ MessageReference reference6 = EasyMock.createStrictMock(MessageReference.class);
+ MessageReference reference7 = EasyMock.createStrictMock(MessageReference.class);
+ MessageReference reference8 = EasyMock.createStrictMock(MessageReference.class);
+ MessageReference reference9 = EasyMock.createStrictMock(MessageReference.class);
Consumer consumer = EasyMock.createStrictMock(Consumer.class);
Consumer consumer2 = EasyMock.createStrictMock(Consumer.class);
Consumer consumer3 = EasyMock.createStrictMock(Consumer.class);
@@ -150,41 +237,64 @@
policy.addConsumer(consumer2);
policy.addConsumer(consumer3);
ServerMessage serverMessage = EasyMock.createStrictMock(ServerMessage.class);
+ EasyMock.expect(reference.getMessage()).andStubReturn(serverMessage);
EasyMock.expect(serverMessage.getProperty(MessageImpl.GROUP_ID)).andStubReturn(new SimpleString("gid1"));
ServerMessage serverMessage2 = EasyMock.createStrictMock(ServerMessage.class);
+ EasyMock.expect(reference2.getMessage()).andStubReturn(serverMessage2);
EasyMock.expect(serverMessage2.getProperty(MessageImpl.GROUP_ID)).andStubReturn(new SimpleString("gid2"));
ServerMessage serverMessage3 = EasyMock.createStrictMock(ServerMessage.class);
+ EasyMock.expect(reference3.getMessage()).andStubReturn(serverMessage3);
EasyMock.expect(serverMessage3.getProperty(MessageImpl.GROUP_ID)).andStubReturn(new SimpleString("gid3"));
ServerMessage serverMessage4 = EasyMock.createStrictMock(ServerMessage.class);
+ EasyMock.expect(reference4.getMessage()).andStubReturn(serverMessage4);
EasyMock.expect(serverMessage4.getProperty(MessageImpl.GROUP_ID)).andStubReturn(new SimpleString("gid4"));
ServerMessage serverMessage5 = EasyMock.createStrictMock(ServerMessage.class);
+ EasyMock.expect(reference5.getMessage()).andStubReturn(serverMessage5);
EasyMock.expect(serverMessage5.getProperty(MessageImpl.GROUP_ID)).andStubReturn(new SimpleString("gid5"));
ServerMessage serverMessage6 = EasyMock.createStrictMock(ServerMessage.class);
+ EasyMock.expect(reference6.getMessage()).andStubReturn(serverMessage6);
EasyMock.expect(serverMessage6.getProperty(MessageImpl.GROUP_ID)).andStubReturn(new SimpleString("gid6"));
ServerMessage serverMessage7 = EasyMock.createStrictMock(ServerMessage.class);
+ EasyMock.expect(reference7.getMessage()).andStubReturn(serverMessage7);
EasyMock.expect(serverMessage7.getProperty(MessageImpl.GROUP_ID)).andStubReturn(new SimpleString("gid7"));
ServerMessage serverMessage8 = EasyMock.createStrictMock(ServerMessage.class);
+ EasyMock.expect(reference8.getMessage()).andStubReturn(serverMessage8);
EasyMock.expect(serverMessage8.getProperty(MessageImpl.GROUP_ID)).andStubReturn(new SimpleString("gid8"));
ServerMessage serverMessage9 = EasyMock.createStrictMock(ServerMessage.class);
+ EasyMock.expect(reference9.getMessage()).andStubReturn(serverMessage9);
+ EasyMock.expect(consumer.handle(reference)).andReturn(HandleStatus.HANDLED);
+ EasyMock.expect(consumer2.handle(reference2)).andReturn(HandleStatus.HANDLED);
+ EasyMock.expect(consumer3.handle(reference3)).andReturn(HandleStatus.HANDLED);
+ EasyMock.expect(consumer.handle(reference4)).andReturn(HandleStatus.HANDLED);
+ EasyMock.expect(consumer2.handle(reference5)).andReturn(HandleStatus.HANDLED);
+ EasyMock.expect(consumer3.handle(reference6)).andReturn(HandleStatus.HANDLED);
+ EasyMock.expect(consumer.handle(reference7)).andReturn(HandleStatus.HANDLED);
+ EasyMock.expect(consumer2.handle(reference8)).andReturn(HandleStatus.HANDLED);
+ EasyMock.expect(consumer3.handle(reference9)).andReturn(HandleStatus.HANDLED);
EasyMock.expect(serverMessage9.getProperty(MessageImpl.GROUP_ID)).andStubReturn(new SimpleString("gid9"));
EasyMock.replay(consumer, consumer2, consumer3, serverMessage, serverMessage2, serverMessage3, serverMessage4,
- serverMessage5, serverMessage6, serverMessage7, serverMessage8, serverMessage9);
- assertEquals(consumer, policy.select(serverMessage, false));
- assertEquals(consumer2, policy.select(serverMessage2, false));
- assertEquals(consumer3, policy.select(serverMessage3, false));
- assertEquals(consumer, policy.select(serverMessage4, false));
- assertEquals(consumer2, policy.select(serverMessage5, false));
- assertEquals(consumer3, policy.select(serverMessage6, false));
- assertEquals(consumer, policy.select(serverMessage7, false));
- assertEquals(consumer2, policy.select(serverMessage8, false));
- assertEquals(consumer3, policy.select(serverMessage9, false));
+ serverMessage5, serverMessage6, serverMessage7, serverMessage8, serverMessage9, reference,
+ reference2, reference3, reference4, reference5, reference6, reference7, reference8, reference9);
+ assertEquals(HandleStatus.HANDLED, policy.distribute(reference));
+ assertEquals(HandleStatus.HANDLED, policy.distribute(reference2));
+ assertEquals(HandleStatus.HANDLED, policy.distribute(reference3));
+ assertEquals(HandleStatus.HANDLED, policy.distribute(reference4));
+ assertEquals(HandleStatus.HANDLED, policy.distribute(reference5));
+ assertEquals(HandleStatus.HANDLED, policy.distribute(reference6));
+ assertEquals(HandleStatus.HANDLED, policy.distribute(reference7));
+ assertEquals(HandleStatus.HANDLED, policy.distribute(reference8));
+ assertEquals(HandleStatus.HANDLED, policy.distribute(reference9));
EasyMock.verify(consumer, consumer2, consumer3, serverMessage, serverMessage2, serverMessage3, serverMessage4,
- serverMessage5, serverMessage6, serverMessage7, serverMessage8, serverMessage9);
+ serverMessage5, serverMessage6, serverMessage7, serverMessage8, serverMessage9, reference,
+ reference2, reference3, reference4, reference5, reference6, reference7, reference8, reference9);
}
- public void testMultipleConsumersConsumerRemoved()
+ public void testMultipleConsumersConsumerRemoved() throws Exception
{
+ MessageReference reference = EasyMock.createStrictMock(MessageReference.class);
+ MessageReference reference2 = EasyMock.createStrictMock(MessageReference.class);
+ MessageReference reference3 = EasyMock.createStrictMock(MessageReference.class);
Consumer consumer = EasyMock.createStrictMock(Consumer.class);
Consumer consumer2 = EasyMock.createStrictMock(Consumer.class);
Consumer consumer3 = EasyMock.createStrictMock(Consumer.class);
@@ -194,18 +304,27 @@
policy.addConsumer(consumer3);
policy.addConsumer(consumer4);
ServerMessage serverMessage = EasyMock.createStrictMock(ServerMessage.class);
+ EasyMock.expect(reference.getMessage()).andStubReturn(serverMessage);
EasyMock.expect(serverMessage.getProperty(MessageImpl.GROUP_ID)).andStubReturn(new SimpleString("gid1"));
ServerMessage serverMessage2 = EasyMock.createStrictMock(ServerMessage.class);
+ EasyMock.expect(reference2.getMessage()).andStubReturn(serverMessage2);
EasyMock.expect(serverMessage2.getProperty(MessageImpl.GROUP_ID)).andStubReturn(new SimpleString("gid2"));
ServerMessage serverMessage3 = EasyMock.createStrictMock(ServerMessage.class);
+ EasyMock.expect(reference3.getMessage()).andStubReturn(serverMessage3);
EasyMock.expect(serverMessage3.getProperty(MessageImpl.GROUP_ID)).andStubReturn(new SimpleString("gid3"));
- EasyMock.replay(consumer, consumer2, consumer3, consumer4, serverMessage, serverMessage2, serverMessage3);
- assertEquals(consumer, policy.select(serverMessage, false));
- assertEquals(consumer2, policy.select(serverMessage2, false));
- assertEquals(consumer3, policy.select(serverMessage3, false));
+ EasyMock.expect(consumer.handle(reference)).andReturn(HandleStatus.HANDLED);
+ EasyMock.expect(consumer2.handle(reference2)).andReturn(HandleStatus.HANDLED);
+ EasyMock.expect(consumer3.handle(reference3)).andReturn(HandleStatus.HANDLED);
+ EasyMock.expect(consumer.handle(reference2)).andReturn(HandleStatus.HANDLED);
+ EasyMock.replay(consumer, consumer2, consumer3, consumer4, serverMessage, serverMessage2, serverMessage3,
+ reference, reference2, reference3);
+ assertEquals(HandleStatus.HANDLED, policy.distribute(reference));
+ assertEquals(HandleStatus.HANDLED, policy.distribute(reference2));
+ assertEquals(HandleStatus.HANDLED, policy.distribute(reference3));
policy.removeConsumer(consumer2);
- assertEquals(consumer, policy.select(serverMessage2, false));
- EasyMock.verify(consumer, consumer2, consumer3, consumer4, serverMessage, serverMessage2, serverMessage3);
+ assertEquals(HandleStatus.HANDLED, policy.distribute(reference2));
+ EasyMock.verify(consumer, consumer2, consumer3, consumer4, serverMessage, serverMessage2, serverMessage3,
+ reference, reference2, reference3);
}
Modified: trunk/tests/src/org/jboss/messaging/tests/unit/core/server/impl/QueueImplTest.java
===================================================================
--- trunk/tests/src/org/jboss/messaging/tests/unit/core/server/impl/QueueImplTest.java 2008-10-04 16:22:15 UTC (rev 5072)
+++ trunk/tests/src/org/jboss/messaging/tests/unit/core/server/impl/QueueImplTest.java 2008-10-06 09:36:10 UTC (rev 5073)
@@ -1444,6 +1444,18 @@
return null;
}
+ /**
+ * Passes the reference to the consumer currently held in the consumer field.
+ *
+ * @param reference the message reference to hand over
+ * @return whatever status the consumer's handle() returns, or BUSY if handle() throws
+ */
+ public HandleStatus distribute(MessageReference reference)
+ {
+ HandleStatus status;
+ try
+ {
+ status = consumer.handle(reference);
+ }
+ catch (Exception ignored)
+ {
+ // a failing consumer is reported to the caller as BUSY
+ status = HandleStatus.BUSY;
+ }
+ return status;
+ }
+
public void addConsumer(Consumer consumer)
{
this.consumer = consumer;
Modified: trunk/tests/src/org/jboss/messaging/tests/unit/core/server/impl/RoundRobinDistributionPolicyTest.java
===================================================================
--- trunk/tests/src/org/jboss/messaging/tests/unit/core/server/impl/RoundRobinDistributionPolicyTest.java 2008-10-04 16:22:15 UTC (rev 5072)
+++ trunk/tests/src/org/jboss/messaging/tests/unit/core/server/impl/RoundRobinDistributionPolicyTest.java 2008-10-06 09:36:10 UTC (rev 5073)
@@ -24,9 +24,12 @@
import org.jboss.messaging.core.server.Consumer;
import org.jboss.messaging.core.server.DistributionPolicy;
+import org.jboss.messaging.core.server.MessageReference;
+import org.jboss.messaging.core.server.HandleStatus;
import org.jboss.messaging.core.server.impl.RoundRobinDistributionPolicy;
import org.jboss.messaging.tests.unit.core.server.impl.fakes.FakeConsumer;
import org.jboss.messaging.tests.util.UnitTestCase;
+import org.easymock.EasyMock;
import java.util.ArrayList;
import java.util.List;
@@ -43,55 +46,86 @@
public void testNoConsumers()
{
- List<Consumer> consumers = new ArrayList<Consumer>();
+ MessageReference messageReference = EasyMock.createStrictMock(MessageReference.class);
DistributionPolicy dp = new RoundRobinDistributionPolicy();
-
- Consumer c = dp.select(null, false);
-
- assertEquals(null, c);
+
+ EasyMock.replay(messageReference);
+ HandleStatus status = dp.distribute(messageReference);
+ EasyMock.verify(messageReference);
+ assertEquals(status, HandleStatus.BUSY);
}
- public void testConsumers()
+ public void testConsumers() throws Exception
{
- FakeConsumer c1 = new FakeConsumer();
- FakeConsumer c2 = new FakeConsumer();
- FakeConsumer c3 = new FakeConsumer();
+ MessageReference messageReference = EasyMock.createStrictMock(MessageReference.class);
+ Consumer c1 = EasyMock.createStrictMock(Consumer.class);
+ Consumer c2 = EasyMock.createStrictMock(Consumer.class);
+ Consumer c3 = EasyMock.createStrictMock(Consumer.class);
DistributionPolicy dp = new RoundRobinDistributionPolicy();
dp.addConsumer(c1);
dp.addConsumer(c2);
dp.addConsumer(c3);
-
- Consumer c = null;
+ EasyMock.expect(c1.handle(messageReference)).andReturn(HandleStatus.HANDLED);
+ EasyMock.expect(c2.handle(messageReference)).andReturn(HandleStatus.HANDLED);
+ EasyMock.expect(c3.handle(messageReference)).andReturn(HandleStatus.HANDLED);
+ EasyMock.expect(c1.handle(messageReference)).andReturn(HandleStatus.HANDLED);
+ EasyMock.expect(c2.handle(messageReference)).andReturn(HandleStatus.HANDLED);
+ EasyMock.expect(c3.handle(messageReference)).andReturn(HandleStatus.HANDLED);
+ EasyMock.expect(c1.handle(messageReference)).andReturn(HandleStatus.HANDLED);
+ EasyMock.replay(messageReference, c1, c2, c3);
- c = dp.select( null, false);
-
- assertEquals(c1, c);
-
- c = dp.select(null, false);
-
- assertEquals(c2, c);
-
- c = dp.select(null, false);
-
- assertEquals(c3, c);
-
- c = dp.select( null, false);
-
- assertEquals(c1, c);
-
- c = dp.select( null, false);
-
- assertEquals(c2, c);
-
- c = dp.select( null, false);
-
- assertEquals(c3, c);
-
- c = dp.select(null, false);
-
- assertEquals(c1, c);
+ dp.distribute(messageReference);
+ dp.distribute(messageReference);
+ dp.distribute(messageReference);
+ dp.distribute(messageReference);
+ dp.distribute(messageReference);
+ dp.distribute(messageReference);
+ dp.distribute(messageReference);
+ EasyMock.verify(messageReference, c1, c2, c3);
}
+
+ /**
+ * Adds three strict-mock consumers that each answer BUSY to handle() and checks
+ * that a single distribute() call reports BUSY. The final verify() fails unless
+ * every consumer was offered the reference exactly once.
+ *
+ * @throws Exception declared because Consumer.handle() is called while recording expectations
+ */
+ public void testRunOutOfConsumers() throws Exception
+ {
+ MessageReference messageReference = EasyMock.createStrictMock(MessageReference.class);
+ Consumer c1 = EasyMock.createStrictMock(Consumer.class);
+ Consumer c2 = EasyMock.createStrictMock(Consumer.class);
+ Consumer c3 = EasyMock.createStrictMock(Consumer.class);
+
+ DistributionPolicy dp = new RoundRobinDistributionPolicy();
+ dp.addConsumer(c1);
+ dp.addConsumer(c2);
+ dp.addConsumer(c3);
+ EasyMock.expect(c1.handle(messageReference)).andReturn(HandleStatus.BUSY);
+ EasyMock.expect(c2.handle(messageReference)).andReturn(HandleStatus.BUSY);
+ EasyMock.expect(c3.handle(messageReference)).andReturn(HandleStatus.BUSY);
+ EasyMock.replay(messageReference, c1, c2, c3);
+
+ HandleStatus status = dp.distribute(messageReference);
+ // expected value goes first so a failure message labels expected/actual correctly
+ assertEquals(HandleStatus.BUSY, status);
+ EasyMock.verify(messageReference, c1, c2, c3);
+ }
+ /**
+ * Adds three strict-mock consumers that each answer NO_MATCH to handle() and checks
+ * that a single distribute() call reports NO_MATCH. The final verify() fails unless
+ * every consumer was offered the reference exactly once.
+ *
+ * @throws Exception declared because Consumer.handle() is called while recording expectations
+ */
+ public void testRunOutOfConsumersNoMatch() throws Exception
+ {
+ MessageReference messageReference = EasyMock.createStrictMock(MessageReference.class);
+ Consumer c1 = EasyMock.createStrictMock(Consumer.class);
+ Consumer c2 = EasyMock.createStrictMock(Consumer.class);
+ Consumer c3 = EasyMock.createStrictMock(Consumer.class);
+
+ DistributionPolicy dp = new RoundRobinDistributionPolicy();
+ dp.addConsumer(c1);
+ dp.addConsumer(c2);
+ dp.addConsumer(c3);
+ EasyMock.expect(c1.handle(messageReference)).andReturn(HandleStatus.NO_MATCH);
+ EasyMock.expect(c2.handle(messageReference)).andReturn(HandleStatus.NO_MATCH);
+ EasyMock.expect(c3.handle(messageReference)).andReturn(HandleStatus.NO_MATCH);
+ EasyMock.replay(messageReference, c1, c2, c3);
+
+ HandleStatus status = dp.distribute(messageReference);
+ // expected value goes first so a failure message labels expected/actual correctly
+ assertEquals(HandleStatus.NO_MATCH, status);
+ EasyMock.verify(messageReference, c1, c2, c3);
+ }
+
}
More information about the jboss-cvs-commits
mailing list