From do-not-reply at jboss.org Thu Jul 22 09:04:29 2010 Content-Type: multipart/mixed; boundary="===============2767350186570900383==" MIME-Version: 1.0 From: do-not-reply at jboss.org To: hornetq-commits at lists.jboss.org Subject: [hornetq-commits] JBoss hornetq SVN: r9456 - in trunk: tests/src/org/hornetq/tests/integration/client and 1 other directory. Date: Thu, 22 Jul 2010 09:04:29 -0400 Message-ID: <201007221304.o6MD4T5m031410@svn01.web.mwc.hst.phx2.redhat.com> --===============2767350186570900383== Content-Type: text/plain; charset="utf-8" MIME-Version: 1.0 Content-Transfer-Encoding: quoted-printable Author: timfox Date: 2010-07-22 09:04:29 -0400 (Thu, 22 Jul 2010) New Revision: 9456 Added: trunk/tests/src/org/hornetq/tests/integration/client/ConsumerFilterTest.= java Modified: trunk/src/main/org/hornetq/core/server/impl/QueueImpl.java Log: https://jira.jboss.org/browse/HORNETQ-444 Modified: trunk/src/main/org/hornetq/core/server/impl/QueueImpl.java =3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D= =3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D= =3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D --- trunk/src/main/org/hornetq/core/server/impl/QueueImpl.java 2010-07-22 1= 3:01:25 UTC (rev 9455) +++ trunk/src/main/org/hornetq/core/server/impl/QueueImpl.java 2010-07-22 1= 3:04:29 UTC (rev 9456) @@ -56,6 +56,8 @@ = /** * Implementation of a Queue + * = + * Completely non blocking between adding to queue and delivering to consu= mers. * * @author Tim Fox * @author Andy Taylor @@ -69,6 +71,8 @@ public static final int REDISTRIBUTOR_BATCH_SIZE =3D 100; = public static final int NUM_PRIORITIES =3D 10; + = + public static final int MAX_DELIVERIES_IN_LOOP =3D 1000; = private final long id; = @@ -121,6 +125,8 @@ private int pos; = private final Executor executor; + = + private volatile int consumerWithFilterCount; = private static class ConsumerHolder { @@ -228,7 +234,7 @@ { return filter; } - = + public void addLast(final MessageReference ref) { addLast(ref, false); @@ -265,6 +271,11 @@ { cancelRedistributor(); = + if (consumer.getFilter() !=3D null) + { + consumerWithFilterCount++; + } + consumerList.add(new ConsumerHolder(consumer)); = consumerSet.add(consumer); @@ -307,6 +318,11 @@ { groups.remove(gid); } + + if (consumer.getFilter() !=3D null) + { + consumerWithFilterCount--; + } } = public synchronized void addRedistributor(final long delay) @@ -1088,17 +1104,19 @@ { return; } - + = int busyCount =3D 0; = int nullRefCount =3D 0; = + int noMatchCount =3D 0; + int size =3D consumerList.size(); = int startPos =3D pos; = // Deliver at most 1000 messages in one go, to prevent tying this th= read up for too long - int loop =3D Math.min(messageReferences.size(), 1000); + int loop =3D Math.min(messageReferences.size(), MAX_DELIVERIES_IN_LO= OP); = for (int i =3D 0; i < loop; i++) { @@ -1120,6 +1138,11 @@ if (ref =3D=3D null) { nullRefCount++; + + if (holder.iter !=3D null) + { + noMatchCount++; + } } else { @@ -1207,18 +1230,17 @@ break; } = - nullRefCount =3D busyCount =3D 0; + nullRefCount =3D busyCount =3D noMatchCount =3D 0; } } - - if (messageReferences.size() > 0 && busyCount !=3D size) + = + if (messageReferences.size() > 0 && busyCount !=3D size && noMatchCo= unt !=3D size) { // More messages to deliver so need to prompt another runner - no= te we don't // prompt another one if all consumers are busy = executor.execute(deliverRunner); } - } = /* @@ -1230,14 +1252,14 @@ { return false; } - = + if (checkExpired(ref)) { return true; } - = + int startPos =3D pos; - = + int size =3D consumerList.size(); = while (true) @@ -1261,7 +1283,7 @@ consumer =3D groupConsumer; } } - = + pos++; = if (pos =3D=3D size) @@ -1272,12 +1294,12 @@ HandleStatus status =3D handle(ref, consumer); = if (status =3D=3D HandleStatus.HANDLED) - { = + { if (groupID !=3D null && groupConsumer =3D=3D null) { groups.put(groupID, consumer); } - = + return true; } = @@ -1344,9 +1366,11 @@ * unnecessarily queued up * During delivery toDeliver is decremented before the message is de= livered, therefore if it's delivering the last * message, then we cannot have a situation where this delivery is n= ot prompted and message remains stranded in the - * queue + * queue. + * The exception to this is if we have consumers with filters - thes= e will maintain an iterator, so we need to prompt delivery every time + * in this case, since there may be many non matching messages alrea= dy in the queue */ - if (refs =3D=3D 1) + if (consumerWithFilterCount > 0 || refs =3D=3D 1) { deliverAsync(); } Added: trunk/tests/src/org/hornetq/tests/integration/client/ConsumerFilterT= est.java =3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D= =3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D= =3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D --- trunk/tests/src/org/hornetq/tests/integration/client/ConsumerFilterTest= .java (rev 0) +++ trunk/tests/src/org/hornetq/tests/integration/client/ConsumerFilterTest= .java 2010-07-22 13:04:29 UTC (rev 9456) @@ -0,0 +1,148 @@ +/* + * Copyright 2009 Red Hat, Inc. + * Red Hat licenses this file to you under the Apache License, version + * 2.0 (the "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * http://www.apache.org/licenses/LICENSE-2.0 + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or + * implied. See the License for the specific language governing + * permissions and limitations under the License. + */ +package org.hornetq.tests.integration.client; + +import org.hornetq.api.core.client.ClientConsumer; +import org.hornetq.api.core.client.ClientMessage; +import org.hornetq.api.core.client.ClientProducer; +import org.hornetq.api.core.client.ClientSession; +import org.hornetq.api.core.client.ClientSessionFactory; +import org.hornetq.core.logging.Logger; +import org.hornetq.core.server.HornetQServer; +import org.hornetq.core.server.impl.QueueImpl; +import org.hornetq.tests.util.ServiceTestBase; + +/** + * = + * A ConsumerFilterTest + * + * @author Tim Fox + * + * + */ +public class ConsumerFilterTest extends ServiceTestBase +{ + private static final Logger log =3D Logger.getLogger(ConsumerFilterTest= .class); + + private HornetQServer server; + + @Override + protected void setUp() throws Exception + { + super.setUp(); + + server =3D createServer(false); + + server.start(); + } + + @Override + protected void tearDown() throws Exception + { + server.stop(); + + server =3D null; + + super.tearDown(); + } + + public void testNonMatchingMessagesFollowedByMatchingMessages() throws = Exception + { + ClientSessionFactory sf =3D createInVMFactory(); + + ClientSession session =3D sf.createSession(); + + session.start(); + + session.createQueue("foo", "foo"); + + ClientProducer producer =3D session.createProducer("foo"); + + ClientConsumer consumer =3D session.createConsumer("foo", "animal=3D= 'giraffe'"); + + ClientMessage message =3D session.createMessage(false); + + message.putStringProperty("animal", "hippo"); + + producer.send(message); + + assertNull(consumer.receive(500)); + + message =3D session.createMessage(false); + + message.putStringProperty("animal", "giraffe"); + + log.info("sending second msg"); + + producer.send(message); + + ClientMessage received =3D consumer.receive(500); + + assertNotNull(received); + + assertEquals("giraffe", received.getStringProperty("animal")); + + assertNull(consumer.receive(500)); + + session.close(); + } + + public void testNonMatchingMessagesFollowedByMatchingMessagesMany() thr= ows Exception + { + ClientSessionFactory sf =3D createInVMFactory(); + + ClientSession session =3D sf.createSession(); + + session.start(); + + session.createQueue("foo", "foo"); + + ClientProducer producer =3D session.createProducer("foo"); + + ClientConsumer consumer =3D session.createConsumer("foo", "animal=3D= 'giraffe'"); + + for (int i =3D 0; i < QueueImpl.MAX_DELIVERIES_IN_LOOP * 2; i++) + { + + ClientMessage message =3D session.createMessage(false); + + message.putStringProperty("animal", "hippo"); + + producer.send(message); + } + + assertNull(consumer.receive(500)); + + for (int i =3D 0; i < QueueImpl.MAX_DELIVERIES_IN_LOOP * 2; i++) + { + ClientMessage message =3D session.createMessage(false); + + message.putStringProperty("animal", "giraffe"); + + producer.send(message); + } + + for (int i =3D 0; i < QueueImpl.MAX_DELIVERIES_IN_LOOP * 2; i++) + { + ClientMessage received =3D consumer.receive(500); + + assertNotNull(received); + + assertEquals("giraffe", received.getStringProperty("animal")); + } + + assertNull(consumer.receive(500)); + + session.close(); + } +} --===============2767350186570900383==--