Author: clebert.suconic(a)jboss.com
Date: 2011-11-21 18:39:18 -0500 (Mon, 21 Nov 2011)
New Revision: 11734
Modified:
branches/Branch_2_2_AS7/src/main/org/hornetq/core/client/impl/ClientSessionImpl.java
branches/Branch_2_2_AS7/src/main/org/hornetq/core/server/impl/ServerConsumerImpl.java
branches/Branch_2_2_AS7/src/main/org/hornetq/core/server/impl/ServerSessionImpl.java
Log:
JBPAPP-6030 - Avoiding distributed deadlock on receiveImmediate()
Modified:
branches/Branch_2_2_AS7/src/main/org/hornetq/core/client/impl/ClientSessionImpl.java
===================================================================
---
branches/Branch_2_2_AS7/src/main/org/hornetq/core/client/impl/ClientSessionImpl.java 2011-11-21
23:09:13 UTC (rev 11733)
+++
branches/Branch_2_2_AS7/src/main/org/hornetq/core/client/impl/ClientSessionImpl.java 2011-11-21
23:39:18 UTC (rev 11734)
@@ -398,9 +398,15 @@
{
checkClosed();
- SessionForceConsumerDelivery request = new SessionForceConsumerDelivery(consumerID,
sequence);
-
- channel.send(request);
+ // JBPAPP-6030 - Using the executor to avoid distributed dead locks
+ executor.execute(new Runnable()
+ {
+ public void run()
+ {
+ SessionForceConsumerDelivery request = new
SessionForceConsumerDelivery(consumerID, sequence);
+ channel.send(request);
+ }
+ });
}
public ClientConsumer createConsumer(final SimpleString queueName) throws
HornetQException
Modified:
branches/Branch_2_2_AS7/src/main/org/hornetq/core/server/impl/ServerConsumerImpl.java
===================================================================
---
branches/Branch_2_2_AS7/src/main/org/hornetq/core/server/impl/ServerConsumerImpl.java 2011-11-21
23:09:13 UTC (rev 11733)
+++
branches/Branch_2_2_AS7/src/main/org/hornetq/core/server/impl/ServerConsumerImpl.java 2011-11-21
23:39:18 UTC (rev 11734)
@@ -389,33 +389,30 @@
{
promptDelivery();
- Future future = new Future();
-
- messageQueue.getExecutor().execute(future);
-
- boolean ok = future.await(10000);
-
- if (!ok)
+ // JBPAPP-6030 - Using the executor to avoid distributed dead locks
+ messageQueue.getExecutor().execute(new Runnable()
{
- log.warn("Timed out waiting for executor");
- }
+ public void run()
+ {
+ try
+ {
+ // We execute this on the same executor to make sure the force delivery
message is written after
+ // any delivery is completed
- try
- {
- // We execute this on the same executor to make sure the force delivery message
is written after
- // any delivery is completed
+ ServerMessage forcedDeliveryMessage = new
ServerMessageImpl(storageManager.generateUniqueID(), 50);
- ServerMessage forcedDeliveryMessage = new
ServerMessageImpl(storageManager.generateUniqueID(), 50);
+
forcedDeliveryMessage.putLongProperty(ClientConsumerImpl.FORCED_DELIVERY_MESSAGE,
sequence);
+ forcedDeliveryMessage.setAddress(messageQueue.getName());
-
forcedDeliveryMessage.putLongProperty(ClientConsumerImpl.FORCED_DELIVERY_MESSAGE,
sequence);
- forcedDeliveryMessage.setAddress(messageQueue.getName());
+ callback.sendMessage(forcedDeliveryMessage, id, 0);
+ }
+ catch (Exception e)
+ {
+ ServerConsumerImpl.log.error("Failed to send forced delivery
message", e);
+ }
+ }
+ });
- callback.sendMessage(forcedDeliveryMessage, id, 0);
- }
- catch (Exception e)
- {
- ServerConsumerImpl.log.error("Failed to send forced delivery message",
e);
- }
}
public LinkedList<MessageReference> cancelRefs(final boolean failed,
Modified:
branches/Branch_2_2_AS7/src/main/org/hornetq/core/server/impl/ServerSessionImpl.java
===================================================================
---
branches/Branch_2_2_AS7/src/main/org/hornetq/core/server/impl/ServerSessionImpl.java 2011-11-21
23:09:13 UTC (rev 11733)
+++
branches/Branch_2_2_AS7/src/main/org/hornetq/core/server/impl/ServerSessionImpl.java 2011-11-21
23:39:18 UTC (rev 11734)
@@ -569,7 +569,11 @@
{
ServerConsumer consumer = consumers.get(consumerID);
- consumer.forceDelivery(sequence);
+ // this would be possible if the server consumer was closed by pings/pongs.. etc
+ if (consumer != null)
+ {
+ consumer.forceDelivery(sequence);
+ }
}
public void acknowledge(final long consumerID, final long messageID) throws Exception