[jboss-cvs] JBoss Messaging SVN: r8398 - in branches/Branch_1_4: tests/src/org/jboss/test/messaging/jms/clustering and 1 other directory.

jboss-cvs-commits at lists.jboss.org jboss-cvs-commits at lists.jboss.org
Sun Jul 31 11:55:12 EDT 2011


Author: gaohoward
Date: 2011-07-31 11:55:12 -0400 (Sun, 31 Jul 2011)
New Revision: 8398

Modified:
   branches/Branch_1_4/src/main/org/jboss/messaging/core/impl/clusterconnection/ClusterConnectionManager.java
   branches/Branch_1_4/src/main/org/jboss/messaging/core/impl/clusterconnection/MessageSucker.java
   branches/Branch_1_4/tests/src/org/jboss/test/messaging/jms/clustering/FakeClusterConnectionManager.java
   branches/Branch_1_4/tests/src/org/jboss/test/messaging/jms/clustering/FakeMessageSucker.java
Log:
JBMESSAGING-1893


Modified: branches/Branch_1_4/src/main/org/jboss/messaging/core/impl/clusterconnection/ClusterConnectionManager.java
===================================================================
--- branches/Branch_1_4/src/main/org/jboss/messaging/core/impl/clusterconnection/ClusterConnectionManager.java	2011-07-29 07:24:50 UTC (rev 8397)
+++ branches/Branch_1_4/src/main/org/jboss/messaging/core/impl/clusterconnection/ClusterConnectionManager.java	2011-07-31 15:55:12 UTC (rev 8398)
@@ -28,6 +28,8 @@
 import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
 
 import javax.jms.ExceptionListener;
 import javax.jms.JMSException;
@@ -86,6 +88,8 @@
    private int maxRetry;
    
    private int retryInterval; //5 sec
+   
+   private ExecutorService suckerReaper;
 	
 	public ClusterConnectionManager(int nodeID,
 			                          String connectionFactoryUniqueName, boolean preserveOrdering,
@@ -130,6 +134,8 @@
 			return;
 		}
 		
+		suckerReaper = Executors.newCachedThreadPool();
+		
 		if (trace) { log.trace(this + " started"); }
 		
 		started = true;
@@ -153,6 +159,8 @@
 		
 		connections.clear();
 		
+		suckerReaper.shutdownNow();
+		
 		started = false;
 		
       if (trace) { log.trace(this + " stopped"); }
@@ -490,7 +498,7 @@
 			   }
 			   			   
 				MessageSucker sucker = new MessageSucker(localQueue, info.session, localInfo.session,
-				                                         preserveOrdering, sourceChannelID);
+				                                         preserveOrdering, sourceChannelID, suckerReaper);
 	
 				info.addSucker(sucker);
 				

Modified: branches/Branch_1_4/src/main/org/jboss/messaging/core/impl/clusterconnection/MessageSucker.java
===================================================================
--- branches/Branch_1_4/src/main/org/jboss/messaging/core/impl/clusterconnection/MessageSucker.java	2011-07-29 07:24:50 UTC (rev 8397)
+++ branches/Branch_1_4/src/main/org/jboss/messaging/core/impl/clusterconnection/MessageSucker.java	2011-07-31 15:55:12 UTC (rev 8398)
@@ -22,12 +22,13 @@
 
 package org.jboss.messaging.core.impl.clusterconnection;
 
+import java.util.concurrent.Executor;
+
 import javax.jms.Destination;
 import javax.jms.JMSException;
 import javax.jms.Message;
 import javax.jms.MessageListener;
 import javax.jms.Session;
-import javax.transaction.TransactionManager;
 
 import org.jboss.jms.client.JBossSession;
 import org.jboss.jms.client.container.ClientConsumer;
@@ -65,8 +66,6 @@
 	
 	private volatile boolean started;
 	
-	private TransactionManager tm;
-	
 	private boolean consuming;
 	
 	private ClientConsumerDelegate consumer;
@@ -79,6 +78,8 @@
 	
 	protected JBossQueue jbq;
 	
+	private Executor suckerReaper;
+	
 	private boolean suspended = false;
 	
 	public String toString()
@@ -87,7 +88,7 @@
 	}
 				
 	protected MessageSucker(Queue localQueue, Session sourceSession, Session localSession,
-	              boolean preserveOrdering, long sourceChannelID)
+	                        boolean preserveOrdering, long sourceChannelID, Executor suckerReaper)
    {	
       if (trace) { log.trace("Creating message sucker, localQueue:" + localQueue + " preserveOrdering:" + preserveOrdering); }
       
@@ -102,6 +103,8 @@
       this.preserveOrdering = preserveOrdering;
       
       this.sourceChannelID = sourceChannelID;
+      
+      this.suckerReaper = suckerReaper;
    }
 	
 	protected synchronized void start() throws Exception
@@ -152,44 +155,52 @@
             return;
          }
 
-         setConsuming(false);
-
-         try
+         suckerReaper.execute(new Runnable()
          {
-            consumer.closing(-1);
-         }
-         catch (Throwable t)
-         {
-            // Ignore
-         }
-         try
-         {
-            consumer.close();
-         }
-         catch (Throwable t)
-         {
-            // Ignore
-         }
+            public void run()
+            {
+               setConsuming(false);
 
-         try
-         {
-            producer.close();
-         }
-         catch (Throwable t)
-         {
-            // Ignore
-         }
+               localQueue.unregisterSucker(MessageSucker.this);
 
-         sourceSession = null;
+               try
+               {
+                  consumer.closing(-1);
+               }
+               catch (Throwable t)
+               {
+                  // Ignore
+               }
+               try
+               {
+                  consumer.close();
+               }
+               catch (Throwable t)
+               {
+                  // Ignore
+               }
 
-         localSession = null;
+               try
+               {
+                  producer.close();
+               }
+               catch (Throwable t)
+               {
+                  // Ignore
+               }
 
-         consumer = null;
+               sourceSession = null;
 
-         clientConsumer = null;
+               localSession = null;
 
-         producer = null;
+               consumer = null;
 
+               clientConsumer = null;
+
+               producer = null;
+            }
+         });
+
          started = false;
       }
    }
@@ -267,46 +278,55 @@
 		return this.localQueue.getName();
 	}
 	
-	public synchronized void setConsuming(boolean consume)
+	public void setConsuming(boolean consume)
 	{
-		if (trace) { log.trace(this + " setConsuming " + consume); }
+	   if (trace) { log.trace(this + " setConsuming " + consume + " consuming " + consuming); }
       
 		if (!started)
       {
          return;
       }
 		
-		//for supended, we set the consuming flag and do nothing.
-		//later on resume, we force the sucker to be the last set consuming state.
-		if (suspended)
-		{
-		   consuming = consume;
-		   return;
-		}
-		
-		try
-		{
-			if (consume && !consuming)
-			{
-				if (trace) { log.trace(this + " resuming client consumer"); }
-			   
-			   clientConsumer.resume();
-				
-				consuming = true;
-			}
-			else if (!consume && consuming)
-			{
-				if (trace) { log.trace(this + " pausing client consumer"); }
-			   
-			   clientConsumer.pause();
-				
-				consuming = false;
-			}
-		}
-		catch (Exception e)
-		{
-			//We ignore the exception - we might fail to change rate when stoping a sucker for a dead server
-		}
+      synchronized (this)
+      {
+         // While suspended, we only record the requested consuming flag and do nothing else.
+         // Later, on resume, we force the sucker into the last requested consuming state.
+         if (suspended)
+         {
+            consuming = consume;
+            return;
+         }
+
+         try
+         {
+            if (consume && !consuming)
+            {
+               if (trace)
+               {
+                  log.trace(this + " resuming client consumer");
+               }
+
+               clientConsumer.resume();
+
+               consuming = true;
+            }
+            else if (!consume && consuming)
+            {
+               if (trace)
+               {
+                  log.trace(this + " pausing client consumer");
+               }
+
+               clientConsumer.pause();
+
+               consuming = false;
+            }
+         }
+         catch (Exception e)
+         {
+            // We ignore the exception - we might fail to change rate when stopping a sucker for a dead server
+         }
+      }
 	}
 		
 	public void onMessage(Message msg)

Modified: branches/Branch_1_4/tests/src/org/jboss/test/messaging/jms/clustering/FakeClusterConnectionManager.java
===================================================================
--- branches/Branch_1_4/tests/src/org/jboss/test/messaging/jms/clustering/FakeClusterConnectionManager.java	2011-07-29 07:24:50 UTC (rev 8397)
+++ branches/Branch_1_4/tests/src/org/jboss/test/messaging/jms/clustering/FakeClusterConnectionManager.java	2011-07-31 15:55:12 UTC (rev 8398)
@@ -27,6 +27,8 @@
 import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
 
 import javax.jms.JMSException;
 import javax.jms.Queue;
@@ -83,6 +85,8 @@
    
    private int retryInterval;
    
+   private ExecutorService suckerReaper;
+   
    public FakeClusterConnectionManager(int remoteID,
                                    JBossConnectionFactory theFactory,
                                    String suckerUser,
@@ -114,6 +118,7 @@
       {
          return;
       }
+      suckerReaper = Executors.newCachedThreadPool();
       
       started = true;
    }
@@ -179,7 +184,7 @@
 
       if (!info.hasSucker(queue.getQueueName()))
       {
-            FakeMessageSucker sucker = new FakeMessageSucker(new FakeCoreQueue(queue), info.getSession(), 0, info.suckBuffer);
+            FakeMessageSucker sucker = new FakeMessageSucker(new FakeCoreQueue(queue), info.getSession(), 0, info.suckBuffer, this.suckerReaper);
 
             info.addSucker(sucker);
             
@@ -647,6 +652,8 @@
       
       connections.clear();
       
+      suckerReaper.shutdownNow();
+      
       started = false;
    }
 

Modified: branches/Branch_1_4/tests/src/org/jboss/test/messaging/jms/clustering/FakeMessageSucker.java
===================================================================
--- branches/Branch_1_4/tests/src/org/jboss/test/messaging/jms/clustering/FakeMessageSucker.java	2011-07-29 07:24:50 UTC (rev 8397)
+++ branches/Branch_1_4/tests/src/org/jboss/test/messaging/jms/clustering/FakeMessageSucker.java	2011-07-31 15:55:12 UTC (rev 8398)
@@ -23,6 +23,7 @@
 package org.jboss.test.messaging.jms.clustering;
 
 import java.util.List;
+import java.util.concurrent.Executor;
 
 import javax.jms.JMSException;
 import javax.jms.Message;
@@ -50,9 +51,9 @@
 
    private boolean queueNotUpdated = true;
 
-   FakeMessageSucker(Queue localQueue, Session sourceSession, long sourceChannelID, List<TextMessage> buffer)
+   FakeMessageSucker(Queue localQueue, Session sourceSession, long sourceChannelID, List<TextMessage> buffer, Executor suckerReaper)
    {
-      super(localQueue, sourceSession, sourceSession, true, sourceChannelID);
+      super(localQueue, sourceSession, sourceSession, true, sourceChannelID, suckerReaper);
       this.buffer = buffer;
    }
 



More information about the jboss-cvs-commits mailing list