[hornetq-commits] JBoss hornetq SVN: r9456 - in trunk: tests/src/org/hornetq/tests/integration/client and 1 other directory.
do-not-reply at jboss.org
do-not-reply at jboss.org
Thu Jul 22 09:04:29 EDT 2010
Author: timfox
Date: 2010-07-22 09:04:29 -0400 (Thu, 22 Jul 2010)
New Revision: 9456
Added:
trunk/tests/src/org/hornetq/tests/integration/client/ConsumerFilterTest.java
Modified:
trunk/src/main/org/hornetq/core/server/impl/QueueImpl.java
Log:
https://jira.jboss.org/browse/HORNETQ-444
Modified: trunk/src/main/org/hornetq/core/server/impl/QueueImpl.java
===================================================================
--- trunk/src/main/org/hornetq/core/server/impl/QueueImpl.java 2010-07-22 13:01:25 UTC (rev 9455)
+++ trunk/src/main/org/hornetq/core/server/impl/QueueImpl.java 2010-07-22 13:04:29 UTC (rev 9456)
@@ -56,6 +56,8 @@
/**
* Implementation of a Queue
+ *
+ * Completely non blocking between adding to queue and delivering to consumers.
*
* @author <a href="mailto:tim.fox at jboss.com">Tim Fox</a>
* @author <a href="ataylor at redhat.com">Andy Taylor</a>
@@ -69,6 +71,8 @@
public static final int REDISTRIBUTOR_BATCH_SIZE = 100;
public static final int NUM_PRIORITIES = 10;
+
+ public static final int MAX_DELIVERIES_IN_LOOP = 1000;
private final long id;
@@ -121,6 +125,8 @@
private int pos;
private final Executor executor;
+
+ private volatile int consumerWithFilterCount;
private static class ConsumerHolder
{
@@ -228,7 +234,7 @@
{
return filter;
}
-
+
public void addLast(final MessageReference ref)
{
addLast(ref, false);
@@ -265,6 +271,11 @@
{
cancelRedistributor();
+ if (consumer.getFilter() != null)
+ {
+ consumerWithFilterCount++;
+ }
+
consumerList.add(new ConsumerHolder(consumer));
consumerSet.add(consumer);
@@ -307,6 +318,11 @@
{
groups.remove(gid);
}
+
+ if (consumer.getFilter() != null)
+ {
+ consumerWithFilterCount--;
+ }
}
public synchronized void addRedistributor(final long delay)
@@ -1088,17 +1104,19 @@
{
return;
}
-
+
int busyCount = 0;
int nullRefCount = 0;
+ int noMatchCount = 0;
+
int size = consumerList.size();
int startPos = pos;
// Deliver at most 1000 messages in one go, to prevent tying this thread up for too long
- int loop = Math.min(messageReferences.size(), 1000);
+ int loop = Math.min(messageReferences.size(), MAX_DELIVERIES_IN_LOOP);
for (int i = 0; i < loop; i++)
{
@@ -1120,6 +1138,11 @@
if (ref == null)
{
nullRefCount++;
+
+ if (holder.iter != null)
+ {
+ noMatchCount++;
+ }
}
else
{
@@ -1207,18 +1230,17 @@
break;
}
- nullRefCount = busyCount = 0;
+ nullRefCount = busyCount = noMatchCount = 0;
}
}
-
- if (messageReferences.size() > 0 && busyCount != size)
+
+ if (messageReferences.size() > 0 && busyCount != size && noMatchCount != size)
{
// More messages to deliver so need to prompt another runner - note we don't
// prompt another one if all consumers are busy
executor.execute(deliverRunner);
}
-
}
/*
@@ -1230,14 +1252,14 @@
{
return false;
}
-
+
if (checkExpired(ref))
{
return true;
}
-
+
int startPos = pos;
-
+
int size = consumerList.size();
while (true)
@@ -1261,7 +1283,7 @@
consumer = groupConsumer;
}
}
-
+
pos++;
if (pos == size)
@@ -1272,12 +1294,12 @@
HandleStatus status = handle(ref, consumer);
if (status == HandleStatus.HANDLED)
- {
+ {
if (groupID != null && groupConsumer == null)
{
groups.put(groupID, consumer);
}
-
+
return true;
}
@@ -1344,9 +1366,11 @@
* unnecessarily queued up
* During delivery toDeliver is decremented before the message is delivered, therefore if it's delivering the last
* message, then we cannot have a situation where this delivery is not prompted and message remains stranded in the
- * queue
+ * queue.
+ * The exception to this is if we have consumers with filters - these will maintain an iterator, so we need to prompt delivery every time
+ * in this case, since there may be many non matching messages already in the queue
*/
- if (refs == 1)
+ if (consumerWithFilterCount > 0 || refs == 1)
{
deliverAsync();
}
Added: trunk/tests/src/org/hornetq/tests/integration/client/ConsumerFilterTest.java
===================================================================
--- trunk/tests/src/org/hornetq/tests/integration/client/ConsumerFilterTest.java (rev 0)
+++ trunk/tests/src/org/hornetq/tests/integration/client/ConsumerFilterTest.java 2010-07-22 13:04:29 UTC (rev 9456)
@@ -0,0 +1,148 @@
+/*
+ * Copyright 2009 Red Hat, Inc.
+ * Red Hat licenses this file to you under the Apache License, version
+ * 2.0 (the "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+ * implied. See the License for the specific language governing
+ * permissions and limitations under the License.
+ */
+package org.hornetq.tests.integration.client;
+
+import org.hornetq.api.core.client.ClientConsumer;
+import org.hornetq.api.core.client.ClientMessage;
+import org.hornetq.api.core.client.ClientProducer;
+import org.hornetq.api.core.client.ClientSession;
+import org.hornetq.api.core.client.ClientSessionFactory;
+import org.hornetq.core.logging.Logger;
+import org.hornetq.core.server.HornetQServer;
+import org.hornetq.core.server.impl.QueueImpl;
+import org.hornetq.tests.util.ServiceTestBase;
+
+/**
+ *
+ * A ConsumerFilterTest
+ *
+ * @author Tim Fox
+ *
+ *
+ */
+public class ConsumerFilterTest extends ServiceTestBase
+{
+ private static final Logger log = Logger.getLogger(ConsumerFilterTest.class);
+
+ private HornetQServer server;
+
+ @Override
+ protected void setUp() throws Exception
+ {
+ super.setUp();
+
+ server = createServer(false);
+
+ server.start();
+ }
+
+ @Override
+ protected void tearDown() throws Exception
+ {
+ server.stop();
+
+ server = null;
+
+ super.tearDown();
+ }
+
+ public void testNonMatchingMessagesFollowedByMatchingMessages() throws Exception
+ {
+ ClientSessionFactory sf = createInVMFactory();
+
+ ClientSession session = sf.createSession();
+
+ session.start();
+
+ session.createQueue("foo", "foo");
+
+ ClientProducer producer = session.createProducer("foo");
+
+ ClientConsumer consumer = session.createConsumer("foo", "animal='giraffe'");
+
+ ClientMessage message = session.createMessage(false);
+
+ message.putStringProperty("animal", "hippo");
+
+ producer.send(message);
+
+ assertNull(consumer.receive(500));
+
+ message = session.createMessage(false);
+
+ message.putStringProperty("animal", "giraffe");
+
+ log.info("sending second msg");
+
+ producer.send(message);
+
+ ClientMessage received = consumer.receive(500);
+
+ assertNotNull(received);
+
+ assertEquals("giraffe", received.getStringProperty("animal"));
+
+ assertNull(consumer.receive(500));
+
+ session.close();
+ }
+
+ public void testNonMatchingMessagesFollowedByMatchingMessagesMany() throws Exception
+ {
+ ClientSessionFactory sf = createInVMFactory();
+
+ ClientSession session = sf.createSession();
+
+ session.start();
+
+ session.createQueue("foo", "foo");
+
+ ClientProducer producer = session.createProducer("foo");
+
+ ClientConsumer consumer = session.createConsumer("foo", "animal='giraffe'");
+
+ for (int i = 0; i < QueueImpl.MAX_DELIVERIES_IN_LOOP * 2; i++)
+ {
+
+ ClientMessage message = session.createMessage(false);
+
+ message.putStringProperty("animal", "hippo");
+
+ producer.send(message);
+ }
+
+ assertNull(consumer.receive(500));
+
+ for (int i = 0; i < QueueImpl.MAX_DELIVERIES_IN_LOOP * 2; i++)
+ {
+ ClientMessage message = session.createMessage(false);
+
+ message.putStringProperty("animal", "giraffe");
+
+ producer.send(message);
+ }
+
+ for (int i = 0; i < QueueImpl.MAX_DELIVERIES_IN_LOOP * 2; i++)
+ {
+ ClientMessage received = consumer.receive(500);
+
+ assertNotNull(received);
+
+ assertEquals("giraffe", received.getStringProperty("animal"));
+ }
+
+ assertNull(consumer.receive(500));
+
+ session.close();
+ }
+}
More information about the hornetq-commits
mailing list