[hornetq-commits] JBoss hornetq SVN: r11733 - in branches/Branch_2_2_EAP/src/main/org/hornetq/core: server/impl and 1 other directory.

do-not-reply at jboss.org do-not-reply at jboss.org
Mon Nov 21 18:09:13 EST 2011


Author: clebert.suconic
Date: 2011-11-21 18:09:13 -0500 (Mon, 21 Nov 2011)
New Revision: 11733

Modified:
   branches/Branch_2_2_EAP/src/main/org/hornetq/core/client/impl/ClientSessionImpl.java
   branches/Branch_2_2_EAP/src/main/org/hornetq/core/server/impl/ServerConsumerImpl.java
   branches/Branch_2_2_EAP/src/main/org/hornetq/core/server/impl/ServerSessionImpl.java
Log:
JBPAPP-6030 - Avoiding distributed deadlock on receiveImmediate()

Modified: branches/Branch_2_2_EAP/src/main/org/hornetq/core/client/impl/ClientSessionImpl.java
===================================================================
--- branches/Branch_2_2_EAP/src/main/org/hornetq/core/client/impl/ClientSessionImpl.java	2011-11-21 20:43:40 UTC (rev 11732)
+++ branches/Branch_2_2_EAP/src/main/org/hornetq/core/client/impl/ClientSessionImpl.java	2011-11-21 23:09:13 UTC (rev 11733)
@@ -398,9 +398,15 @@
    {
       checkClosed();
 
-      SessionForceConsumerDelivery request = new SessionForceConsumerDelivery(consumerID, sequence);
-
-      channel.send(request);
+      // JBPAPP-6030 - Using the executor to avoid distributed dead locks 
+      executor.execute(new Runnable()
+      {
+         public void run()
+         {
+            SessionForceConsumerDelivery request = new SessionForceConsumerDelivery(consumerID, sequence);
+            channel.send(request);
+         }
+      });
    }
 
    public ClientConsumer createConsumer(final SimpleString queueName) throws HornetQException

Modified: branches/Branch_2_2_EAP/src/main/org/hornetq/core/server/impl/ServerConsumerImpl.java
===================================================================
--- branches/Branch_2_2_EAP/src/main/org/hornetq/core/server/impl/ServerConsumerImpl.java	2011-11-21 20:43:40 UTC (rev 11732)
+++ branches/Branch_2_2_EAP/src/main/org/hornetq/core/server/impl/ServerConsumerImpl.java	2011-11-21 23:09:13 UTC (rev 11733)
@@ -389,33 +389,30 @@
    {
       promptDelivery();
 
-      Future future = new Future();
-
-      messageQueue.getExecutor().execute(future);
-
-      boolean ok = future.await(10000);
-
-      if (!ok)
+      // JBPAPP-6030 - Using the executor to avoid distributed dead locks 
+      messageQueue.getExecutor().execute(new Runnable()
       {
-         log.warn("Timed out waiting for executor");
-      }
+         public void run()
+         {
+            try
+            {
+               // We execute this on the same executor to make sure the force delivery message is written after
+               // any delivery is completed
 
-      try
-      {
-         // We execute this on the same executor to make sure the force delivery message is written after
-         // any delivery is completed
+               ServerMessage forcedDeliveryMessage = new ServerMessageImpl(storageManager.generateUniqueID(), 50);
 
-         ServerMessage forcedDeliveryMessage = new ServerMessageImpl(storageManager.generateUniqueID(), 50);
+               forcedDeliveryMessage.putLongProperty(ClientConsumerImpl.FORCED_DELIVERY_MESSAGE, sequence);
+               forcedDeliveryMessage.setAddress(messageQueue.getName());
 
-         forcedDeliveryMessage.putLongProperty(ClientConsumerImpl.FORCED_DELIVERY_MESSAGE, sequence);
-         forcedDeliveryMessage.setAddress(messageQueue.getName());
+               callback.sendMessage(forcedDeliveryMessage, id, 0);
+            }
+            catch (Exception e)
+            {
+               ServerConsumerImpl.log.error("Failed to send forced delivery message", e);
+            }
+         }
+      });
 
-         callback.sendMessage(forcedDeliveryMessage, id, 0);
-      }
-      catch (Exception e)
-      {
-         ServerConsumerImpl.log.error("Failed to send forced delivery message", e);
-      }
    }
 
    public LinkedList<MessageReference> cancelRefs(final boolean failed,

Modified: branches/Branch_2_2_EAP/src/main/org/hornetq/core/server/impl/ServerSessionImpl.java
===================================================================
--- branches/Branch_2_2_EAP/src/main/org/hornetq/core/server/impl/ServerSessionImpl.java	2011-11-21 20:43:40 UTC (rev 11732)
+++ branches/Branch_2_2_EAP/src/main/org/hornetq/core/server/impl/ServerSessionImpl.java	2011-11-21 23:09:13 UTC (rev 11733)
@@ -569,7 +569,11 @@
    {
       ServerConsumer consumer = consumers.get(consumerID);
 
-      consumer.forceDelivery(sequence);
+      // this would be possible if the server consumer was closed by pings/pongs.. etc
+      if (consumer != null)
+      {
+         consumer.forceDelivery(sequence);
+      }
    }
 
    public void acknowledge(final long consumerID, final long messageID) throws Exception



More information about the hornetq-commits mailing list