Author: clebert.suconic(a)jboss.com
Date: 2011-11-21 21:33:35 -0500 (Mon, 21 Nov 2011)
New Revision: 11736
Modified:
branches/Branch_2_2_EAP/src/main/org/hornetq/core/server/impl/ServerConsumerImpl.java
Log:
JBPAPP-6030 - fixing transferring lock on receiveImmediate()
Modified:
branches/Branch_2_2_EAP/src/main/org/hornetq/core/server/impl/ServerConsumerImpl.java
===================================================================
---
branches/Branch_2_2_EAP/src/main/org/hornetq/core/server/impl/ServerConsumerImpl.java 2011-11-22
01:50:41 UTC (rev 11735)
+++
branches/Branch_2_2_EAP/src/main/org/hornetq/core/server/impl/ServerConsumerImpl.java 2011-11-22
02:33:35 UTC (rev 11736)
@@ -399,12 +399,29 @@
// We execute this on the same executor to make sure the force delivery
message is written after
// any delivery is completed
- ServerMessage forcedDeliveryMessage = new
ServerMessageImpl(storageManager.generateUniqueID(), 50);
-
-
forcedDeliveryMessage.putLongProperty(ClientConsumerImpl.FORCED_DELIVERY_MESSAGE,
sequence);
- forcedDeliveryMessage.setAddress(messageQueue.getName());
-
- callback.sendMessage(forcedDeliveryMessage, id, 0);
+ synchronized (lock)
+ {
+ if (transferring)
+ {
+ // Case it's transferring (reattach), we will retry later
+ messageQueue.getExecutor().execute(new Runnable()
+ {
+ public void run()
+ {
+ forceDelivery(sequence);
+ }
+ });
+ }
+ else
+ {
+ ServerMessage forcedDeliveryMessage = new
ServerMessageImpl(storageManager.generateUniqueID(), 50);
+
+
forcedDeliveryMessage.putLongProperty(ClientConsumerImpl.FORCED_DELIVERY_MESSAGE,
sequence);
+ forcedDeliveryMessage.setAddress(messageQueue.getName());
+
+ callback.sendMessage(forcedDeliveryMessage, id, 0);
+ }
+ }
}
catch (Exception e)
{