[hornetq-commits] JBoss hornetq SVN: r8661 - in trunk: src/main/org/hornetq/core/server/impl and 3 other directories.

do-not-reply at jboss.org do-not-reply at jboss.org
Thu Dec 10 04:44:05 EST 2009


Author: timfox
Date: 2009-12-10 04:44:05 -0500 (Thu, 10 Dec 2009)
New Revision: 8661

Modified:
   trunk/src/main/org/hornetq/core/server/Queue.java
   trunk/src/main/org/hornetq/core/server/ServerSession.java
   trunk/src/main/org/hornetq/core/server/impl/ServerConsumerImpl.java
   trunk/src/main/org/hornetq/core/server/impl/ServerSessionImpl.java
   trunk/tests/src/org/hornetq/tests/concurrent/server/impl/QueueTest.java
   trunk/tests/src/org/hornetq/tests/timing/core/server/impl/QueueImplTest.java
   trunk/tests/src/org/hornetq/tests/unit/core/server/impl/QueueImplTest.java
Log:
simplified receive immediate logic on the server

Modified: trunk/src/main/org/hornetq/core/server/Queue.java
===================================================================
--- trunk/src/main/org/hornetq/core/server/Queue.java	2009-12-10 09:19:17 UTC (rev 8660)
+++ trunk/src/main/org/hornetq/core/server/Queue.java	2009-12-10 09:44:05 UTC (rev 8661)
@@ -120,8 +120,6 @@
 
    boolean hasMatchingConsumer(ServerMessage message);
 
-   void deliverNow();
-
    Collection<Consumer> getConsumers();
 
    boolean checkDLQ(MessageReference ref) throws Exception;

Modified: trunk/src/main/org/hornetq/core/server/ServerSession.java
===================================================================
--- trunk/src/main/org/hornetq/core/server/ServerSession.java	2009-12-10 09:19:17 UTC (rev 8660)
+++ trunk/src/main/org/hornetq/core/server/ServerSession.java	2009-12-10 09:44:05 UTC (rev 8661)
@@ -68,7 +68,7 @@
 
    void close() throws Exception;
 
-   void promptDelivery(Queue queue, boolean async);
+   void promptDelivery(Queue queue);
 
    void handleAcknowledge(final SessionAcknowledgeMessage packet);
 

Modified: trunk/src/main/org/hornetq/core/server/impl/ServerConsumerImpl.java
===================================================================
--- trunk/src/main/org/hornetq/core/server/impl/ServerConsumerImpl.java	2009-12-10 09:19:17 UTC (rev 8660)
+++ trunk/src/main/org/hornetq/core/server/impl/ServerConsumerImpl.java	2009-12-10 09:44:05 UTC (rev 8661)
@@ -337,20 +337,19 @@
     * When the consumer receives such a "forced delivery" message, it discards it
     * and knows that there are no other messages to be delivered.
     */
-
-   // TODO - why is this executed on a different thread?
    public synchronized void forceDelivery(final long sequence)
    {      
+      promptDelivery();
+      
       executor.execute(new Runnable()
       {
          public void run()
          {
             try
             {
-               // The prompt delivery is called synchronously to ensure the "forced delivery" message is
-               // sent after any queue delivery.
-               promptDelivery(false);
-
+               // We execute this on the same executor to make sure the force delivery message is written after
+               // any delivery is completed
+               
                ServerMessage forcedDeliveryMessage = new ServerMessageImpl(storageManager.generateUniqueID(), 50);
 
                forcedDeliveryMessage.putLongProperty(ClientConsumerImpl.FORCED_DELIVERY_MESSAGE, sequence);
@@ -413,7 +412,7 @@
       // Outside the lock
       if (started)
       {
-         promptDelivery(true);
+         promptDelivery();
       }
    }
 
@@ -465,7 +464,7 @@
 
       if (!transferring)
       {
-         promptDelivery(true);
+         promptDelivery();
       }
    }
 
@@ -491,7 +490,7 @@
 
          if (previous <= 0 && previous + credits > 0)
          {
-            promptDelivery(true);
+            promptDelivery();
          }
       }
    }
@@ -580,7 +579,7 @@
 
    // Private --------------------------------------------------------------------------------------
 
-   private void promptDelivery(final boolean asyncDelivery)
+   private void promptDelivery()
    {
       lock.lock();
       try
@@ -595,18 +594,11 @@
          {
             if (browseOnly)
             {
-               if (asyncDelivery)
-               {
-                  executor.execute(browserDeliverer);
-               }
-               else
-               {
-                  browserDeliverer.run();
-               }
+               executor.execute(browserDeliverer);
             }
             else
             {
-               session.promptDelivery(messageQueue, asyncDelivery);
+               session.promptDelivery(messageQueue);
             }
          }
       }
@@ -668,7 +660,7 @@
                else
                {
                   // prompt Delivery only if chunk was finished
-                  session.promptDelivery(messageQueue, true);
+                  session.promptDelivery(messageQueue);
                }
             }
          }

Modified: trunk/src/main/org/hornetq/core/server/impl/ServerSessionImpl.java
===================================================================
--- trunk/src/main/org/hornetq/core/server/impl/ServerSessionImpl.java	2009-12-10 09:19:17 UTC (rev 8660)
+++ trunk/src/main/org/hornetq/core/server/impl/ServerSessionImpl.java	2009-12-10 09:44:05 UTC (rev 8661)
@@ -114,30 +114,7 @@
    private static final Logger log = Logger.getLogger(ServerSessionImpl.class);
 
    // Static -------------------------------------------------------------------------------
-
-   // TODO not actually used currently
-   // private static int offset;
-   //
-   // static
-   // {
-   // try
-   // {
-   // ServerMessage msg = new ServerMessageImpl(1, ChannelBuffers.EMPTY_BUFFER);
-   //
-   // msg.setDestination(new SimpleString("foobar"));
-   //   
-   // int es = msg.getEncodeSize();
-   //   
-   // int me = msg.getMemoryEstimate();
-   //   
-   // offset = MessageReferenceImpl.getMemoryEstimate() + me - es;
-   // }
-   // catch (Exception e)
-   // {
-   // log.error("Failed to initialise mult and offset", e);
-   // }
-   // }
-
+  
    // Attributes ----------------------------------------------------------------------------
 
    private final long id;
@@ -361,16 +338,9 @@
       }
    }
 
-   public void promptDelivery(final Queue queue, final boolean async)
+   public void promptDelivery(final Queue queue)
    {
-      if (async)
-      {
-         queue.deliverAsync(executor);
-      }
-      else
-      {
-         queue.deliverNow();
-      }
+      queue.deliverAsync(executor);
    }
 
    public void handleCreateConsumer(final SessionCreateConsumerMessage packet)

Modified: trunk/tests/src/org/hornetq/tests/concurrent/server/impl/QueueTest.java
===================================================================
--- trunk/tests/src/org/hornetq/tests/concurrent/server/impl/QueueTest.java	2009-12-10 09:19:17 UTC (rev 8660)
+++ trunk/tests/src/org/hornetq/tests/concurrent/server/impl/QueueTest.java	2009-12-10 09:44:05 UTC (rev 8661)
@@ -21,6 +21,7 @@
 import org.hornetq.core.server.MessageReference;
 import org.hornetq.core.server.Queue;
 import org.hornetq.core.server.ServerMessage;
+import org.hornetq.core.server.impl.QueueImpl;
 import org.hornetq.tests.unit.core.server.impl.fakes.FakeConsumer;
 import org.hornetq.tests.unit.core.server.impl.fakes.FakeQueueFactory;
 import org.hornetq.tests.util.UnitTestCase;
@@ -60,7 +61,7 @@
     */
    public void testConcurrentAddsDeliver() throws Exception
    {
-      Queue queue = queueFactory.createQueue(1,
+      QueueImpl queue = (QueueImpl)queueFactory.createQueue(1,
                                              new SimpleString("address1"),
                                              new SimpleString("queue1"),
                                              null,
@@ -162,7 +163,7 @@
    {
       private volatile Exception e;
 
-      private final Queue queue;
+      private final QueueImpl queue;
 
       private final FakeConsumer consumer;
 
@@ -182,7 +183,7 @@
          return e;
       }
 
-      Toggler(final Queue queue, final FakeConsumer consumer, final long testTime)
+      Toggler(final QueueImpl queue, final FakeConsumer consumer, final long testTime)
       {
          this.testTime = testTime;
 

Modified: trunk/tests/src/org/hornetq/tests/timing/core/server/impl/QueueImplTest.java
===================================================================
--- trunk/tests/src/org/hornetq/tests/timing/core/server/impl/QueueImplTest.java	2009-12-10 09:19:17 UTC (rev 8660)
+++ trunk/tests/src/org/hornetq/tests/timing/core/server/impl/QueueImplTest.java	2009-12-10 09:44:05 UTC (rev 8661)
@@ -76,7 +76,7 @@
 
    public void testScheduledNoConsumer() throws Exception
    {
-      Queue queue = new QueueImpl(1,
+      QueueImpl queue = new QueueImpl(1,
                                   new SimpleString("address1"),
                                   new SimpleString("queue1"),
                                   null,
@@ -150,7 +150,7 @@
 
    private void testScheduled(final boolean direct) throws Exception
    {
-      Queue queue = new QueueImpl(1,
+      QueueImpl queue = new QueueImpl(1,
                                   new SimpleString("address1"),
                                   new SimpleString("queue1"),
                                   null,
@@ -265,7 +265,7 @@
             return HandleStatus.HANDLED;
          }
       };
-      Queue queue = new QueueImpl(1,
+      QueueImpl queue = new QueueImpl(1,
                                   new SimpleString("address1"),
                                   QueueImplTest.queue1,
                                   null,

Modified: trunk/tests/src/org/hornetq/tests/unit/core/server/impl/QueueImplTest.java
===================================================================
--- trunk/tests/src/org/hornetq/tests/unit/core/server/impl/QueueImplTest.java	2009-12-10 09:19:17 UTC (rev 8660)
+++ trunk/tests/src/org/hornetq/tests/unit/core/server/impl/QueueImplTest.java	2009-12-10 09:44:05 UTC (rev 8661)
@@ -69,7 +69,7 @@
    {
       final SimpleString name = new SimpleString("oobblle");
 
-      Queue queue = new QueueImpl(1,
+      QueueImpl queue = new QueueImpl(1,
                                   QueueImplTest.address1,
                                   name,
                                   null,
@@ -85,7 +85,7 @@
 
    public void testDurable()
    {
-      Queue queue = new QueueImpl(1,
+      QueueImpl queue = new QueueImpl(1,
                                   QueueImplTest.address1,
                                   QueueImplTest.queue1,
                                   null,
@@ -120,7 +120,7 @@
 
       Consumer cons3 = new FakeConsumer();
 
-      Queue queue = new QueueImpl(1,
+      QueueImpl queue = new QueueImpl(1,
                                   QueueImplTest.address1,
                                   QueueImplTest.queue1,
                                   null,
@@ -170,7 +170,7 @@
 
    public void testGetFilter()
    {
-      Queue queue = new QueueImpl(1,
+      QueueImpl queue = new QueueImpl(1,
                                   QueueImplTest.address1,
                                   QueueImplTest.queue1,
                                   null,
@@ -213,7 +213,7 @@
 
    public void testSimpleadd()
    {
-      Queue queue = new QueueImpl(1,
+      QueueImpl queue = new QueueImpl(1,
                                   QueueImplTest.address1,
                                   QueueImplTest.queue1,
                                   null,
@@ -241,7 +241,7 @@
 
    public void testSimpleDirectDelivery() throws Exception
    {
-      Queue queue = new QueueImpl(1,
+      QueueImpl queue = new QueueImpl(1,
                                   QueueImplTest.address1,
                                   QueueImplTest.queue1,
                                   null,
@@ -278,7 +278,7 @@
 
    public void testSimpleNonDirectDelivery() throws Exception
    {
-      Queue queue = new QueueImpl(1,
+      QueueImpl queue = new QueueImpl(1,
                                   QueueImplTest.address1,
                                   QueueImplTest.queue1,
                                   null,
@@ -325,7 +325,7 @@
 
    public void testBusyConsumer() throws Exception
    {
-      Queue queue = new QueueImpl(1,
+      QueueImpl queue = new QueueImpl(1,
                                   QueueImplTest.address1,
                                   QueueImplTest.queue1,
                                   null,
@@ -378,7 +378,7 @@
 
    public void testBusyConsumerThenAddMoreMessages() throws Exception
    {
-      Queue queue = new QueueImpl(1,
+      QueueImpl queue = new QueueImpl(1,
                                   QueueImplTest.address1,
                                   QueueImplTest.queue1,
                                   null,
@@ -454,7 +454,7 @@
 
    public void testAddFirstadd() throws Exception
    {
-      Queue queue = new QueueImpl(1,
+      QueueImpl queue = new QueueImpl(1,
                                   QueueImplTest.address1,
                                   QueueImplTest.queue1,
                                   null,
@@ -517,7 +517,7 @@
 
    public void testChangeConsumersAndDeliver() throws Exception
    {
-      Queue queue = new QueueImpl(1,
+      QueueImpl queue = new QueueImpl(1,
                                   QueueImplTest.address1,
                                   QueueImplTest.queue1,
                                   null,
@@ -680,7 +680,7 @@
 
    public void testConsumerReturningNull() throws Exception
    {
-      Queue queue = new QueueImpl(1,
+      QueueImpl queue = new QueueImpl(1,
                                   QueueImplTest.address1,
                                   QueueImplTest.queue1,
                                   null,
@@ -722,7 +722,7 @@
 
    public void testRoundRobinWithQueueing() throws Exception
    {
-      Queue queue = new QueueImpl(1,
+      QueueImpl queue = new QueueImpl(1,
                                   QueueImplTest.address1,
                                   QueueImplTest.queue1,
                                   null,
@@ -774,7 +774,7 @@
 
    public void testRoundRobinDirect() throws Exception
    {
-      Queue queue = new QueueImpl(1,
+      QueueImpl queue = new QueueImpl(1,
                                   QueueImplTest.address1,
                                   QueueImplTest.queue1,
                                   null,
@@ -824,7 +824,7 @@
 
    public void testWithPriorities() throws Exception
    {
-      Queue queue = new QueueImpl(1,
+      QueueImpl queue = new QueueImpl(1,
                                   QueueImplTest.address1,
                                   QueueImplTest.queue1,
                                   null,
@@ -900,7 +900,7 @@
 
    public void testConsumerWithFilterAddAndRemove()
    {
-      Queue queue = new QueueImpl(1,
+      QueueImpl queue = new QueueImpl(1,
                                   QueueImplTest.address1,
                                   QueueImplTest.queue1,
                                   null,
@@ -918,7 +918,7 @@
 
    public void testList()
    {
-      Queue queue = new QueueImpl(1,
+      QueueImpl queue = new QueueImpl(1,
                                   QueueImplTest.address1,
                                   QueueImplTest.queue1,
                                   null,
@@ -951,7 +951,7 @@
 
    public void testListWithFilter()
    {
-      Queue queue = new QueueImpl(1,
+      QueueImpl queue = new QueueImpl(1,
                                   QueueImplTest.address1,
                                   QueueImplTest.queue1,
                                   null,
@@ -996,7 +996,7 @@
 
    public void testConsumeWithFiltersAddAndRemoveConsumer() throws Exception
    {
-      Queue queue = new QueueImpl(1,
+      QueueImpl queue = new QueueImpl(1,
                                   QueueImplTest.address1,
                                   QueueImplTest.queue1,
                                   null,
@@ -1074,7 +1074,7 @@
 
    public void testBusyConsumerWithFilterFirstCallBusy() throws Exception
    {
-      Queue queue = new QueueImpl(1,
+      QueueImpl queue = new QueueImpl(1,
                                   QueueImplTest.address1,
                                   QueueImplTest.queue1,
                                   null,
@@ -1124,7 +1124,7 @@
 
    public void testBusyConsumerWithFilterThenAddMoreMessages() throws Exception
    {
-      Queue queue = new QueueImpl(1,
+      QueueImpl queue = new QueueImpl(1,
                                   QueueImplTest.address1,
                                   QueueImplTest.queue1,
                                   null,
@@ -1207,7 +1207,7 @@
 
    public void testConsumerWithFilterThenAddMoreMessages() throws Exception
    {
-      Queue queue = new QueueImpl(1,
+      QueueImpl queue = new QueueImpl(1,
                                   QueueImplTest.address1,
                                   QueueImplTest.queue1,
                                   null,
@@ -1280,7 +1280,7 @@
 
    private void testConsumerWithFilters(final boolean direct) throws Exception
    {
-      Queue queue = new QueueImpl(1,
+      QueueImpl queue = new QueueImpl(1,
                                   QueueImplTest.address1,
                                   QueueImplTest.queue1,
                                   null,
@@ -1378,7 +1378,7 @@
    public void testMessageOrder() throws Exception
    {
       FakeConsumer consumer = new FakeConsumer();
-      Queue queue = new QueueImpl(1,
+      QueueImpl queue = new QueueImpl(1,
                                   QueueImplTest.address1,
                                   QueueImplTest.queue1,
                                   null,
@@ -1407,7 +1407,7 @@
 
    public void testMessagesAdded() throws Exception
    {
-      Queue queue = new QueueImpl(1,
+      QueueImpl queue = new QueueImpl(1,
                                   QueueImplTest.address1,
                                   QueueImplTest.queue1,
                                   null,
@@ -1428,7 +1428,7 @@
 
    public void testGetReference() throws Exception
    {
-      Queue queue = new QueueImpl(1,
+      QueueImpl queue = new QueueImpl(1,
                                   QueueImplTest.address1,
                                   QueueImplTest.queue1,
                                   null,
@@ -1450,7 +1450,7 @@
 
    public void testGetNonExistentReference() throws Exception
    {
-      Queue queue = new QueueImpl(1,
+      QueueImpl queue = new QueueImpl(1,
                                   QueueImplTest.address1,
                                   QueueImplTest.queue1,
                                   null,
@@ -1476,7 +1476,7 @@
     */
    public void testPauseAndResumeWithAsync() throws Exception
    {
-      Queue queue = new QueueImpl(1,
+      QueueImpl queue = new QueueImpl(1,
                                   QueueImplTest.address1,
                                   QueueImplTest.queue1,
                                   null,
@@ -1540,7 +1540,7 @@
 
    public void testPauseAndResumeWithDirect() throws Exception
    {
-      Queue queue = new QueueImpl(1,
+      QueueImpl queue = new QueueImpl(1,
                                   QueueImplTest.address1,
                                   QueueImplTest.queue1,
                                   null,
@@ -1588,7 +1588,7 @@
 
    class AddtoQueueRunner implements Runnable
    {
-      Queue queue;
+      QueueImpl queue;
 
       MessageReference messageReference;
 
@@ -1599,7 +1599,7 @@
       boolean first;
 
       public AddtoQueueRunner(final boolean first,
-                              final Queue queue,
+                              final QueueImpl queue,
                               final MessageReference messageReference,
                               final CountDownLatch countDownLatch)
       {



More information about the hornetq-commits mailing list