[jboss-cvs] JBoss Messaging SVN: r8255 - in branches/Branch_1_4/src/main/org/jboss/jms/client: container and 2 other directories.
jboss-cvs-commits at lists.jboss.org
jboss-cvs-commits at lists.jboss.org
Wed Apr 6 10:48:24 EDT 2011
Author: gaohoward
Date: 2011-04-06 10:48:23 -0400 (Wed, 06 Apr 2011)
New Revision: 8255
Modified:
branches/Branch_1_4/src/main/org/jboss/jms/client/FailoverCommandCenter.java
branches/Branch_1_4/src/main/org/jboss/jms/client/container/ClientConsumer.java
branches/Branch_1_4/src/main/org/jboss/jms/client/remoting/ConsolidatedRemotingConnectionListener.java
branches/Branch_1_4/src/main/org/jboss/jms/client/remoting/JMSRemotingConnection.java
branches/Branch_1_4/src/main/org/jboss/jms/client/state/ConnectionState.java
branches/Branch_1_4/src/main/org/jboss/jms/client/state/ConsumerState.java
branches/Branch_1_4/src/main/org/jboss/jms/client/state/HierarchicalStateSupport.java
branches/Branch_1_4/src/main/org/jboss/jms/client/state/SessionState.java
Log:
JBMESSAGING-1855
Modified: branches/Branch_1_4/src/main/org/jboss/jms/client/FailoverCommandCenter.java
===================================================================
--- branches/Branch_1_4/src/main/org/jboss/jms/client/FailoverCommandCenter.java 2011-04-06 14:38:13 UTC (rev 8254)
+++ branches/Branch_1_4/src/main/org/jboss/jms/client/FailoverCommandCenter.java 2011-04-06 14:48:23 UTC (rev 8255)
@@ -204,6 +204,8 @@
else
{
log.debug(this + " aborted failover");
+ state.beforeAborting();
+
ClientConnectionDelegate connDelegate = (ClientConnectionDelegate)state.getDelegate();
connDelegate.closing(-1);
connDelegate.close();
Modified: branches/Branch_1_4/src/main/org/jboss/jms/client/container/ClientConsumer.java
===================================================================
--- branches/Branch_1_4/src/main/org/jboss/jms/client/container/ClientConsumer.java 2011-04-06 14:38:13 UTC (rev 8254)
+++ branches/Branch_1_4/src/main/org/jboss/jms/client/container/ClientConsumer.java 2011-04-06 14:48:23 UTC (rev 8255)
@@ -298,6 +298,7 @@
private ExecutorService pool = Executors.newFixedThreadPool(1);
private long maxRetryChangeRate;
private long retryChangeRateInterval;
+ private boolean abortReceive;
public int getBufferSize()
{
@@ -969,11 +970,16 @@
if (timeout == 0)
{
// wait for ever potentially
- while (!closed && buffer.isEmpty())
+ while (!closed && buffer.isEmpty() && (!abortReceive))
{
if (trace) { log.trace(this + " waiting on main lock, no timeout"); }
mainLock.wait();
+
+ if (buffer.isEmpty() && abortReceive)
+ {
+ break;
+ }
if (trace) { log.trace(this + " done waiting on main lock"); }
}
@@ -1183,7 +1189,19 @@
if (trace) { log.trace("Exiting run()"); }
}
- }
+ }
+
+ //aborting receive if it is blocking.
+ //
+ public void abortReceive()
+ {
+ synchronized (mainLock)
+ {
+ log.error(this + " The connection is about to be aborted, stop the blocking receiving.");
+ abortReceive = true;
+ mainLock.notifyAll();
+ }
+ }
}
Modified: branches/Branch_1_4/src/main/org/jboss/jms/client/remoting/ConsolidatedRemotingConnectionListener.java
===================================================================
--- branches/Branch_1_4/src/main/org/jboss/jms/client/remoting/ConsolidatedRemotingConnectionListener.java 2011-04-06 14:38:13 UTC (rev 8254)
+++ branches/Branch_1_4/src/main/org/jboss/jms/client/remoting/ConsolidatedRemotingConnectionListener.java 2011-04-06 14:48:23 UTC (rev 8255)
@@ -68,6 +68,9 @@
// forward the exception to delegate listener and JMS ExceptionListeners;
boolean forwardToJMSListener = true;
+
+ //save a copy. This is because if the failover failed the listener will be cleaned up.
+ ExceptionListener jmsListenerCopy = jmsExceptionListener;
if (remotingListener != null)
{
@@ -112,9 +115,9 @@
jmsException = new JMSException(msg + ": " + throwable.getMessage());
}
- if (jmsExceptionListener != null)
+ if (jmsListenerCopy != null)
{
- jmsExceptionListener.onException(jmsException);
+ jmsListenerCopy.onException(jmsException);
jmsException = null;
}
}
Modified: branches/Branch_1_4/src/main/org/jboss/jms/client/remoting/JMSRemotingConnection.java
===================================================================
--- branches/Branch_1_4/src/main/org/jboss/jms/client/remoting/JMSRemotingConnection.java 2011-04-06 14:38:13 UTC (rev 8254)
+++ branches/Branch_1_4/src/main/org/jboss/jms/client/remoting/JMSRemotingConnection.java 2011-04-06 14:48:23 UTC (rev 8255)
@@ -560,7 +560,10 @@
return null;
}
- client.removeConnectionListener(remotingConnectionListener);
+ if (client != null)
+ {
+ client.removeConnectionListener(remotingConnectionListener);
+ }
log.trace(this + " removed consolidated connection listener from " + client);
ConsolidatedRemotingConnectionListener toReturn = remotingConnectionListener;
Modified: branches/Branch_1_4/src/main/org/jboss/jms/client/state/ConnectionState.java
===================================================================
--- branches/Branch_1_4/src/main/org/jboss/jms/client/state/ConnectionState.java 2011-04-06 14:38:13 UTC (rev 8254)
+++ branches/Branch_1_4/src/main/org/jboss/jms/client/state/ConnectionState.java 2011-04-06 14:48:23 UTC (rev 8255)
@@ -23,6 +23,7 @@
import java.util.HashSet;
import java.util.Iterator;
+import java.util.Set;
import org.jboss.jms.client.FailoverCommandCenter;
import org.jboss.jms.client.delegate.ClientConnectionDelegate;
@@ -325,6 +326,18 @@
return retryChangeRateInterval;
}
+ //aborting when failover failed.
+ public void beforeAborting()
+ {
+ Iterator sessStates = this.getChildren().iterator();
+ while (sessStates.hasNext())
+ {
+ SessionState sstate = (SessionState)sessStates.next();
+ sstate.beforeAborting();
+ }
+
+ }
+
// Package protected ----------------------------------------------------------------------------
// Protected ------------------------------------------------------------------------------------
Modified: branches/Branch_1_4/src/main/org/jboss/jms/client/state/ConsumerState.java
===================================================================
--- branches/Branch_1_4/src/main/org/jboss/jms/client/state/ConsumerState.java 2011-04-06 14:38:13 UTC (rev 8254)
+++ branches/Branch_1_4/src/main/org/jboss/jms/client/state/ConsumerState.java 2011-04-06 14:48:23 UTC (rev 8255)
@@ -235,6 +235,11 @@
{
return this.retryChangeRateInterval;
}
+
+ public void beforeAborting()
+ {
+ clientConsumer.abortReceive();
+ }
// Package protected ----------------------------------------------------------------------------
Modified: branches/Branch_1_4/src/main/org/jboss/jms/client/state/HierarchicalStateSupport.java
===================================================================
--- branches/Branch_1_4/src/main/org/jboss/jms/client/state/HierarchicalStateSupport.java 2011-04-06 14:38:13 UTC (rev 8254)
+++ branches/Branch_1_4/src/main/org/jboss/jms/client/state/HierarchicalStateSupport.java 2011-04-06 14:48:23 UTC (rev 8255)
@@ -73,6 +73,10 @@
{
return children;
}
+
+ public void beforeAborting()
+ {
+ }
// Public ---------------------------------------------------------------------------------------
Modified: branches/Branch_1_4/src/main/org/jboss/jms/client/state/SessionState.java
===================================================================
--- branches/Branch_1_4/src/main/org/jboss/jms/client/state/SessionState.java 2011-04-06 14:38:13 UTC (rev 8254)
+++ branches/Branch_1_4/src/main/org/jboss/jms/client/state/SessionState.java 2011-04-06 14:48:23 UTC (rev 8255)
@@ -579,6 +579,16 @@
{
return parent.getRetryChangeRateInterval();
}
+
+ public void beforeAborting()
+ {
+ Iterator consumers = this.getChildren().iterator();
+ while (consumers.hasNext())
+ {
+ ConsumerState conState = (ConsumerState)consumers.next();
+ conState.beforeAborting();
+ }
+ }
}
More information about the jboss-cvs-commits mailing list