[hornetq-commits] JBoss hornetq SVN: r9456 - in trunk: tests/src/org/hornetq/tests/integration/client and 1 other directory.

do-not-reply at jboss.org do-not-reply at jboss.org
Thu Jul 22 09:04:29 EDT 2010


Author: timfox
Date: 2010-07-22 09:04:29 -0400 (Thu, 22 Jul 2010)
New Revision: 9456

Added:
   trunk/tests/src/org/hornetq/tests/integration/client/ConsumerFilterTest.java
Modified:
   trunk/src/main/org/hornetq/core/server/impl/QueueImpl.java
Log:
https://jira.jboss.org/browse/HORNETQ-444

Modified: trunk/src/main/org/hornetq/core/server/impl/QueueImpl.java
===================================================================
--- trunk/src/main/org/hornetq/core/server/impl/QueueImpl.java	2010-07-22 13:01:25 UTC (rev 9455)
+++ trunk/src/main/org/hornetq/core/server/impl/QueueImpl.java	2010-07-22 13:04:29 UTC (rev 9456)
@@ -56,6 +56,8 @@
 
 /**
  * Implementation of a Queue
+ * 
+ * Completely non blocking between adding to queue and delivering to consumers.
  *
  * @author <a href="mailto:tim.fox at jboss.com">Tim Fox</a>
  * @author <a href="ataylor at redhat.com">Andy Taylor</a>
@@ -69,6 +71,8 @@
    public static final int REDISTRIBUTOR_BATCH_SIZE = 100;
 
    public static final int NUM_PRIORITIES = 10;
+   
+   public static final int MAX_DELIVERIES_IN_LOOP = 1000;
 
    private final long id;
 
@@ -121,6 +125,8 @@
    private int pos;
 
    private final Executor executor;
+   
+   private volatile int consumerWithFilterCount;
 
    private static class ConsumerHolder
    {
@@ -228,7 +234,7 @@
    {
       return filter;
    }
-   
+
    public void addLast(final MessageReference ref)
    {
       addLast(ref, false);
@@ -265,6 +271,11 @@
    {
       cancelRedistributor();
 
+      if (consumer.getFilter() != null)
+      {
+         consumerWithFilterCount++;
+      }
+
       consumerList.add(new ConsumerHolder(consumer));
 
       consumerSet.add(consumer);
@@ -307,6 +318,11 @@
       {
          groups.remove(gid);
       }
+
+      if (consumer.getFilter() != null)
+      {
+         consumerWithFilterCount--;
+      }
    }
 
    public synchronized void addRedistributor(final long delay)
@@ -1088,17 +1104,19 @@
       {
          return;
       }
-
+      
       int busyCount = 0;
 
       int nullRefCount = 0;
 
+      int noMatchCount = 0;
+
       int size = consumerList.size();
 
       int startPos = pos;
 
       // Deliver at most 1000 messages in one go, to prevent tying this thread up for too long
-      int loop = Math.min(messageReferences.size(), 1000);
+      int loop = Math.min(messageReferences.size(), MAX_DELIVERIES_IN_LOOP);
 
       for (int i = 0; i < loop; i++)
       {
@@ -1120,6 +1138,11 @@
          if (ref == null)
          {
             nullRefCount++;
+
+            if (holder.iter != null)
+            {
+               noMatchCount++;
+            }
          }
          else
          {
@@ -1207,18 +1230,17 @@
                break;
             }
 
-            nullRefCount = busyCount = 0;
+            nullRefCount = busyCount = noMatchCount = 0;
          }
       }
-
-      if (messageReferences.size() > 0 && busyCount != size)
+      
+      if (messageReferences.size() > 0 && busyCount != size && noMatchCount != size)
       {
          // More messages to deliver so need to prompt another runner - note we don't
          // prompt another one if all consumers are busy
 
          executor.execute(deliverRunner);
       }
-
    }
 
    /*
@@ -1230,14 +1252,14 @@
       {
          return false;
       }
-      
+
       if (checkExpired(ref))
       {
          return true;
       }
-      
+
       int startPos = pos;
-      
+
       int size = consumerList.size();
 
       while (true)
@@ -1261,7 +1283,7 @@
                consumer = groupConsumer;
             }
          }
-         
+
          pos++;
 
          if (pos == size)
@@ -1272,12 +1294,12 @@
          HandleStatus status = handle(ref, consumer);
 
          if (status == HandleStatus.HANDLED)
-         {            
+         {
             if (groupID != null && groupConsumer == null)
             {
                groups.put(groupID, consumer);
             }
-            
+
             return true;
          }
 
@@ -1344,9 +1366,11 @@
        * unnecessarily queued up
        * During delivery toDeliver is decremented before the message is delivered, therefore if it's delivering the last
        * message, then we cannot have a situation where this delivery is not prompted and message remains stranded in the
-       * queue
+       * queue.
+       * The exception to this is if we have consumers with filters - these will maintain an iterator, so we need to prompt delivery every time
+       * in this case, since there may be many non matching messages already in the queue
        */
-      if (refs == 1)
+      if (consumerWithFilterCount > 0 || refs == 1)
       {
          deliverAsync();
       }

Added: trunk/tests/src/org/hornetq/tests/integration/client/ConsumerFilterTest.java
===================================================================
--- trunk/tests/src/org/hornetq/tests/integration/client/ConsumerFilterTest.java	                        (rev 0)
+++ trunk/tests/src/org/hornetq/tests/integration/client/ConsumerFilterTest.java	2010-07-22 13:04:29 UTC (rev 9456)
@@ -0,0 +1,148 @@
+/*
+ * Copyright 2009 Red Hat, Inc.
+ * Red Hat licenses this file to you under the Apache License, version
+ * 2.0 (the "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+ * implied.  See the License for the specific language governing
+ * permissions and limitations under the License.
+ */
+package org.hornetq.tests.integration.client;
+
+import org.hornetq.api.core.client.ClientConsumer;
+import org.hornetq.api.core.client.ClientMessage;
+import org.hornetq.api.core.client.ClientProducer;
+import org.hornetq.api.core.client.ClientSession;
+import org.hornetq.api.core.client.ClientSessionFactory;
+import org.hornetq.core.logging.Logger;
+import org.hornetq.core.server.HornetQServer;
+import org.hornetq.core.server.impl.QueueImpl;
+import org.hornetq.tests.util.ServiceTestBase;
+
+/**
+ * 
+ * A ConsumerFilterTest
+ *
+ * @author Tim Fox
+ *
+ *
+ */
+public class ConsumerFilterTest extends ServiceTestBase
+{
+   private static final Logger log = Logger.getLogger(ConsumerFilterTest.class);
+
+   private HornetQServer server;
+
+   @Override
+   protected void setUp() throws Exception
+   {
+      super.setUp();
+
+      server = createServer(false);
+
+      server.start();
+   }
+
+   @Override
+   protected void tearDown() throws Exception
+   {
+      server.stop();
+
+      server = null;
+
+      super.tearDown();
+   }
+
+   public void testNonMatchingMessagesFollowedByMatchingMessages() throws Exception
+   {
+      ClientSessionFactory sf = createInVMFactory();
+
+      ClientSession session = sf.createSession();
+
+      session.start();
+
+      session.createQueue("foo", "foo");
+
+      ClientProducer producer = session.createProducer("foo");
+
+      ClientConsumer consumer = session.createConsumer("foo", "animal='giraffe'");
+
+      ClientMessage message = session.createMessage(false);
+
+      message.putStringProperty("animal", "hippo");
+
+      producer.send(message);
+
+      assertNull(consumer.receive(500));
+
+      message = session.createMessage(false);
+
+      message.putStringProperty("animal", "giraffe");
+
+      log.info("sending second msg");
+
+      producer.send(message);
+
+      ClientMessage received = consumer.receive(500);
+
+      assertNotNull(received);
+
+      assertEquals("giraffe", received.getStringProperty("animal"));
+
+      assertNull(consumer.receive(500));
+
+      session.close();
+   }
+
+   public void testNonMatchingMessagesFollowedByMatchingMessagesMany() throws Exception
+   {
+      ClientSessionFactory sf = createInVMFactory();
+
+      ClientSession session = sf.createSession();
+
+      session.start();
+
+      session.createQueue("foo", "foo");
+
+      ClientProducer producer = session.createProducer("foo");
+
+      ClientConsumer consumer = session.createConsumer("foo", "animal='giraffe'");
+
+      for (int i = 0; i < QueueImpl.MAX_DELIVERIES_IN_LOOP * 2; i++)
+      {
+
+         ClientMessage message = session.createMessage(false);
+
+         message.putStringProperty("animal", "hippo");
+
+         producer.send(message);
+      }
+
+      assertNull(consumer.receive(500));
+
+      for (int i = 0; i < QueueImpl.MAX_DELIVERIES_IN_LOOP * 2; i++)
+      {
+         ClientMessage message = session.createMessage(false);
+
+         message.putStringProperty("animal", "giraffe");
+
+         producer.send(message);
+      }
+
+      for (int i = 0; i < QueueImpl.MAX_DELIVERIES_IN_LOOP * 2; i++)
+      {
+         ClientMessage received = consumer.receive(500);
+
+         assertNotNull(received);
+
+         assertEquals("giraffe", received.getStringProperty("animal"));
+      }
+
+      assertNull(consumer.receive(500));
+
+      session.close();
+   }
+}



More information about the hornetq-commits mailing list