From do-not-reply at jboss.org Thu Jul 22 09:04:29 2010
Content-Type: multipart/mixed; boundary="===============2767350186570900383=="
MIME-Version: 1.0
From: do-not-reply at jboss.org
To: hornetq-commits at lists.jboss.org
Subject: [hornetq-commits] JBoss hornetq SVN: r9456 - in trunk:
tests/src/org/hornetq/tests/integration/client and 1 other directory.
Date: Thu, 22 Jul 2010 09:04:29 -0400
Message-ID: <201007221304.o6MD4T5m031410@svn01.web.mwc.hst.phx2.redhat.com>
--===============2767350186570900383==
Content-Type: text/plain; charset="utf-8"
MIME-Version: 1.0
Content-Transfer-Encoding: quoted-printable
Author: timfox
Date: 2010-07-22 09:04:29 -0400 (Thu, 22 Jul 2010)
New Revision: 9456
Added:
trunk/tests/src/org/hornetq/tests/integration/client/ConsumerFilterTest.=
java
Modified:
trunk/src/main/org/hornetq/core/server/impl/QueueImpl.java
Log:
https://jira.jboss.org/browse/HORNETQ-444
Modified: trunk/src/main/org/hornetq/core/server/impl/QueueImpl.java
=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=
=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=
=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D
--- trunk/src/main/org/hornetq/core/server/impl/QueueImpl.java 2010-07-22 1=
3:01:25 UTC (rev 9455)
+++ trunk/src/main/org/hornetq/core/server/impl/QueueImpl.java 2010-07-22 1=
3:04:29 UTC (rev 9456)
@@ -56,6 +56,8 @@
=
/**
* Implementation of a Queue
+ * =
+ * Completely non-blocking between adding to queue and delivering to consu=
mers.
*
* @author Tim Fox
* @author Andy Taylor
@@ -69,6 +71,8 @@
public static final int REDISTRIBUTOR_BATCH_SIZE =3D 100;
=
public static final int NUM_PRIORITIES =3D 10;
+ =
+ public static final int MAX_DELIVERIES_IN_LOOP =3D 1000;
=
private final long id;
=
@@ -121,6 +125,8 @@
private int pos;
=
private final Executor executor;
+ =
+ private volatile int consumerWithFilterCount;
=
private static class ConsumerHolder
{
@@ -228,7 +234,7 @@
{
return filter;
}
- =
+
public void addLast(final MessageReference ref)
{
addLast(ref, false);
@@ -265,6 +271,11 @@
{
cancelRedistributor();
=
+ if (consumer.getFilter() !=3D null)
+ {
+ consumerWithFilterCount++;
+ }
+
consumerList.add(new ConsumerHolder(consumer));
=
consumerSet.add(consumer);
@@ -307,6 +318,11 @@
{
groups.remove(gid);
}
+
+ if (consumer.getFilter() !=3D null)
+ {
+ consumerWithFilterCount--;
+ }
}
=
public synchronized void addRedistributor(final long delay)
@@ -1088,17 +1104,19 @@
{
return;
}
-
+ =
int busyCount =3D 0;
=
int nullRefCount =3D 0;
=
+ int noMatchCount =3D 0;
+
int size =3D consumerList.size();
=
int startPos =3D pos;
=
// Deliver at most 1000 messages in one go, to prevent tying this th=
read up for too long
- int loop =3D Math.min(messageReferences.size(), 1000);
+ int loop =3D Math.min(messageReferences.size(), MAX_DELIVERIES_IN_LO=
OP);
=
for (int i =3D 0; i < loop; i++)
{
@@ -1120,6 +1138,11 @@
if (ref =3D=3D null)
{
nullRefCount++;
+
+ if (holder.iter !=3D null)
+ {
+ noMatchCount++;
+ }
}
else
{
@@ -1207,18 +1230,17 @@
break;
}
=
- nullRefCount =3D busyCount =3D 0;
+ nullRefCount =3D busyCount =3D noMatchCount =3D 0;
}
}
-
- if (messageReferences.size() > 0 && busyCount !=3D size)
+ =
+ if (messageReferences.size() > 0 && busyCount !=3D size && noMatchCo=
unt !=3D size)
{
// More messages to deliver so need to prompt another runner - no=
te we don't
// prompt another one if all consumers are busy
=
executor.execute(deliverRunner);
}
-
}
=
/*
@@ -1230,14 +1252,14 @@
{
return false;
}
- =
+
if (checkExpired(ref))
{
return true;
}
- =
+
int startPos =3D pos;
- =
+
int size =3D consumerList.size();
=
while (true)
@@ -1261,7 +1283,7 @@
consumer =3D groupConsumer;
}
}
- =
+
pos++;
=
if (pos =3D=3D size)
@@ -1272,12 +1294,12 @@
HandleStatus status =3D handle(ref, consumer);
=
if (status =3D=3D HandleStatus.HANDLED)
- { =
+ {
if (groupID !=3D null && groupConsumer =3D=3D null)
{
groups.put(groupID, consumer);
}
- =
+
return true;
}
=
@@ -1344,9 +1366,11 @@
* unnecessarily queued up
* During delivery toDeliver is decremented before the message is de=
livered, therefore if it's delivering the last
* message, then we cannot have a situation where this delivery is n=
ot prompted and message remains stranded in the
- * queue
+ * queue.
+ * The exception to this is if we have consumers with filters - thes=
e will maintain an iterator, so we need to prompt delivery every time
+ * in this case, since there may be many non-matching messages alrea=
dy in the queue.
*/
- if (refs =3D=3D 1)
+ if (consumerWithFilterCount > 0 || refs =3D=3D 1)
{
deliverAsync();
}
Added: trunk/tests/src/org/hornetq/tests/integration/client/ConsumerFilterT=
est.java
=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=
=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=
=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D
--- trunk/tests/src/org/hornetq/tests/integration/client/ConsumerFilterTest=
.java (rev 0)
+++ trunk/tests/src/org/hornetq/tests/integration/client/ConsumerFilterTest=
.java 2010-07-22 13:04:29 UTC (rev 9456)
@@ -0,0 +1,148 @@
+/*
+ * Copyright 2009 Red Hat, Inc.
+ * Red Hat licenses this file to you under the Apache License, version
+ * 2.0 (the "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+ * implied. See the License for the specific language governing
+ * permissions and limitations under the License.
+ */
+package org.hornetq.tests.integration.client;
+
+import org.hornetq.api.core.client.ClientConsumer;
+import org.hornetq.api.core.client.ClientMessage;
+import org.hornetq.api.core.client.ClientProducer;
+import org.hornetq.api.core.client.ClientSession;
+import org.hornetq.api.core.client.ClientSessionFactory;
+import org.hornetq.core.logging.Logger;
+import org.hornetq.core.server.HornetQServer;
+import org.hornetq.core.server.impl.QueueImpl;
+import org.hornetq.tests.util.ServiceTestBase;
+
+/**
+ * =
+ * Tests that a filtered consumer receives matches sent after non-matches.
+ *
+ * @author Tim Fox
+ *
+ *
+ */
+public class ConsumerFilterTest extends ServiceTestBase
+{
+ private static final Logger log =3D Logger.getLogger(ConsumerFilterTest=
.class);
+
+ private HornetQServer server;
+
+ @Override
+ protected void setUp() throws Exception
+ {
+ super.setUp();
+
+ server =3D createServer(false);
+
+ server.start();
+ }
+
+ @Override
+ protected void tearDown() throws Exception
+ {
+ server.stop();
+
+ server =3D null;
+
+ super.tearDown();
+ }
+
+ public void testNonMatchingMessagesFollowedByMatchingMessages() throws =
Exception
+ {
+ ClientSessionFactory sf =3D createInVMFactory();
+
+ ClientSession session =3D sf.createSession();
+
+ session.start();
+
+ session.createQueue("foo", "foo");
+
+ ClientProducer producer =3D session.createProducer("foo");
+
+ ClientConsumer consumer =3D session.createConsumer("foo", "animal=3D=
'giraffe'");
+
+ ClientMessage message =3D session.createMessage(false);
+
+ message.putStringProperty("animal", "hippo");
+
+ producer.send(message);
+
+ assertNull(consumer.receive(500));
+
+ message =3D session.createMessage(false);
+
+ message.putStringProperty("animal", "giraffe");
+
+ log.info("sending second msg");
+
+ producer.send(message);
+
+ ClientMessage received =3D consumer.receive(500);
+
+ assertNotNull(received);
+
+ assertEquals("giraffe", received.getStringProperty("animal"));
+
+ assertNull(consumer.receive(500));
+
+ session.close();
+ }
+
+ public void testNonMatchingMessagesFollowedByMatchingMessagesMany() thr=
ows Exception
+ {
+ ClientSessionFactory sf =3D createInVMFactory();
+
+ ClientSession session =3D sf.createSession();
+
+ session.start();
+
+ session.createQueue("foo", "foo");
+
+ ClientProducer producer =3D session.createProducer("foo");
+
+ ClientConsumer consumer =3D session.createConsumer("foo", "animal=3D=
'giraffe'");
+
+ for (int i =3D 0; i < QueueImpl.MAX_DELIVERIES_IN_LOOP * 2; i++)
+ {
+
+ ClientMessage message =3D session.createMessage(false);
+
+ message.putStringProperty("animal", "hippo");
+
+ producer.send(message);
+ }
+
+ assertNull(consumer.receive(500));
+
+ for (int i =3D 0; i < QueueImpl.MAX_DELIVERIES_IN_LOOP * 2; i++)
+ {
+ ClientMessage message =3D session.createMessage(false);
+
+ message.putStringProperty("animal", "giraffe");
+
+ producer.send(message);
+ }
+
+ for (int i =3D 0; i < QueueImpl.MAX_DELIVERIES_IN_LOOP * 2; i++)
+ {
+ ClientMessage received =3D consumer.receive(500);
+
+ assertNotNull(received);
+
+ assertEquals("giraffe", received.getStringProperty("animal"));
+ }
+
+ assertNull(consumer.receive(500));
+
+ session.close();
+ }
+}
--===============2767350186570900383==--