[jboss-cvs] JBoss Messaging SVN: r7531 - in trunk: examples/jms/message-group and 8 other directories.
jboss-cvs-commits at lists.jboss.org
jboss-cvs-commits at lists.jboss.org
Tue Jul 7 06:42:54 EDT 2009
Author: jmesnil
Date: 2009-07-07 06:42:54 -0400 (Tue, 07 Jul 2009)
New Revision: 7531
Removed:
trunk/src/main/org/jboss/messaging/core/server/impl/GroupingRoundRobinDistributor.java
Modified:
trunk/docs/user-manual/en/message-grouping.xml
trunk/docs/user-manual/en/queue-attributes.xml
trunk/examples/jms/message-group/readme.html
trunk/examples/jms/message-group/server0/jbm-configuration.xml
trunk/src/main/org/jboss/messaging/core/list/PriorityLinkedList.java
trunk/src/main/org/jboss/messaging/core/server/Distributor.java
trunk/src/main/org/jboss/messaging/core/server/impl/DistributorImpl.java
trunk/src/main/org/jboss/messaging/core/server/impl/QueueImpl.java
trunk/src/main/org/jboss/messaging/core/server/impl/RoundRobinDistributor.java
trunk/tests/jms-tests/src/org/jboss/test/messaging/jms/JMSTest.java
trunk/tests/src/org/jboss/messaging/tests/integration/client/AutogroupIdTest.java
trunk/tests/src/org/jboss/messaging/tests/integration/client/MessageGroupingTest.java
trunk/tests/src/org/jboss/messaging/tests/unit/core/server/impl/QueueImplTest.java
trunk/tests/src/org/jboss/messaging/tests/unit/core/server/impl/fakes/FakeConsumer.java
Log:
JBMESSAGING-1505: Optimise multiple message selectors on a queue
JBMESSAGING-375: Grouping of messages
* in QueueImpl, consumers with filters have their own iterators to iterate on the
references and "skip" messages that do not match their filters. Consumers without filters
now peeks on the references directly
* removed the grouping round robin distributor, choosing the appropriate consumer
is now done while delivering the messages. The QueueImpl keeps a map of the group ID <-> consumer
* updated examples + doc (there is no need to set the distribution policy class
to use message groups now)
Modified: trunk/docs/user-manual/en/message-grouping.xml
===================================================================
--- trunk/docs/user-manual/en/message-grouping.xml 2009-07-07 05:08:58 UTC (rev 7530)
+++ trunk/docs/user-manual/en/message-grouping.xml 2009-07-07 10:42:54 UTC (rev 7531)
@@ -15,22 +15,6 @@
messages with the same group id.</para>
</listitem>
</itemizedlist>
- <section id="message-grouping.configuring">
- <title>Configuring Message Grouping</title>
- <para>Message grouping must be enabled in the address-setting configuration by using a
- specific <literal>distribution-policy-class</literal>:</para>
- <programlisting>
- <address-setting match="jms.queue.exampleQueue">
- <distribution-policy-class>
- org.jboss.messaging.core.server.impl.GroupingRoundRobinDistributor
- </distribution-policy-class>
- </address-setting>
- </programlisting>
- <para>By default, <literal>distribution-policy-class</literal> is set to <literal
- >org.jboss.messaging.core.server.impl.RoundRobinDistributor</literal> and message groups
- will not be handled by the queue. Address wildcards can be used to configure the
- distribution policy for a set of addresses (see <xref linkend="wildcard-syntax"/>).</para>
- </section>
<section>
<title>Using Core API</title>
<para>The property name used to identify the message group is <literal
Modified: trunk/docs/user-manual/en/queue-attributes.xml
===================================================================
--- trunk/docs/user-manual/en/queue-attributes.xml 2009-07-07 05:08:58 UTC (rev 7530)
+++ trunk/docs/user-manual/en/queue-attributes.xml 2009-07-07 10:42:54 UTC (rev 7531)
@@ -79,7 +79,7 @@
<redelivery-delay>5000</redelivery-delay>
<expiry-address>jms.queue.expiryQueue</expiry-address>
<last-value-queue>true</last-value-queue>
- <distribution-policy-class>org.jboss.messaging.core.server.impl.GroupingRoundRobinDistributor</distribution-policy-class>
+ <distribution-policy-class>org.jboss.messaging.core.server.impl.RoundRobinDistributor</distribution-policy-class>
<max-size-bytes>100000</max-size-bytes>
<page-size-bytes>20000</page-size-bytes>
<redistribution-delay>0</redistribution-delay>
Modified: trunk/examples/jms/message-group/readme.html
===================================================================
--- trunk/examples/jms/message-group/readme.html 2009-07-07 05:08:58 UTC (rev 7530)
+++ trunk/examples/jms/message-group/readme.html 2009-07-07 10:42:54 UTC (rev 7531)
@@ -13,19 +13,7 @@
<li>Messages in a message group will be all delivered to no more than one of the queue's consumers. The consumer that receives the
first message of a group will receive all the messages that belongs to the group.</li>
- <p>To enable message group, you need to configure the 'distribution-policy-class' for the queue
- to be "org.jboss.messaging.core.server.impl.GroupingRoundRobinDistributor" in jbm-queues.xml. This configure parameter
- is part of the 'address-settings' configuration element, as shown in the following </p>
-
- <pre>
- <code>
- <address-setting match="jms.queue.exampleQueue">
- <distribution-policy-class>org.jboss.messaging.core.server.impl.GroupingRoundRobinDistributor</distribution-policy-class>
- </address-setting>
- </code>
- </pre>
-
- <p>After the configuration, you can make any message belong to a message group by setting its 'JMXGroupID' string property to the group id.
+ <p>You can make any message belong to a message group by setting its 'JMXGroupID' string property to the group id.
In this example we create a message group 'Group-0'. And make such a message group of 10 messages. It also create two consumers on the queue
where the 10 'Group-0' group messages are to be sent. You can see that with message grouping enabled, all the 10 messages will be received by
the first consumer. The second consumer will receive none. </p>
Modified: trunk/examples/jms/message-group/server0/jbm-configuration.xml
===================================================================
--- trunk/examples/jms/message-group/server0/jbm-configuration.xml 2009-07-07 05:08:58 UTC (rev 7530)
+++ trunk/examples/jms/message-group/server0/jbm-configuration.xml 2009-07-07 10:42:54 UTC (rev 7531)
@@ -30,10 +30,4 @@
</security-setting>
</security-settings>
- <address-settings>
- <address-setting match="jms.queue.exampleQueue">
- <distribution-policy-class>org.jboss.messaging.core.server.impl.GroupingRoundRobinDistributor</distribution-policy-class>
- </address-setting>
- </address-settings>
-
</configuration>
Modified: trunk/src/main/org/jboss/messaging/core/list/PriorityLinkedList.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/list/PriorityLinkedList.java 2009-07-07 05:08:58 UTC (rev 7530)
+++ trunk/src/main/org/jboss/messaging/core/list/PriorityLinkedList.java 2009-07-07 10:42:54 UTC (rev 7531)
@@ -53,4 +53,5 @@
Iterator<T> iterator();
boolean isEmpty();
+
}
Modified: trunk/src/main/org/jboss/messaging/core/server/Distributor.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/server/Distributor.java 2009-07-07 05:08:58 UTC (rev 7530)
+++ trunk/src/main/org/jboss/messaging/core/server/Distributor.java 2009-07-07 10:42:54 UTC (rev 7531)
@@ -22,7 +22,6 @@
package org.jboss.messaging.core.server;
-import java.util.List;
/**
*
@@ -33,15 +32,13 @@
*/
public interface Distributor
{
- HandleStatus distribute(final MessageReference reference);
-
void addConsumer(Consumer consumer);
boolean removeConsumer(Consumer consumer);
int getConsumerCount();
-
- boolean hasConsumers();
- List<Consumer> getConsumers();
+ Consumer peekConsumer();
+
+ void incrementPosition();
}
Modified: trunk/src/main/org/jboss/messaging/core/server/impl/DistributorImpl.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/server/impl/DistributorImpl.java 2009-07-07 05:08:58 UTC (rev 7530)
+++ trunk/src/main/org/jboss/messaging/core/server/impl/DistributorImpl.java 2009-07-07 10:42:54 UTC (rev 7531)
@@ -49,11 +49,6 @@
return consumers.size();
}
- public boolean hasConsumers()
- {
- return !consumers.isEmpty();
- }
-
public List<Consumer> getConsumers()
{
return consumers;
Deleted: trunk/src/main/org/jboss/messaging/core/server/impl/GroupingRoundRobinDistributor.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/server/impl/GroupingRoundRobinDistributor.java 2009-07-07 05:08:58 UTC (rev 7530)
+++ trunk/src/main/org/jboss/messaging/core/server/impl/GroupingRoundRobinDistributor.java 2009-07-07 10:42:54 UTC (rev 7531)
@@ -1,139 +0,0 @@
-/*
- * JBoss, Home of Professional Open Source
- * Copyright 2005-2008, Red Hat Middleware LLC, and individual contributors
- * by the @authors tag. See the copyright.txt in the distribution for a
- * full listing of individual contributors.
- *
- * This is free software; you can redistribute it and/or modify it
- * under the terms of the GNU Lesser General Public License as
- * published by the Free Software Foundation; either version 2.1 of
- * the License, or (at your option) any later version.
- *
- * This software is distributed in the hope that it will be useful,
- * but WITHOUT ANY WARRANTY; without even the implied warranty of
- * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
- * Lesser General Public License for more details.
- *
- * You should have received a copy of the GNU Lesser General Public
- * License along with this software; if not, write to the Free
- * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
- * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
- */
-package org.jboss.messaging.core.server.impl;
-
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.ConcurrentMap;
-
-import org.jboss.messaging.core.message.impl.MessageImpl;
-import org.jboss.messaging.core.server.Consumer;
-import org.jboss.messaging.core.server.HandleStatus;
-import org.jboss.messaging.core.server.MessageReference;
-import org.jboss.messaging.utils.SimpleString;
-
-/**
- * Distributes message based on the message property 'JBM_GROUP_ID'. Once a message has been successfully delivered to a
- * consumer that consumer is then bound to that group. Any message that has the same group id set will always be
- * delivered to the same consumer.
- * The Initial consumer is the first consumer found, using the round robin policy, that hasn't been bound to a group, If
- * there are no consumers left that have not been bound to a group then the next consumer will be bound to 2 groups and
- * so on.
- *
- * @author <a href="mailto:andy.taylor at jboss.org">Andy Taylor</a>
- * @author <a href="mailto:tim.fox at jboss.com">Tim Fox</a>
- */
-public class GroupingRoundRobinDistributor extends RoundRobinDistributor
-{
- // Constants -----------------------------------------------------
-
- // Static --------------------------------------------------------
-
- // Attributes ----------------------------------------------------
-
- private ConcurrentMap<SimpleString, Consumer> cons = new ConcurrentHashMap<SimpleString, Consumer>();
-
- // Distributor implementation ------------------------------------
-
- public HandleStatus distribute(final MessageReference reference)
- {
- if (consumers.isEmpty())
- {
- return HandleStatus.BUSY;
- }
-
- final SimpleString groupId = (SimpleString)reference.getMessage().getProperty(MessageImpl.HDR_GROUP_ID);
-
- if (groupId != null)
- {
- int startPos = pos;
-
- boolean filterRejected = false;
-
- while (true)
- {
- Consumer consumer = consumers.get(pos);
-
- Consumer oldConsumer = cons.putIfAbsent(groupId, consumer);
-
- if (oldConsumer == null)
- {
- incrementPosition();
- }
- else
- {
- consumer = oldConsumer;
- }
-
- HandleStatus status = handle(reference, consumer);
-
- if (status == HandleStatus.HANDLED)
- {
- return HandleStatus.HANDLED;
- }
- else if (status == HandleStatus.NO_MATCH)
- {
- filterRejected = true;
- }
- else if (status == HandleStatus.BUSY)
- {
- // if we were previously bound, we can remove and try the next consumer
- return HandleStatus.BUSY;
- }
- // if we've tried all of them
- if (startPos == pos)
- {
- // Tried all of them
- if (filterRejected)
- {
- return HandleStatus.NO_MATCH;
- }
- else
- {
- // Give up - all consumers busy
- return HandleStatus.BUSY;
- }
- }
- }
- }
- else
- {
- return super.distribute(reference);
- }
- }
-
- public synchronized boolean removeConsumer(Consumer consumer)
- {
- boolean removed = super.removeConsumer(consumer);
-
- if (removed)
- {
- for (SimpleString group : cons.keySet())
- {
- if (consumer == cons.get(group))
- {
- cons.remove(group);
- }
- }
- }
- return removed;
- }
-}
Modified: trunk/src/main/org/jboss/messaging/core/server/impl/QueueImpl.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/server/impl/QueueImpl.java 2009-07-07 05:08:58 UTC (rev 7530)
+++ trunk/src/main/org/jboss/messaging/core/server/impl/QueueImpl.java 2009-07-07 10:42:54 UTC (rev 7531)
@@ -25,6 +25,8 @@
import java.util.List;
import java.util.Map;
import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.Executor;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
@@ -63,7 +65,7 @@
import org.jboss.messaging.utils.SimpleString;
/**
- * Implementation of a Queue TODO use Java 5 concurrent queue
+ * Implementation of a Queue
*
* @author <a href="mailto:tim.fox at jboss.com">Tim Fox</a>
* @author <a href="ataylor at redhat.com">Andy Taylor</a>
@@ -139,6 +141,8 @@
// We cache the consumers here since we don't want to include the redistributor
private final Set<Consumer> consumers = new HashSet<Consumer>();
+ private final Map<Consumer, Iterator<MessageReference>> iterators = new HashMap<Consumer, Iterator<MessageReference>>();
+ private ConcurrentMap<SimpleString, Consumer> groups = new ConcurrentHashMap<SimpleString, Consumer>();
public QueueImpl(final long persistenceID,
final SimpleString address,
@@ -420,21 +424,36 @@
cancelRedistributor();
distributionPolicy.addConsumer(consumer);
-
consumers.add(consumer);
+ if (consumer.getFilter() != null)
+ {
+ iterators.put(consumer, messageReferences.iterator());
+ }
}
public synchronized boolean removeConsumer(final Consumer consumer) throws Exception
{
boolean removed = distributionPolicy.removeConsumer(consumer);
- if (!distributionPolicy.hasConsumers())
+ if (distributionPolicy.getConsumerCount() == 0)
{
promptDelivery = false;
}
consumers.remove(consumer);
+ iterators.remove(consumer);
+ if (removed)
+ {
+ for (SimpleString groupID : groups.keySet())
+ {
+ if (consumer == groups.get(groupID))
+ {
+ groups.remove(groupID);
+ }
+ }
+ }
+
return removed;
}
@@ -1265,12 +1284,28 @@
direct = false;
+ if (distributionPolicy.getConsumerCount() == 0)
+ {
+ return;
+ }
+
+ Consumer firstConsumer = distributionPolicy.peekConsumer();
+
+ Consumer consumer;
+
MessageReference reference;
Iterator<MessageReference> iterator = null;
+ int totalConsumers = distributionPolicy.getConsumerCount();
+ Set<Consumer> busyConsumers = new HashSet<Consumer>();
+
while (true)
{
+ consumer = distributionPolicy.peekConsumer();
+
+ iterator = iterators.get(consumer);
+
if (iterator == null)
{
reference = messageReferences.peekFirst();
@@ -1284,52 +1319,45 @@
else
{
reference = null;
+ if (consumer.getFilter() != null)
+ {
+ // we have iterated on the whole queue for
+ // messages which matches the consumer filter.
+ // we reset its iterator in case new messages are added to the queue
+ iterators.put(consumer, messageReferences.iterator());
+ }
}
}
-
+
if (reference == null)
{
- if (iterator == null)
+ if (consumer == firstConsumer)
{
- if (pagingStore != null)
- {
- // If the queue is empty, we need to check if there are pending messages, and throw a warning
- if (pagingStore.isPaging() && !pagingStore.isDropWhenMaxSize())
- {
- // This is just a *request* to depage. Depage will only happens if there is space on the Address
- // and GlobalSize
- pagingStore.startDepaging();
-
- log.warn("The Queue " + name +
- " is empty, however there are pending messages on Paging for the address " +
- pagingStore.getStoreName() +
- " waiting message ACK before they could be routed");
- }
- }
+ startDepaging();
// We delivered all the messages - go into direct delivery
direct = true;
-
promptDelivery = false;
}
return;
}
- // PagingManager would be null only on testcases
- if (pagingStore == null && pagingManager != null)
+ initPagingStore(reference.getMessage().getDestination());
+
+ final SimpleString groupID = (SimpleString)reference.getMessage().getProperty(MessageImpl.HDR_GROUP_ID);
+
+ if (groupID != null)
{
- // TODO: It would be better if we could initialize the pagingStore during the construction
- try
+ Consumer groupConsumer = groups.putIfAbsent(groupID, consumer);
+ if (groupConsumer != null && groupConsumer != consumer)
{
- pagingStore = pagingManager.getPageStore(reference.getMessage().getDestination());
+ // this consumer is not in charge of the message group
+ distributionPolicy.incrementPosition();
+ continue;
}
- catch (Exception e)
- {
- // This shouldn't happen, and if it happens, this shouldn't abort the route
- }
}
+
+ HandleStatus status = handle(reference, consumer);
- HandleStatus status = deliver(reference);
-
if (status == HandleStatus.HANDLED)
{
if (iterator == null)
@@ -1343,15 +1371,21 @@
}
else if (status == HandleStatus.BUSY)
{
- // All consumers busy - give up
- break;
+ busyConsumers.add(consumer);
+ if (groupID != null || busyConsumers.size() == totalConsumers)
+ {
+ // when all consumers are busy, we stop
+ break;
+ }
}
- else if (status == HandleStatus.NO_MATCH && iterator == null)
+ else if (status == HandleStatus.NO_MATCH)
{
- // Consumers not all busy - but filter not accepting - iterate
- // back
- // through the queue
- iterator = messageReferences.iterator();
+ // if consumer filter reject the message make sure it won't be assigned the message group
+ if (groupID != null)
+ {
+ groups.remove(consumer);
+ }
+ continue;
}
}
}
@@ -1374,7 +1408,7 @@
{
// Deliver directly
- HandleStatus status = deliver(ref);
+ HandleStatus status = directDeliver(ref);
if (status == HandleStatus.HANDLED)
{
@@ -1428,18 +1462,121 @@
}
}
- private HandleStatus deliver(final MessageReference reference)
+ private synchronized HandleStatus directDeliver(final MessageReference reference)
{
- HandleStatus status = distributionPolicy.distribute(reference);
+ if (distributionPolicy.getConsumerCount() == 0)
+ {
+ return HandleStatus.BUSY;
+ }
+
+ Consumer firstConsumer = distributionPolicy.peekConsumer();
+
+ HandleStatus status;
+ boolean filterRejected = false;
+
+ while (true)
+ {
+ Consumer consumer = distributionPolicy.peekConsumer();
+
+ final SimpleString groupId = (SimpleString)reference.getMessage().getProperty(MessageImpl.HDR_GROUP_ID);
+
+ if (groupId != null)
+ {
+ Consumer groupConsumer = groups.putIfAbsent(groupId, consumer);
+ if (groupConsumer != null && groupConsumer != consumer)
+ {
+ // this consumer is not in charge in the message group
+ distributionPolicy.incrementPosition();
+ continue;
+ }
+ }
+
+ status = handle(reference, consumer);
+
+ if (status == HandleStatus.HANDLED)
+ {
+ break;
+ }
+ else if (status == HandleStatus.NO_MATCH)
+ {
+ filterRejected = true;
+ if (groupId != null)
+ {
+ groups.remove(consumer);
+ }
+ }
+ else if (status == HandleStatus.BUSY)
+ {
+ if (groupId != null)
+ {
+ break;
+ }
+ }
+ // if we've tried all of them
+ if (distributionPolicy.peekConsumer() == firstConsumer)
+ {
+ if (filterRejected)
+ {
+ status = HandleStatus.NO_MATCH;
+ break;
+ }
+ else
+ {
+ // Give up - all consumers busy
+ status = HandleStatus.BUSY;
+ break;
+ }
+ }
+ }
+
if (status == HandleStatus.NO_MATCH)
{
promptDelivery = true;
}
-
+
return status;
}
+
+ private synchronized HandleStatus handle(final MessageReference reference, final Consumer consumer)
+ {
+ HandleStatus status;
+ try
+ {
+ status = consumer.handle(reference);
+ }
+ catch (Throwable t)
+ {
+ log.warn("removing consumer which did not handle a message, consumer=" +
+ consumer +
+ ", message=" +
+ reference, t);
+
+ // If the consumer throws an exception we remove the consumer
+ try
+ {
+ removeConsumer(consumer);
+ }
+ catch (Exception e)
+ {
+ log.error("Failed to remove consumer", e);
+ }
+ return HandleStatus.BUSY;
+ }
+ finally
+ {
+ distributionPolicy.incrementPosition();
+ }
+
+ if (status == null)
+ {
+ throw new IllegalStateException("ClientConsumer.handle() should never return null");
+ }
+
+ return status;
+ }
+
private void removeExpiringReference(final MessageReference ref) throws Exception
{
if (ref.getMessage().getExpiration() > 0)
@@ -1515,6 +1652,42 @@
}
}
+ private synchronized void initPagingStore(SimpleString destination)
+ {
+ // PagingManager would be null only on testcases
+ if (pagingStore == null && pagingManager != null)
+ {
+ // TODO: It would be better if we could initialize the pagingStore during the construction
+ try
+ {
+ pagingStore = pagingManager.getPageStore(destination);
+ }
+ catch (Exception e)
+ {
+ // This shouldn't happen, and if it happens, this shouldn't abort the route
+ }
+ }
+ }
+
+ private synchronized void startDepaging()
+ {
+ if (pagingStore != null)
+ {
+ // If the queue is empty, we need to check if there are pending messages, and throw a warning
+ if (pagingStore.isPaging() && !pagingStore.isDropWhenMaxSize())
+ {
+ // This is just a *request* to depage. Depage will only happens if there is space on the Address
+ // and GlobalSize
+ pagingStore.startDepaging();
+
+ log.warn("The Queue " + name +
+ " is empty, however there are pending messages on Paging for the address " +
+ pagingStore.getStoreName() +
+ " waiting message ACK before they could be routed");
+ }
+ }
+ }
+
// Inner classes
// --------------------------------------------------------------------------
Modified: trunk/src/main/org/jboss/messaging/core/server/impl/RoundRobinDistributor.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/server/impl/RoundRobinDistributor.java 2009-07-07 05:08:58 UTC (rev 7530)
+++ trunk/src/main/org/jboss/messaging/core/server/impl/RoundRobinDistributor.java 2009-07-07 10:42:54 UTC (rev 7531)
@@ -24,14 +24,13 @@
import org.jboss.messaging.core.logging.Logger;
import org.jboss.messaging.core.server.Consumer;
-import org.jboss.messaging.core.server.HandleStatus;
-import org.jboss.messaging.core.server.MessageReference;
/**
* A RoundRobinDistributor
*
* @author <a href="mailto:tim.fox at jboss.com">Tim Fox</a>
* @author <a href="mailto:andy.taylor at jboss.org">Andy Taylor</a>
+ * @author <a href="mailto:jmesnil at redhat.com">Jeff Mesnil</a>
*/
public class RoundRobinDistributor extends DistributorImpl
{
@@ -58,57 +57,13 @@
return super.getConsumerCount();
}
- public HandleStatus distribute(final MessageReference reference)
+ public Consumer peekConsumer()
{
- if (getConsumerCount() == 0)
- {
- return HandleStatus.BUSY;
- }
- int startPos = pos;
-
- boolean filterRejected = false;
-
- HandleStatus status;
-
- while (true)
- {
- status = handle(reference, getNextConsumer());
-
- if (status == HandleStatus.HANDLED)
- {
- return HandleStatus.HANDLED;
- }
- else if (status == HandleStatus.NO_MATCH)
- {
- filterRejected = true;
- }
- if (startPos == pos)
- {
- // Tried all of them
- if (filterRejected)
- {
- return HandleStatus.NO_MATCH;
- }
- else
- {
- // Give up - all consumers busy
- return HandleStatus.BUSY;
- }
- }
- }
+ return consumers.get(pos);
}
-
- private final synchronized Consumer getNextConsumer()
+
+ public synchronized void incrementPosition()
{
- Consumer consumer = consumers.get(pos);
-
- incrementPosition();
-
- return consumer;
- }
-
- protected void incrementPosition()
- {
pos++;
if (pos == consumers.size())
@@ -116,38 +71,4 @@
pos = 0;
}
}
-
- protected HandleStatus handle(final MessageReference reference, final Consumer consumer)
- {
- HandleStatus status;
- try
- {
- status = consumer.handle(reference);
- }
- catch (Throwable t)
- {
- log.warn("removing consumer which did not handle a message, " + "consumer=" +
- consumer +
- ", message=" +
- reference, t);
-
- // If the consumer throws an exception we remove the consumer
- try
- {
- removeConsumer(consumer);
- }
- catch (Exception e)
- {
- log.error("Failed to remove consumer", e);
- }
-
- return HandleStatus.BUSY;
- }
-
- if (status == null)
- {
- throw new IllegalStateException("ClientConsumer.handle() should never return null");
- }
- return status;
- }
}
Modified: trunk/tests/jms-tests/src/org/jboss/test/messaging/jms/JMSTest.java
===================================================================
--- trunk/tests/jms-tests/src/org/jboss/test/messaging/jms/JMSTest.java 2009-07-07 05:08:58 UTC (rev 7530)
+++ trunk/tests/jms-tests/src/org/jboss/test/messaging/jms/JMSTest.java 2009-07-07 10:42:54 UTC (rev 7531)
@@ -331,6 +331,10 @@
{
try
{
+ // sleep a little bit to ensure that
+ // prod.send will be called before cons.reveive
+ Thread.sleep(500);
+
synchronized (session)
{
Message m = cons.receive(5000);
Modified: trunk/tests/src/org/jboss/messaging/tests/integration/client/AutogroupIdTest.java
===================================================================
--- trunk/tests/src/org/jboss/messaging/tests/integration/client/AutogroupIdTest.java 2009-07-07 05:08:58 UTC (rev 7530)
+++ trunk/tests/src/org/jboss/messaging/tests/integration/client/AutogroupIdTest.java 2009-07-07 10:42:54 UTC (rev 7531)
@@ -21,6 +21,8 @@
*/
package org.jboss.messaging.tests.integration.client;
+import java.util.concurrent.CountDownLatch;
+
import org.jboss.messaging.core.client.ClientConsumer;
import org.jboss.messaging.core.client.ClientMessage;
import org.jboss.messaging.core.client.ClientProducer;
@@ -29,13 +31,9 @@
import org.jboss.messaging.core.client.MessageHandler;
import org.jboss.messaging.core.exception.MessagingException;
import org.jboss.messaging.core.server.MessagingServer;
-import org.jboss.messaging.core.server.impl.GroupingRoundRobinDistributor;
-import org.jboss.messaging.core.settings.impl.AddressSettings;
import org.jboss.messaging.tests.util.ServiceTestBase;
import org.jboss.messaging.utils.SimpleString;
-import java.util.concurrent.CountDownLatch;
-
/**
* @author <a href="mailto:andy.taylor at jboss.org">Andy Taylor</a>
*/
@@ -62,9 +60,6 @@
MessagingServer server = createServer(false);
try
{
- AddressSettings qs = new AddressSettings();
- qs.setDistributionPolicyClass(GroupingRoundRobinDistributor.class.getName());
- server.getAddressSettingsRepository().addMatch(groupTestQ.toString(), qs);
server.start();
ClientSessionFactory sf = createInVMFactory();
@@ -118,9 +113,6 @@
MessagingServer server = createServer(false);
try
{
- AddressSettings qs = new AddressSettings();
- qs.setDistributionPolicyClass(GroupingRoundRobinDistributor.class.getName());
- server.getAddressSettingsRepository().addMatch(groupTestQ.toString(), qs);
server.start();
ClientSessionFactory sf = createInVMFactory();
@@ -183,10 +175,6 @@
MessagingServer server = createServer(false);
try
{
- AddressSettings qs = new AddressSettings();
- qs.setDistributionPolicyClass(GroupingRoundRobinDistributor.class.getName());
-
- server.getAddressSettingsRepository().addMatch(groupTestQ.toString(), qs);
server.start();
ClientSessionFactory sf = createInVMFactory();
Modified: trunk/tests/src/org/jboss/messaging/tests/integration/client/MessageGroupingTest.java
===================================================================
--- trunk/tests/src/org/jboss/messaging/tests/integration/client/MessageGroupingTest.java 2009-07-07 05:08:58 UTC (rev 7530)
+++ trunk/tests/src/org/jboss/messaging/tests/integration/client/MessageGroupingTest.java 2009-07-07 10:42:54 UTC (rev 7531)
@@ -21,6 +21,13 @@
*/
package org.jboss.messaging.tests.integration.client;
+import java.util.ArrayList;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+
+import javax.transaction.xa.XAResource;
+import javax.transaction.xa.Xid;
+
import org.jboss.messaging.core.client.ClientConsumer;
import org.jboss.messaging.core.client.ClientMessage;
import org.jboss.messaging.core.client.ClientProducer;
@@ -34,18 +41,11 @@
import org.jboss.messaging.core.message.impl.MessageImpl;
import org.jboss.messaging.core.server.Messaging;
import org.jboss.messaging.core.server.MessagingServer;
-import org.jboss.messaging.core.server.impl.GroupingRoundRobinDistributor;
import org.jboss.messaging.core.settings.impl.AddressSettings;
import org.jboss.messaging.core.transaction.impl.XidImpl;
import org.jboss.messaging.tests.util.UnitTestCase;
import org.jboss.messaging.utils.SimpleString;
-import javax.transaction.xa.XAResource;
-import javax.transaction.xa.Xid;
-import java.util.ArrayList;
-import java.util.concurrent.CountDownLatch;
-import java.util.concurrent.TimeUnit;
-
/**
* @author <a href="mailto:andy.taylor at jboss.org">Andy Taylor</a>
*/
@@ -77,8 +77,8 @@
DummyMessageHandler dummyMessageHandler2 = new DummyMessageHandler(latch, true);
consumer2.setMessageHandler(dummyMessageHandler2);
assertTrue(latch.await(10, TimeUnit.SECONDS));
- assertTrue(dummyMessageHandler.list.size() == 100);
- assertTrue(dummyMessageHandler2.list.size() == 0);
+ assertEquals(100, dummyMessageHandler.list.size());
+ assertEquals(0, dummyMessageHandler2.list.size());
consumer.close();
consumer2.close();
}
@@ -111,14 +111,14 @@
DummyMessageHandler dummyMessageHandler2 = new DummyMessageHandler(latch, true);
consumer2.setMessageHandler(dummyMessageHandler2);
assertTrue(latch.await(10, TimeUnit.SECONDS));
- assertEquals(dummyMessageHandler.list.size(), 50);
+ assertEquals(50, dummyMessageHandler.list.size());
int i = 0;
for (ClientMessage message : dummyMessageHandler.list)
{
assertEquals(message.getBody().readString(), "m" + i);
i += 2;
}
- assertEquals(dummyMessageHandler2.list.size(), 50);
+ assertEquals(50, dummyMessageHandler2.list.size());
i = 1;
for (ClientMessage message : dummyMessageHandler2.list)
{
@@ -158,14 +158,14 @@
DummyMessageHandler dummyMessageHandler2 = new DummyMessageHandler(latch, true);
consumer2.setMessageHandler(dummyMessageHandler2);
assertTrue(latch.await(10, TimeUnit.SECONDS));
- assertEquals(dummyMessageHandler.list.size(), 50);
+ assertEquals(50, dummyMessageHandler.list.size());
int i = 0;
for (ClientMessage message : dummyMessageHandler.list)
{
assertEquals(message.getBody().readString(), "m" + i);
i += 2;
}
- assertEquals(dummyMessageHandler2.list.size(), 50);
+ assertEquals(50, dummyMessageHandler2.list.size());
i = 1;
for (ClientMessage message : dummyMessageHandler2.list)
{
@@ -558,9 +558,6 @@
// start the server
server.start();
- AddressSettings qs = new AddressSettings();
- qs.setDistributionPolicyClass(GroupingRoundRobinDistributor.class.getName());
- server.getAddressSettingsRepository().addMatch(qName.toString(), qs);
// then we create a client as normal
ClientSessionFactory sessionFactory = new ClientSessionFactoryImpl(new TransportConfiguration(INVM_CONNECTOR_FACTORY));
clientSession = sessionFactory.createSession(false, true, true);
Modified: trunk/tests/src/org/jboss/messaging/tests/unit/core/server/impl/QueueImplTest.java
===================================================================
--- trunk/tests/src/org/jboss/messaging/tests/unit/core/server/impl/QueueImplTest.java 2009-07-07 05:08:58 UTC (rev 7530)
+++ trunk/tests/src/org/jboss/messaging/tests/unit/core/server/impl/QueueImplTest.java 2009-07-07 10:42:54 UTC (rev 7531)
@@ -31,6 +31,7 @@
import java.util.concurrent.ScheduledExecutorService;
import org.jboss.messaging.core.filter.Filter;
+import org.jboss.messaging.core.filter.impl.FilterImpl;
import org.jboss.messaging.core.server.Consumer;
import org.jboss.messaging.core.server.Distributor;
import org.jboss.messaging.core.server.HandleStatus;
@@ -916,6 +917,137 @@
assertRefListsIdenticalRefs(refs, consumer.getReferences());
}
+ public void testBusyConsumerWithFilterThenAddMoreMessages() throws Exception
+ {
+ Queue queue = new QueueImpl(1, address1, queue1, null, false, true, scheduledExecutor, null, null, null);
+
+ FakeConsumer consumer = new FakeConsumer(FilterImpl.createFilter("color = 'green'"));
+
+ consumer.setStatusImmediate(HandleStatus.BUSY);
+
+ queue.addConsumer(consumer);
+
+ final int numMessages = 10;
+
+ List<MessageReference> refs = new ArrayList<MessageReference>();
+
+ for (int i = 0; i < numMessages; i++)
+ {
+ MessageReference ref = generateReference(queue, i);
+ ref.getMessage().putStringProperty("color", "red");
+ refs.add(ref);
+
+ queue.addLast(ref);
+ }
+
+ assertEquals(10, queue.getMessageCount());
+ assertEquals(0, queue.getScheduledCount());
+ assertEquals(0, queue.getDeliveringCount());
+
+ queue.deliverNow();
+
+ assertEquals(10, queue.getMessageCount());
+ assertEquals(0, queue.getScheduledCount());
+ assertEquals(0, queue.getDeliveringCount());
+ assertTrue(consumer.getReferences().isEmpty());
+
+ for (int i = numMessages; i < numMessages * 2; i++)
+ {
+ MessageReference ref = generateReference(queue, i);
+
+ refs.add(ref);
+ ref.getMessage().putStringProperty("color", "green");
+ queue.addLast(ref);
+ }
+
+ assertEquals(20, queue.getMessageCount());
+ assertEquals(0, queue.getScheduledCount());
+ assertEquals(0, queue.getDeliveringCount());
+ assertTrue(consumer.getReferences().isEmpty());
+
+ consumer.setStatusImmediate(null);
+
+ for (int i = numMessages * 2; i < numMessages * 3; i++)
+ {
+ MessageReference ref = generateReference(queue, i);
+
+ refs.add(ref);
+
+ queue.addLast(ref);
+ }
+
+ queue.deliverNow();
+
+ assertEquals(numMessages, consumer.getReferences().size());
+ assertEquals(30, queue.getMessageCount());
+ assertEquals(0, queue.getScheduledCount());
+ assertEquals(10, queue.getDeliveringCount());
+ }
+
+ public void testConsumerWithFilterThenAddMoreMessages() throws Exception
+ {
+ Queue queue = new QueueImpl(1, address1, queue1, null, false, true, scheduledExecutor, null, null, null);
+
+ final int numMessages = 10;
+ List<MessageReference> refs = new ArrayList<MessageReference>();
+
+ for (int i = 0; i < numMessages; i++)
+ {
+ MessageReference ref = generateReference(queue, i);
+ ref.getMessage().putStringProperty("color", "red");
+ refs.add(ref);
+
+ queue.addLast(ref);
+ }
+
+ assertEquals(10, queue.getMessageCount());
+ assertEquals(0, queue.getScheduledCount());
+ assertEquals(0, queue.getDeliveringCount());
+
+ queue.deliverNow();
+
+ assertEquals(10, queue.getMessageCount());
+ assertEquals(0, queue.getScheduledCount());
+ assertEquals(0, queue.getDeliveringCount());
+
+ for (int i = numMessages; i < numMessages * 2; i++)
+ {
+ MessageReference ref = generateReference(queue, i);
+
+ refs.add(ref);
+ ref.getMessage().putStringProperty("color", "green");
+ queue.addLast(ref);
+ }
+
+ FakeConsumer consumer = new FakeConsumer(FilterImpl.createFilter("color = 'green'"));
+
+ queue.addConsumer(consumer);
+
+ queue.deliverNow();
+
+ assertEquals(20, queue.getMessageCount());
+ assertEquals(0, queue.getScheduledCount());
+ assertEquals(10, queue.getDeliveringCount());
+
+
+ for (int i = numMessages * 2; i < numMessages * 3; i++)
+ {
+ MessageReference ref = generateReference(queue, i);
+
+ refs.add(ref);
+ ref.getMessage().putStringProperty("color", "green");
+ queue.addLast(ref);
+ }
+
+ queue.deliverNow();
+
+ assertEquals(20, consumer.getReferences().size());
+ assertEquals(30, queue.getMessageCount());
+ assertEquals(0, queue.getScheduledCount());
+ assertEquals(20, queue.getDeliveringCount());
+ }
+
+
// Private ------------------------------------------------------------------------------
private void testConsumerWithFilters(boolean direct) throws Exception
@@ -1100,29 +1232,13 @@
class DummyDistributionPolicy implements Distributor
{
+ Consumer consumer;
+
public List<Consumer> getConsumers()
{
return null;
}
- Consumer consumer;
- public Consumer select(ServerMessage message, boolean redeliver)
- {
- return null;
- }
-
- public HandleStatus distribute(MessageReference reference)
- {
- try
- {
- return consumer.handle(reference);
- }
- catch (Exception e)
- {
- return HandleStatus.BUSY;
- }
- }
-
public void addConsumer(Consumer consumer)
{
this.consumer = consumer;
@@ -1133,20 +1249,19 @@
return false;
}
+ public void incrementPosition()
+ {
+ }
+
public int getConsumerCount()
{
return 0;
}
- public boolean hasConsumers()
+ public Consumer peekConsumer()
{
- return false;
+ return consumer;
}
-
- public int getCurrentPosition()
- {
- return 0;
- }
}
}
Modified: trunk/tests/src/org/jboss/messaging/tests/unit/core/server/impl/fakes/FakeConsumer.java
===================================================================
--- trunk/tests/src/org/jboss/messaging/tests/unit/core/server/impl/fakes/FakeConsumer.java 2009-07-07 05:08:58 UTC (rev 7530)
+++ trunk/tests/src/org/jboss/messaging/tests/unit/core/server/impl/fakes/FakeConsumer.java 2009-07-07 10:42:54 UTC (rev 7531)
@@ -111,6 +111,11 @@
public synchronized HandleStatus handle(MessageReference reference)
{
+ if (statusToReturn == HandleStatus.BUSY)
+ {
+ return HandleStatus.BUSY;
+ }
+
if (filter != null)
{
if (filter.match(reference.getMessage()))
More information about the jboss-cvs-commits
mailing list