[hornetq-commits] JBoss hornetq SVN: r8794 - trunk/src/main/org/hornetq/core/server/impl.

do-not-reply at jboss.org do-not-reply at jboss.org
Tue Jan 12 09:34:03 EST 2010


Author: timfox
Date: 2010-01-12 09:34:03 -0500 (Tue, 12 Jan 2010)
New Revision: 8794

Modified:
   trunk/src/main/org/hornetq/core/server/impl/ServerConsumerImpl.java
Log:
https://jira.jboss.org/jira/browse/HORNETQ-264

Modified: trunk/src/main/org/hornetq/core/server/impl/ServerConsumerImpl.java
===================================================================
--- trunk/src/main/org/hornetq/core/server/impl/ServerConsumerImpl.java	2010-01-12 13:23:13 UTC (rev 8793)
+++ trunk/src/main/org/hornetq/core/server/impl/ServerConsumerImpl.java	2010-01-12 14:34:03 UTC (rev 8794)
@@ -337,33 +337,38 @@
     * and knows that there are no other messages to be delivered.
     */
    public synchronized void forceDelivery(final long sequence)
-   {      
+   {
       promptDelivery();
-      
-      executor.execute(new Runnable()
+
+      Future future = new Future();
+
+      executor.execute(future);
+
+      boolean ok = future.await(10000);
+
+      if (!ok)
       {
-         public void run()
-         {
-            try
-            {
-               // We execute this on the same executor to make sure the force delivery message is written after
-               // any delivery is completed
-               
-               ServerMessage forcedDeliveryMessage = new ServerMessageImpl(storageManager.generateUniqueID(), 50);
+         log.warn("Timed out waiting for executor");
+      }
 
-               forcedDeliveryMessage.putLongProperty(ClientConsumerImpl.FORCED_DELIVERY_MESSAGE, sequence);
-               forcedDeliveryMessage.setAddress(messageQueue.getName());
+      try
+      {
+         // We wait above for a Future run on the same executor, so that the force delivery message is written
+         // after any delivery queued ahead of that Future has completed (or the wait has timed out)
 
-               final SessionReceiveMessage packet = new SessionReceiveMessage(id, forcedDeliveryMessage, 0);
+         ServerMessage forcedDeliveryMessage = new ServerMessageImpl(storageManager.generateUniqueID(), 50);
 
-               channel.send(packet);
-            }
-            catch (Exception e)
-            {
-               ServerConsumerImpl.log.error("Failed to send forced delivery message", e);
-            }
-         }
-      });
+         forcedDeliveryMessage.putLongProperty(ClientConsumerImpl.FORCED_DELIVERY_MESSAGE, sequence);
+         forcedDeliveryMessage.setAddress(messageQueue.getName());
+
+         final SessionReceiveMessage packet = new SessionReceiveMessage(id, forcedDeliveryMessage, 0);
+
+         channel.send(packet);
+      }
+      catch (Exception e)
+      {
+         ServerConsumerImpl.log.error("Failed to send forced delivery message", e);
+      }
    }
 
    public LinkedList<MessageReference> cancelRefs(final boolean lastConsumedAsDelivered, final Transaction tx) throws Exception
@@ -442,7 +447,7 @@
          lock.unlock();
       }
 
-      //Outside the lock
+      // Outside the lock
       if (transferring)
       {
          // And we must wait for any force delivery to be executed - this is executed async so we add a future to the
@@ -659,7 +664,7 @@
                else
                {
                   // prompt Delivery only if chunk was finished
-         
+
                   messageQueue.deliverAsync();
                }
             }



More information about the hornetq-commits mailing list