Author: timfox
Date: 2010-01-12 09:34:03 -0500 (Tue, 12 Jan 2010)
New Revision: 8794
Modified:
trunk/src/main/org/hornetq/core/server/impl/ServerConsumerImpl.java
Log:
https://jira.jboss.org/jira/browse/HORNETQ-264
Modified: trunk/src/main/org/hornetq/core/server/impl/ServerConsumerImpl.java
===================================================================
--- trunk/src/main/org/hornetq/core/server/impl/ServerConsumerImpl.java 2010-01-12
13:23:13 UTC (rev 8793)
+++ trunk/src/main/org/hornetq/core/server/impl/ServerConsumerImpl.java 2010-01-12
14:34:03 UTC (rev 8794)
@@ -337,33 +337,38 @@
* and knows that there are no other messages to be delivered.
*/
public synchronized void forceDelivery(final long sequence)
- {
+ {
promptDelivery();
-
- executor.execute(new Runnable()
+
+ Future future = new Future();
+
+ executor.execute(future);
+
+ boolean ok = future.await(10000);
+
+ if (!ok)
{
- public void run()
- {
- try
- {
- // We execute this on the same executor to make sure the force delivery
message is written after
- // any delivery is completed
-
- ServerMessage forcedDeliveryMessage = new
ServerMessageImpl(storageManager.generateUniqueID(), 50);
+ log.warn("Timed out waiting for executor");
+ }
-
forcedDeliveryMessage.putLongProperty(ClientConsumerImpl.FORCED_DELIVERY_MESSAGE,
sequence);
- forcedDeliveryMessage.setAddress(messageQueue.getName());
+ try
+ {
+ // We execute this on the same executor to make sure the force delivery message
is written after
+ // any delivery is completed
- final SessionReceiveMessage packet = new SessionReceiveMessage(id,
forcedDeliveryMessage, 0);
+ ServerMessage forcedDeliveryMessage = new
ServerMessageImpl(storageManager.generateUniqueID(), 50);
- channel.send(packet);
- }
- catch (Exception e)
- {
- ServerConsumerImpl.log.error("Failed to send forced delivery
message", e);
- }
- }
- });
+
forcedDeliveryMessage.putLongProperty(ClientConsumerImpl.FORCED_DELIVERY_MESSAGE,
sequence);
+ forcedDeliveryMessage.setAddress(messageQueue.getName());
+
+ final SessionReceiveMessage packet = new SessionReceiveMessage(id,
forcedDeliveryMessage, 0);
+
+ channel.send(packet);
+ }
+ catch (Exception e)
+ {
+ ServerConsumerImpl.log.error("Failed to send forced delivery message",
e);
+ }
}
public LinkedList<MessageReference> cancelRefs(final boolean
lastConsumedAsDelivered, final Transaction tx) throws Exception
@@ -442,7 +447,7 @@
lock.unlock();
}
- //Outside the lock
+ // Outside the lock
if (transferring)
{
// And we must wait for any force delivery to be executed - this is executed
async so we add a future to the
@@ -659,7 +664,7 @@
else
{
// prompt Delivery only if chunk was finished
-
+
messageQueue.deliverAsync();
}
}