[hornetq-commits] JBoss hornetq SVN: r9699 - in trunk: src/main/org/hornetq/core/server/impl and 2 other directories.

do-not-reply at jboss.org do-not-reply at jboss.org
Thu Sep 16 11:39:03 EDT 2010


Author: timfox
Date: 2010-09-16 11:39:03 -0400 (Thu, 16 Sep 2010)
New Revision: 9699

Added:
   trunk/tests/src/org/hornetq/tests/integration/remoting/DirectDeliverTest.java
Modified:
   trunk/src/main/org/hornetq/core/server/Queue.java
   trunk/src/main/org/hornetq/core/server/impl/QueueImpl.java
   trunk/tests/src/org/hornetq/tests/unit/core/postoffice/impl/FakeQueue.java
Log:
https://jira.jboss.org/browse/HORNETQ-469

Modified: trunk/src/main/org/hornetq/core/server/Queue.java
===================================================================
--- trunk/src/main/org/hornetq/core/server/Queue.java	2010-09-16 15:34:41 UTC (rev 9698)
+++ trunk/src/main/org/hornetq/core/server/Queue.java	2010-09-16 15:39:03 UTC (rev 9699)
@@ -155,4 +155,6 @@
    void blockOnExecutorFuture();
    
    void close() throws Exception;
+   
+   boolean isDirectDeliver();
 }

Modified: trunk/src/main/org/hornetq/core/server/impl/QueueImpl.java
===================================================================
--- trunk/src/main/org/hornetq/core/server/impl/QueueImpl.java	2010-09-16 15:34:41 UTC (rev 9698)
+++ trunk/src/main/org/hornetq/core/server/impl/QueueImpl.java	2010-09-16 15:39:03 UTC (rev 9699)
@@ -75,9 +75,9 @@
    public static final int NUM_PRIORITIES = 10;
 
    public static final int MAX_DELIVERIES_IN_LOOP = 1000;
-   
-   private static final int CHECK_QUEUE_SIZE_PERIOD = 2000;
 
+   public static final int CHECK_QUEUE_SIZE_PERIOD = 100;
+
    private final long id;
 
    private final SimpleString name;
@@ -138,10 +138,10 @@
 
    private final Runnable concurrentPoller = new ConcurrentPoller();
 
-   private volatile boolean queued;
-   
-   private volatile boolean checkQueueSize = true;
+   private volatile boolean checkDirect;
 
+   private volatile boolean directDeliver = true;
+
    public QueueImpl(final long id,
                     final SimpleString address,
                     final SimpleString name,
@@ -191,11 +191,15 @@
       {
          public void run()
          {
-            checkQueueSize = true;
+            // This flag is periodically set to true. This enables the directDeliver flag to be set to true if the queue
+            // is empty
+            // We don't want to evaluate that on every delivery since that's too expensive
+
+            checkDirect = true;
          }
       }, CHECK_QUEUE_SIZE_PERIOD, CHECK_QUEUE_SIZE_PERIOD, TimeUnit.MILLISECONDS);
    }
-   
+
    // Bindable implementation -------------------------------------------------------------------------------------
 
    public SimpleString getRoutingName()
@@ -252,12 +256,10 @@
       {
          return;
       }
-      
+
       messageReferences.addHead(ref, ref.getMessage().getPriority());
-      
-      queued = true;
-      
-      checkQueueSize = false;
+
+      directDeliver = false;
    }
 
    public synchronized void reload(final MessageReference ref)
@@ -267,9 +269,7 @@
          messageReferences.addTail(ref, ref.getMessage().getPriority());
       }
 
-      queued = true;
-      
-      checkQueueSize = false;
+      directDeliver = false;
 
       messagesAdded++;
    }
@@ -291,41 +291,47 @@
          return;
       }
 
-      if (checkQueueSize)
+      // The checkDirect flag is periodically set to true. If the delivery is specified as direct, this causes the
+      // directDeliver flag to be re-computed, resulting in direct delivery if the queue is empty.
+      // We don't recompute it on every delivery since executing isEmpty is expensive for a ConcurrentQueue
+      if (checkDirect)
       {
-         // This is an expensive operation so we don't want to do it every time we add a message, that's why we use the checkQueueSize flag
-         // which is set to true periodically using a scheduled executor
+         if (direct && !directDeliver && concurrentQueue.isEmpty() && messageReferences.isEmpty())
+         {
+            // We must block on the executor to ensure any async deliveries have completed or we might get out of order
+            // deliveries
+            blockOnExecutorFuture();
 
-         queued = !messageReferences.isEmpty() || !concurrentQueue.isEmpty();
-
-         checkQueueSize = false;
+            // Go into direct delivery mode
+            directDeliver = true;
+         }
+         checkDirect = false;
       }
 
-      if (direct & !queued)
+      if (direct && directDeliver && deliverDirect(ref))
       {
-         if (deliverDirect(ref))
-         {
-            return;
-         }
+         return;
       }
 
       concurrentQueue.add(ref);
 
+      directDeliver = false;
+
       executor.execute(concurrentPoller);
    }
-   
+
    public void deliverAsync()
    {
       executor.execute(deliverRunner);
    }
-   
+
    public void close() throws Exception
    {
       if (checkQueueSizeFuture != null)
       {
          checkQueueSizeFuture.cancel(false);
       }
-      
+
       cancelRedistributor();
    }
 
@@ -486,7 +492,7 @@
          redistributorFuture = null;
       }
    }
-   
+
    @Override
    protected void finalize() throws Throwable
    {
@@ -494,9 +500,9 @@
       {
          checkQueueSizeFuture.cancel(false);
       }
-      
+
       cancelRedistributor();
-      
+
       super.finalize();
    }
 
@@ -998,6 +1004,11 @@
    {
       return paused;
    }
+   
+   public boolean isDirectDeliver()
+   {
+      return directDeliver;
+   }
 
    // Public
    // -----------------------------------------------------------------------------
@@ -1075,10 +1086,10 @@
             // Schedule another one - we do this to prevent a single thread getting caught up in this loop for too long
 
             deliverAsync();
-            
+
             return;
          }
-         
+
          ConsumerHolder holder = consumerList.get(pos);
 
          Consumer consumer = holder.consumer;
@@ -1169,7 +1180,7 @@
          if (pos == size)
          {
             pos = 0;
-         }         
+         }
       }
    }
 

Added: trunk/tests/src/org/hornetq/tests/integration/remoting/DirectDeliverTest.java
===================================================================
--- trunk/tests/src/org/hornetq/tests/integration/remoting/DirectDeliverTest.java	                        (rev 0)
+++ trunk/tests/src/org/hornetq/tests/integration/remoting/DirectDeliverTest.java	2010-09-16 15:39:03 UTC (rev 9699)
@@ -0,0 +1,197 @@
+/*
+ * Copyright 2009 Red Hat, Inc.
+ * Red Hat licenses this file to you under the Apache License, version
+ * 2.0 (the "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+ * implied.  See the License for the specific language governing
+ * permissions and limitations under the License.
+ */
+
+package org.hornetq.tests.integration.remoting;
+
+import java.util.HashMap;
+import java.util.Map;
+
+import org.hornetq.api.core.SimpleString;
+import org.hornetq.api.core.TransportConfiguration;
+import org.hornetq.api.core.client.ClientConsumer;
+import org.hornetq.api.core.client.ClientMessage;
+import org.hornetq.api.core.client.ClientProducer;
+import org.hornetq.api.core.client.ClientSession;
+import org.hornetq.api.core.client.ClientSessionFactory;
+import org.hornetq.api.core.client.HornetQClient;
+import org.hornetq.core.config.Configuration;
+import org.hornetq.core.config.impl.ConfigurationImpl;
+import org.hornetq.core.logging.Logger;
+import org.hornetq.core.postoffice.Binding;
+import org.hornetq.core.remoting.impl.netty.NettyAcceptorFactory;
+import org.hornetq.core.remoting.impl.netty.NettyConnectorFactory;
+import org.hornetq.core.remoting.impl.netty.TransportConstants;
+import org.hornetq.core.server.HornetQServer;
+import org.hornetq.core.server.Queue;
+import org.hornetq.core.server.impl.QueueImpl;
+import org.hornetq.tests.util.ServiceTestBase;
+
+/**
+ * 
+ * Verifies that a queue leaves direct-deliver mode when messages have to be queued and returns to it once drained.
+ *
+ * @author <a href="mailto:tim.fox at jboss.com">Tim Fox</a>
+ *
+ *
+ */
+public class DirectDeliverTest extends ServiceTestBase
+{
+
+   // Constants -----------------------------------------------------
+
+   private static final Logger log = Logger.getLogger(DirectDeliverTest.class);
+
+   // Attributes ----------------------------------------------------
+
+   private HornetQServer server;
+
+   // Static --------------------------------------------------------
+
+   // Constructors --------------------------------------------------
+
+   // Public --------------------------------------------------------
+
+   // Package protected ---------------------------------------------
+
+   // Protected -----------------------------------------------------
+
+   @Override
+   protected void setUp() throws Exception
+   {
+      super.setUp();
+
+      Map<String, Object> params = new HashMap<String, Object>();
+      params.put(TransportConstants.DIRECT_DELIVER, true);
+
+      TransportConfiguration tc = new TransportConfiguration(NettyAcceptorFactory.class.getName(), params);
+
+      Configuration config = new ConfigurationImpl();
+      config.getAcceptorConfigurations().add(tc);
+
+      config.setSecurityEnabled(false);
+      server = createServer(false, config);
+      server.start();
+   }
+
+   @Override
+   protected void tearDown() throws Exception
+   {
+      server.stop();
+
+      server = null;
+
+      super.tearDown();
+   }
+
+   protected ClientSessionFactory createSessionFactory()
+   {
+      ClientSessionFactory sf = HornetQClient.createClientSessionFactory(new TransportConfiguration(NettyConnectorFactory.class.getName()));
+
+      return sf;
+   }
+
+   public void testDirectDeliver() throws Exception
+   {
+      final String foo = "foo";
+      
+      ClientSessionFactory sf = createSessionFactory();
+
+      ClientSession session = sf.createSession();
+
+      session.createQueue(foo, foo);
+
+      Binding binding = server.getPostOffice().getBinding(new SimpleString(foo));
+      
+      Queue queue = (Queue)binding.getBindable();
+      
+      assertTrue(queue.isDirectDeliver());
+           
+      ClientProducer prod = session.createProducer(foo);
+
+      ClientConsumer cons = session.createConsumer(foo);
+
+      final int numMessages = 100;
+
+      for (int i = 0; i < numMessages; i++)
+      {
+         ClientMessage msg = session.createMessage(true);
+
+         prod.send(msg);
+      }
+      
+      queue.blockOnExecutorFuture();
+      
+      //Consumer is not started, so the messages should be queued rather than delivered directly
+      assertFalse(queue.isDirectDeliver());
+      
+      session.start();
+
+      for (int i = 0; i < numMessages; i++)
+      {
+         ClientMessage msg = cons.receive(10000);
+
+         assertNotNull(msg);
+
+         msg.acknowledge();
+      }
+      
+      Thread.sleep((long)(QueueImpl.CHECK_QUEUE_SIZE_PERIOD * 1.5));
+      
+      //Add another message, should go direct
+      ClientMessage msg = session.createMessage(true);
+
+      prod.send(msg);
+      
+      queue.blockOnExecutorFuture();
+            
+      assertTrue(queue.isDirectDeliver());
+      
+      //Send some more
+      for (int i = 0; i < numMessages; i++)
+      {
+         msg = session.createMessage(true);
+
+         prod.send(msg);
+      }
+      
+      for (int i = 0; i < numMessages + 1; i++)
+      {
+         msg = cons.receive(10000);
+
+         assertNotNull(msg);
+
+         msg.acknowledge();
+      }
+      
+      assertTrue(queue.isDirectDeliver());
+      
+      session.stop();
+      
+      for (int i = 0; i < numMessages; i++)
+      {
+         msg = session.createMessage(true);
+
+         prod.send(msg);
+      }
+      
+      assertFalse(queue.isDirectDeliver());
+      
+
+      sf.close();
+   }
+
+   // Private -------------------------------------------------------
+
+   // Inner classes -------------------------------------------------
+
+}

Modified: trunk/tests/src/org/hornetq/tests/unit/core/postoffice/impl/FakeQueue.java
===================================================================
--- trunk/tests/src/org/hornetq/tests/unit/core/postoffice/impl/FakeQueue.java	2010-09-16 15:34:41 UTC (rev 9698)
+++ trunk/tests/src/org/hornetq/tests/unit/core/postoffice/impl/FakeQueue.java	2010-09-16 15:39:03 UTC (rev 9699)
@@ -27,8 +27,21 @@
 import org.hornetq.core.server.ServerMessage;
 import org.hornetq.core.transaction.Transaction;
 
+/**
+ * A FakeQueue
+ *
+ * @author tim
+ *
+ *
+ */
 public class FakeQueue implements Queue
 {
+   public boolean isDirectDeliver()
+   {
+      // TODO Auto-generated method stub
+      return false;
+   }
+
    public void close()
    {
       // TODO Auto-generated method stub



More information about the hornetq-commits mailing list