[jboss-cvs] JBoss Messaging SVN: r8397 - branches/PATCH_1893_ON_CP07/src/main/org/jboss/messaging/core/impl/clusterconnection.

jboss-cvs-commits at lists.jboss.org jboss-cvs-commits at lists.jboss.org
Fri Jul 29 03:24:50 EDT 2011


Author: gaohoward
Date: 2011-07-29 03:24:50 -0400 (Fri, 29 Jul 2011)
New Revision: 8397

Modified:
   branches/PATCH_1893_ON_CP07/src/main/org/jboss/messaging/core/impl/clusterconnection/ClusterConnectionManager.java
   branches/PATCH_1893_ON_CP07/src/main/org/jboss/messaging/core/impl/clusterconnection/MessageSucker.java
Log:
the patch


Modified: branches/PATCH_1893_ON_CP07/src/main/org/jboss/messaging/core/impl/clusterconnection/ClusterConnectionManager.java
===================================================================
--- branches/PATCH_1893_ON_CP07/src/main/org/jboss/messaging/core/impl/clusterconnection/ClusterConnectionManager.java	2011-07-28 13:20:07 UTC (rev 8396)
+++ branches/PATCH_1893_ON_CP07/src/main/org/jboss/messaging/core/impl/clusterconnection/ClusterConnectionManager.java	2011-07-29 07:24:50 UTC (rev 8397)
@@ -28,6 +28,8 @@
 import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
 
 import javax.jms.JMSException;
 import javax.jms.Session;
@@ -82,6 +84,8 @@
 	
 	private String suckerPassword;
 	
+	private ExecutorService suckerReaper;
+	
 	public ClusterConnectionManager(int nodeID,
 			                          String connectionFactoryUniqueName, boolean preserveOrdering,
 			                          String suckerUser,
@@ -119,6 +123,8 @@
 			return;
 		}
 		
+		suckerReaper = Executors.newCachedThreadPool();
+		
 		if (trace) { log.trace(this + " started"); }
 		
 		started = true;
@@ -142,6 +148,8 @@
 		
 		connections.clear();
 		
+		suckerReaper.shutdownNow();
+		
 		started = false;
 		
       if (trace) { log.trace(this + " stopped"); }
@@ -471,7 +479,7 @@
 			   }
 			   			   
 				MessageSucker sucker = new MessageSucker(localQueue, info.session, localInfo.session,
-				                                         preserveOrdering, sourceChannelID);
+				                                         preserveOrdering, sourceChannelID, suckerReaper);
 	
 				info.addSucker(sucker);
 				

Modified: branches/PATCH_1893_ON_CP07/src/main/org/jboss/messaging/core/impl/clusterconnection/MessageSucker.java
===================================================================
--- branches/PATCH_1893_ON_CP07/src/main/org/jboss/messaging/core/impl/clusterconnection/MessageSucker.java	2011-07-28 13:20:07 UTC (rev 8396)
+++ branches/PATCH_1893_ON_CP07/src/main/org/jboss/messaging/core/impl/clusterconnection/MessageSucker.java	2011-07-29 07:24:50 UTC (rev 8397)
@@ -22,6 +22,8 @@
 
 package org.jboss.messaging.core.impl.clusterconnection;
 
+import java.util.concurrent.Executor;
+
 import javax.jms.Destination;
 import javax.jms.Message;
 import javax.jms.MessageListener;
@@ -79,13 +81,15 @@
 	
 	private JBossQueue jbq;
 	
+	private Executor suckerReaper;
+	
 	public String toString()
 	{
 		return "MessageSucker:" + System.identityHashCode(this) + " queue:" + localQueue.getName();
 	}
 				
 	MessageSucker(Queue localQueue, Session sourceSession, Session localSession,
-	              boolean preserveOrdering, long sourceChannelID)
+	              boolean preserveOrdering, long sourceChannelID, Executor suckerReaper)
    {	
       if (trace) { log.trace("Creating message sucker, localQueue:" + localQueue + " preserveOrdering:" + preserveOrdering); }
       
@@ -100,6 +104,8 @@
       this.preserveOrdering = preserveOrdering;
       
       this.sourceChannelID = sourceChannelID;
+      
+      this.suckerReaper = suckerReaper;
    }
 	
 	synchronized void start() throws Exception
@@ -147,46 +153,51 @@
 			return;
 		}
 		
-		setConsuming(false);
-				
-		localQueue.unregisterSucker(this);
-		
-		try
-		{
-			consumer.closing(-1);
-		}
-		catch (Throwable t)
-		{
-			// Ignore
-		}
-		try
-		{
-			consumer.close();
-		}
-		catch (Throwable t)
-		{
-			//Ignore
-		}
-		
-		try
-		{
-			producer.close();
-		}
-		catch (Throwable t)
-		{
-			//Ignore
-		}
+		suckerReaper.execute(new Runnable() {
+		   public void run()
+		   {
+		      setConsuming(false);
+		            
+		      localQueue.unregisterSucker(MessageSucker.this);
+		      
+		      try
+		      {
+		         consumer.closing(-1);
+		      }
+		      catch (Throwable t)
+		      {
+		         // Ignore
+		      }
+		      try
+		      {
+		         consumer.close();
+		      }
+		      catch (Throwable t)
+		      {
+		         //Ignore
+		      }
+		      
+		      try
+		      {
+		         producer.close();
+		      }
+		      catch (Throwable t)
+		      {
+		         //Ignore
+		      }
 
-		sourceSession = null;
+		      sourceSession = null;
+		      
+		      localSession = null;
+		      
+		      consumer = null;
+		      
+		      clientConsumer = null;
+		      
+		      producer = null;
+		   }
+		});
 		
-		localSession = null;
-		
-		consumer = null;
-		
-		clientConsumer = null;
-		
-		producer = null;
-		
 		started = false;
 	}
 	
@@ -195,33 +206,47 @@
 		return this.localQueue.getName();
 	}
 	
-	public synchronized void setConsuming(boolean consume)
+	public void setConsuming(boolean consume)
 	{
-		if (trace) { log.trace(this + " setConsuming " + consume); }
+		if (trace) { log.trace(this + " setConsuming " + consume + " consuming " + consuming); }
 		
-		try
-		{
-			if (consume && !consuming)
-			{
-				if (trace) { log.trace(this + " resuming client consumer"); }
-			   
-			   clientConsumer.resume();
-				
-				consuming = true;
-			}
-			else if (!consume && consuming)
-			{
-				if (trace) { log.trace(this + " pausing client consumer"); }
-			   
-			   clientConsumer.pause();
-				
-				consuming = false;
-			}
-		}
-		catch (Exception e)
-		{
-			//We ignore the exception - we might fail to change rate when stoping a sucker for a dead server
-		}
+      if (!started)
+      {
+         return;
+      }
+      
+      synchronized (this)
+      {
+         try
+         {
+            if (consume && !consuming)
+            {
+               if (trace)
+               {
+                  log.trace(this + " resuming client consumer");
+               }
+
+               clientConsumer.resume();
+
+               consuming = true;
+            }
+            else if (!consume && consuming)
+            {
+               if (trace)
+               {
+                  log.trace(this + " pausing client consumer");
+               }
+
+               clientConsumer.pause();
+
+               consuming = false;
+            }
+         }
+         catch (Exception e)
+         {
+            // We ignore the exception - we might fail to change rate when stoping a sucker for a dead server
+         }
+      }
 	}
 		
 	public void onMessage(Message msg)



More information about the jboss-cvs-commits mailing list