[jboss-cvs] JBoss Messaging SVN: r3246 - trunk/src/main/org/jboss/messaging/core/impl/postoffice.

jboss-cvs-commits at lists.jboss.org jboss-cvs-commits at lists.jboss.org
Fri Oct 26 13:48:58 EDT 2007


Author: timfox
Date: 2007-10-26 13:48:58 -0400 (Fri, 26 Oct 2007)
New Revision: 3246

Modified:
   trunk/src/main/org/jboss/messaging/core/impl/postoffice/MessagingPostOffice.java
Log:
Response now executed on different thread


Modified: trunk/src/main/org/jboss/messaging/core/impl/postoffice/MessagingPostOffice.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/impl/postoffice/MessagingPostOffice.java	2007-10-26 16:13:57 UTC (rev 3245)
+++ trunk/src/main/org/jboss/messaging/core/impl/postoffice/MessagingPostOffice.java	2007-10-26 17:48:58 UTC (rev 3246)
@@ -219,6 +219,8 @@
    //Note this MUST be a queued executor to ensure replicate responses arrive back in order
    private QueuedExecutor replyExecutor;
    
+   private QueuedExecutor replicateResponseExecutor;
+   
    private volatile int failoverNodeID = -1;
    
    private volatile boolean firstNode;
@@ -1320,12 +1322,12 @@
    }
    
    
-   public void handleReplicateDeliveryAck(String sessionID, long deliveryID) throws Exception
+   public void handleReplicateDeliveryAck(String sessionID, final long deliveryID) throws Exception
    {
    	if (trace) { log.trace(this + " handleReplicateDeliveryAck " + sessionID + " " + deliveryID); }
    	  	
    	//TODO - this does not belong here
-   	ServerSessionEndpoint session = serverPeer.getSession(sessionID);
+   	final ServerSessionEndpoint session = serverPeer.getSession(sessionID);
    	
       if (this.useJGroupsWorkaround)
       {
@@ -1339,7 +1341,24 @@
    		return;
    	}   	   	
    	
-   	session.replicateDeliveryResponseReceived(deliveryID);
+      //Execute on a different thread to avoid taking up JGroups thread for too long
+      //which can cause heartbeats to be missed and the member to be suspected
+      
+      replicateResponseExecutor.execute(
+            new Runnable()
+            {
+               public void run()
+               {
+                  try
+                  {
+                     session.replicateDeliveryResponseReceived(deliveryID);
+                  }
+                  catch (Exception e)
+                  {
+                     log.error("Failed to process response", e);
+                  }
+               }
+            });
    }
    
    public void handleAckAllReplicatedDeliveries(int nodeID) throws Exception
@@ -1628,6 +1647,8 @@
       
       //NOTE, MUST be a QueuedExecutor so we ensure that responses arrive back in order
       replyExecutor = new QueuedExecutor(new LinkedQueue());
+      
+      replicateResponseExecutor = new QueuedExecutor(new LinkedQueue());
    }
    
    private void deInit()
@@ -1649,7 +1670,9 @@
          leftSet = null;
       }
    	
-   	replyExecutor.shutdownNow();   	
+   	replyExecutor.shutdownNow();   
+      
+      replicateResponseExecutor.shutdownNow();
    }
    
    private void requestDeliveries(Queue queue) throws Exception




More information about the jboss-cvs-commits mailing list