[jboss-cvs] JBoss Messaging SVN: r8512 - branches/JBossMessaging_1_4_8_SP5_JBMessaging-1912_JBMessaging-1918/src/main/org/jboss/messaging/core/impl/clusterconnection.
jboss-cvs-commits at lists.jboss.org
jboss-cvs-commits at lists.jboss.org
Thu Mar 15 05:01:54 EDT 2012
Author: raggz
Date: 2012-03-15 05:01:53 -0400 (Thu, 15 Mar 2012)
New Revision: 8512
Modified:
branches/JBossMessaging_1_4_8_SP5_JBMessaging-1912_JBMessaging-1918/src/main/org/jboss/messaging/core/impl/clusterconnection/ClusterConnectionManager.java
Log:
Backport of JBMessaging-1918.
Modified: branches/JBossMessaging_1_4_8_SP5_JBMessaging-1912_JBMessaging-1918/src/main/org/jboss/messaging/core/impl/clusterconnection/ClusterConnectionManager.java
===================================================================
--- branches/JBossMessaging_1_4_8_SP5_JBMessaging-1912_JBMessaging-1918/src/main/org/jboss/messaging/core/impl/clusterconnection/ClusterConnectionManager.java 2012-03-15 08:45:07 UTC (rev 8511)
+++ branches/JBossMessaging_1_4_8_SP5_JBMessaging-1912_JBMessaging-1918/src/main/org/jboss/messaging/core/impl/clusterconnection/ClusterConnectionManager.java 2012-03-15 09:01:53 UTC (rev 8512)
@@ -644,7 +644,7 @@
protected Map suckers;
- protected boolean started;
+ protected volatile boolean started;
private String suckerUser;
@@ -711,89 +711,112 @@
started = false;
}
- synchronized void resetAllSuckers()
- {
- Iterator iter = suckers.values().iterator();
-
- while (iter.hasNext())
- {
- MessageSucker sucker = (MessageSucker)iter.next();
-
- sucker.setConsuming(false);
- }
- }
+ void resetAllSuckers()
+ {
+ synchronized (suckers)
+ {
+ Iterator iter = suckers.values().iterator();
+
+ while (iter.hasNext())
+ {
+ MessageSucker sucker = (MessageSucker)iter.next();
+
+ sucker.setConsuming(false);
+ }
+ }
+ }
- synchronized void closeAllSuckers()
- {
- Iterator iter = suckers.values().iterator();
-
- while (iter.hasNext())
- {
- MessageSucker sucker = (MessageSucker)iter.next();
-
- sucker.stop();
- }
-
- suckers.clear();
- }
+ void closeAllSuckers()
+ {
+ synchronized (suckers)
+ {
+ Iterator iter = suckers.values().iterator();
+
+ while (iter.hasNext())
+ {
+ MessageSucker sucker = (MessageSucker)iter.next();
+
+ sucker.stop();
+ }
+
+ suckers.clear();
+ }
+ }
- protected synchronized void close()
- {
- closeAllSuckers();
-
- //Note we use a timed callable since remoting has a habit of hanging on attempting to close
- //We do not want this to hang the system - especially failover
-
- Callable callable = new Callable() { public Object call()
- {
- try
- {
- connection.close();
- }
- catch (JMSException ignore)
- {
- }
- return null;
- } };
-
- Callable timedCallable = new TimedCallable(callable, CLOSE_TIMEOUT);
-
- try
- {
- timedCallable.call();
- }
- catch (Throwable t)
- {
- //Ignore - the server might have already closed - so this is ok
- }
-
- connection = null;
-
- started = false;
- }
+ protected void close()
+ {
+ closeAllSuckers();
+
+ // Note we use a timed callable since remoting has a habit of hanging on attempting to close
+ // We do not want this to hang the system - especially failover
+ synchronized (this)
+ {
+ if (!started)
+ return;
+
+ Callable callable = new Callable()
+ {
+ public Object call()
+ {
+ try
+ {
+ connection.close();
+ }
+ catch (JMSException ignore)
+ {
+ }
+ return null;
+ }
+ };
+
+ Callable timedCallable = new TimedCallable(callable, CLOSE_TIMEOUT);
+
+ try
+ {
+ timedCallable.call();
+ }
+ catch (Throwable t)
+ {
+ // Ignore - the server might have already closed - so this is ok
+ }
+
+ connection = null;
+
+ started = false;
+ }
+ }
- protected synchronized boolean hasSucker(String queueName)
- {
- return suckers.containsKey(queueName);
- }
+ protected boolean hasSucker(String queueName)
+ {
+ synchronized (suckers)
+ {
+ return suckers.containsKey(queueName);
+ }
+ }
- protected synchronized void addSucker(MessageSucker sucker)
- {
- if (suckers.containsKey(sucker.getQueueName()))
- {
- throw new IllegalStateException("Already has sucker for queue " + sucker.getQueueName());
- }
-
- suckers.put(sucker.getQueueName(), sucker);
- }
+ protected void addSucker(MessageSucker sucker)
+ {
+ synchronized (suckers)
+ {
+ if (suckers.containsKey(sucker.getQueueName()))
+ {
+ throw new IllegalStateException("Already has sucker for queue " + sucker.getQueueName());
+ }
+
+ suckers.put(sucker.getQueueName(), sucker);
+ }
+ }
- synchronized MessageSucker removeSucker(String queueName)
- {
- MessageSucker sucker = (MessageSucker)suckers.remove(queueName);
-
- return sucker;
- }
+ MessageSucker removeSucker(String queueName)
+ {
+ synchronized (suckers)
+ {
+ MessageSucker sucker = (MessageSucker)suckers.remove(queueName);
+ return sucker;
+ }
+ }
+
//https://jira.jboss.org/jira/browse/JBMESSAGING-1732
//on exception, try to recreate all suckers.
public void onException(JMSException e)
@@ -805,15 +828,21 @@
//first stop all the suckers
//then try to close the connection
- protected synchronized void cleanupConnection()
+ protected void cleanupConnection()
{
if (!started)
{
return;
}
- Iterator iter = suckers.values().iterator();
+ Map suckersCopy = null;
+ synchronized (suckers)
+ {
+ suckersCopy = new HashMap(suckers);
+ }
+ Iterator iter = suckersCopy.values().iterator();
+
while (iter.hasNext())
{
MessageSucker sucker = (MessageSucker)iter.next();
@@ -821,11 +850,13 @@
sucker.suspend();
}
+ final JBossConnection copy = connection;
+
Callable callable = new Callable() { public Object call()
{
try
{
- connection.close();
+ copy.close();
}
catch (JMSException ignore)
{
@@ -844,16 +875,27 @@
//Ignore - the server might have already closed - so this is ok
}
- connection = null;
-
- started = false;
+ synchronized (this)
+ {
+ connection = null;
+
+ started = false;
+ }
}
+ protected int getSuckerSize()
+ {
+ synchronized(suckers)
+ {
+ return suckers.size();
+ }
+ }
+
protected synchronized int retryConnection()
{
int retryCount = 0;
- while (((maxRetry == -1) || (retryCount < maxRetry)) && (suckers.size() > 0))
+ while (((maxRetry == -1) || (retryCount < maxRetry)) && (getSuckerSize() > 0))
{
try
{
@@ -884,20 +926,23 @@
}
//now resume the suckers
- Iterator iter = suckers.values().iterator();
-
- while (iter.hasNext())
+ synchronized (suckers)
{
- MessageSucker sucker = (MessageSucker)iter.next();
-
- try
+ Iterator iter = suckers.values().iterator();
+
+ while (iter.hasNext())
{
- sucker.resume(session);
+ MessageSucker sucker = (MessageSucker)iter.next();
+
+ try
+ {
+ sucker.resume(session);
+ }
+ catch (JMSException e)
+ {
+ log.warn("Error resuming sucker " + sucker, e);
+ }
}
- catch (JMSException e)
- {
- log.warn("Error resuming sucker " + sucker, e);
- }
}
return retryCount;
More information about the jboss-cvs-commits mailing list