[jboss-cvs] JBoss Messaging SVN: r8512 - branches/JBossMessaging_1_4_8_SP5_JBMessaging-1912_JBMessaging-1918/src/main/org/jboss/messaging/core/impl/clusterconnection.

jboss-cvs-commits at lists.jboss.org jboss-cvs-commits at lists.jboss.org
Thu Mar 15 05:01:54 EDT 2012


Author: raggz
Date: 2012-03-15 05:01:53 -0400 (Thu, 15 Mar 2012)
New Revision: 8512

Modified:
   branches/JBossMessaging_1_4_8_SP5_JBMessaging-1912_JBMessaging-1918/src/main/org/jboss/messaging/core/impl/clusterconnection/ClusterConnectionManager.java
Log:
Back port of JBMessaging-1918.


Modified: branches/JBossMessaging_1_4_8_SP5_JBMessaging-1912_JBMessaging-1918/src/main/org/jboss/messaging/core/impl/clusterconnection/ClusterConnectionManager.java
===================================================================
--- branches/JBossMessaging_1_4_8_SP5_JBMessaging-1912_JBMessaging-1918/src/main/org/jboss/messaging/core/impl/clusterconnection/ClusterConnectionManager.java	2012-03-15 08:45:07 UTC (rev 8511)
+++ branches/JBossMessaging_1_4_8_SP5_JBMessaging-1912_JBMessaging-1918/src/main/org/jboss/messaging/core/impl/clusterconnection/ClusterConnectionManager.java	2012-03-15 09:01:53 UTC (rev 8512)
@@ -644,7 +644,7 @@
 		
 		protected Map suckers;
 		
-		protected boolean started;
+		protected volatile boolean started;
 		
 		private String suckerUser;
 		
@@ -711,89 +711,112 @@
 			started = false;
 		}
 		
-		synchronized void resetAllSuckers()
-		{
-			Iterator iter = suckers.values().iterator();
-			
-			while (iter.hasNext())
-			{
-				MessageSucker sucker = (MessageSucker)iter.next();
-				
-				sucker.setConsuming(false);
-			}
-		}
+      void resetAllSuckers()
+      {
+         synchronized (suckers)
+         {
+            Iterator iter = suckers.values().iterator();
+
+            while (iter.hasNext())
+            {
+               MessageSucker sucker = (MessageSucker)iter.next();
+
+               sucker.setConsuming(false);
+            }
+         }
+      }
 		
-		synchronized void closeAllSuckers()
-		{
-			Iterator iter = suckers.values().iterator();
-			
-			while (iter.hasNext())
-			{
-				MessageSucker sucker = (MessageSucker)iter.next();
-				
-				sucker.stop();
-			}
-			
-			suckers.clear();
-		}
+      void closeAllSuckers()
+      {
+         synchronized (suckers)
+         {
+            Iterator iter = suckers.values().iterator();
+
+            while (iter.hasNext())
+            {
+               MessageSucker sucker = (MessageSucker)iter.next();
+
+               sucker.stop();
+            }
+
+            suckers.clear();
+         }
+      }
 		
-		protected synchronized void close()
-		{
-			closeAllSuckers();			
-			
-			//Note we use a timed callable since remoting has a habit of hanging on attempting to close
-			//We do not want this to hang the system - especially failover
-			
-			Callable callable = new Callable() { public Object call()
-			{
-				try
-				{
-					connection.close();
-				}
-				catch (JMSException ignore)
-				{					
-				}
-				return null;
-		    } };
-			
-			Callable timedCallable = new TimedCallable(callable, CLOSE_TIMEOUT);
-			
-			try
-			{
-				timedCallable.call();
-			}
-			catch (Throwable t)
-			{
-				//Ignore - the server might have already closed - so this is ok
-			}
-			
-			connection = null;
-			
-			started = false;
-		}
+      protected void close()
+      {
+         closeAllSuckers();
+
+         // Note we use a timed callable since remoting has a habit of hanging on attempting to close
+         // We do not want this to hang the system - especially failover
+         synchronized (this)
+         {
+            if (!started)
+               return;
+
+            Callable callable = new Callable()
+            {
+               public Object call()
+               {
+                  try
+                  {
+                     connection.close();
+                  }
+                  catch (JMSException ignore)
+                  {
+                  }
+                  return null;
+               }
+            };
+
+            Callable timedCallable = new TimedCallable(callable, CLOSE_TIMEOUT);
+
+            try
+            {
+               timedCallable.call();
+            }
+            catch (Throwable t)
+            {
+               // Ignore - the server might have already closed - so this is ok
+            }
+
+            connection = null;
+
+            started = false;
+         }
+      }
 		
-		protected synchronized boolean hasSucker(String queueName)
-		{
-			return suckers.containsKey(queueName);
-		}
+      protected boolean hasSucker(String queueName)
+      {
+         synchronized (suckers)
+         {
+            return suckers.containsKey(queueName);
+         }
+      }
 		
-		protected synchronized void addSucker(MessageSucker sucker)
-		{
-			if (suckers.containsKey(sucker.getQueueName()))
-			{
-				throw new IllegalStateException("Already has sucker for queue " + sucker.getQueueName());
-			}
-			
-			suckers.put(sucker.getQueueName(), sucker);
-		}
+      protected void addSucker(MessageSucker sucker)
+      {
+         synchronized (suckers)
+         {
+            if (suckers.containsKey(sucker.getQueueName()))
+            {
+               throw new IllegalStateException("Already has sucker for queue " + sucker.getQueueName());
+            }
+
+            suckers.put(sucker.getQueueName(), sucker);
+         }
+      }
 		
-		synchronized MessageSucker removeSucker(String queueName)
-		{
-			MessageSucker sucker = (MessageSucker)suckers.remove(queueName);
-			
-			return sucker;
-		}
+      MessageSucker removeSucker(String queueName)
+      {
+         synchronized (suckers)
+         {
+            MessageSucker sucker = (MessageSucker)suckers.remove(queueName);
 
+            return sucker;
+         }
+      }
+
 		//https://jira.jboss.org/jira/browse/JBMESSAGING-1732
 		//on exception, try to recreate all suckers.
       public void onException(JMSException e)
@@ -805,15 +828,21 @@
       
       //first stop all the suckers
       //then try to close the connection
-      protected synchronized void cleanupConnection()
+      protected void cleanupConnection()
       {
          if (!started)
          {
             return;
          }
          
-         Iterator iter = suckers.values().iterator();
+         Map suckersCopy = null;
+         synchronized (suckers)
+         {
+            suckersCopy = new HashMap(suckers);
+         }
          
+         Iterator iter = suckersCopy.values().iterator();
+         
          while (iter.hasNext())
          {
             MessageSucker sucker = (MessageSucker)iter.next();
@@ -821,11 +850,13 @@
             sucker.suspend();
          }
 
+         final JBossConnection copy = connection;
+         
          Callable callable = new Callable() { public Object call()
          {
             try
             {
-               connection.close();
+               copy.close();
             }
             catch (JMSException ignore)
             {              
@@ -844,16 +875,27 @@
             //Ignore - the server might have already closed - so this is ok
          }
          
-         connection = null;
-         
-         started = false;
+         synchronized (this)
+         {
+            connection = null;
+
+            started = false;
+         }
       }
       
+      protected int getSuckerSize()
+      {
+         synchronized(suckers)
+         {
+            return suckers.size();
+         }
+      }
+      
       protected synchronized int retryConnection()
       {         
          int retryCount = 0;
          
-         while (((maxRetry == -1) || (retryCount < maxRetry)) && (suckers.size() > 0))
+         while (((maxRetry == -1) || (retryCount < maxRetry)) && (getSuckerSize() > 0))
          {
             try
             {
@@ -884,20 +926,23 @@
          }
          
          //now resume the suckers
-         Iterator iter = suckers.values().iterator();
-         
-         while (iter.hasNext())
+         synchronized (suckers)
          {
-            MessageSucker sucker = (MessageSucker)iter.next();
-            
-            try
+            Iterator iter = suckers.values().iterator();
+
+            while (iter.hasNext())
             {
-               sucker.resume(session);
+               MessageSucker sucker = (MessageSucker)iter.next();
+
+               try
+               {
+                  sucker.resume(session);
+               }
+               catch (JMSException e)
+               {
+                  log.warn("Error resuming sucker " + sucker, e);
+               }
             }
-            catch (JMSException e)
-            {
-               log.warn("Error resuming sucker " + sucker, e);
-            }
          }
          
          return retryCount;



More information about the jboss-cvs-commits mailing list