[jboss-cvs] JBoss Messaging SVN: r8255 - in branches/Branch_1_4/src/main/org/jboss/jms/client: container and 2 other directories.

jboss-cvs-commits at lists.jboss.org jboss-cvs-commits at lists.jboss.org
Wed Apr 6 10:48:24 EDT 2011


Author: gaohoward
Date: 2011-04-06 10:48:23 -0400 (Wed, 06 Apr 2011)
New Revision: 8255

Modified:
   branches/Branch_1_4/src/main/org/jboss/jms/client/FailoverCommandCenter.java
   branches/Branch_1_4/src/main/org/jboss/jms/client/container/ClientConsumer.java
   branches/Branch_1_4/src/main/org/jboss/jms/client/remoting/ConsolidatedRemotingConnectionListener.java
   branches/Branch_1_4/src/main/org/jboss/jms/client/remoting/JMSRemotingConnection.java
   branches/Branch_1_4/src/main/org/jboss/jms/client/state/ConnectionState.java
   branches/Branch_1_4/src/main/org/jboss/jms/client/state/ConsumerState.java
   branches/Branch_1_4/src/main/org/jboss/jms/client/state/HierarchicalStateSupport.java
   branches/Branch_1_4/src/main/org/jboss/jms/client/state/SessionState.java
Log:
JBMESSAGING-1855


Modified: branches/Branch_1_4/src/main/org/jboss/jms/client/FailoverCommandCenter.java
===================================================================
--- branches/Branch_1_4/src/main/org/jboss/jms/client/FailoverCommandCenter.java	2011-04-06 14:38:13 UTC (rev 8254)
+++ branches/Branch_1_4/src/main/org/jboss/jms/client/FailoverCommandCenter.java	2011-04-06 14:48:23 UTC (rev 8255)
@@ -204,6 +204,8 @@
          else
          {
             log.debug(this + " aborted failover");
+            state.beforeAborting();
+            
             ClientConnectionDelegate connDelegate = (ClientConnectionDelegate)state.getDelegate();
             connDelegate.closing(-1);
             connDelegate.close();

Modified: branches/Branch_1_4/src/main/org/jboss/jms/client/container/ClientConsumer.java
===================================================================
--- branches/Branch_1_4/src/main/org/jboss/jms/client/container/ClientConsumer.java	2011-04-06 14:38:13 UTC (rev 8254)
+++ branches/Branch_1_4/src/main/org/jboss/jms/client/container/ClientConsumer.java	2011-04-06 14:48:23 UTC (rev 8255)
@@ -298,6 +298,7 @@
    private ExecutorService pool = Executors.newFixedThreadPool(1);
    private long maxRetryChangeRate;
    private long retryChangeRateInterval;
+   private boolean abortReceive;
 
    public int getBufferSize()
    {
@@ -969,11 +970,16 @@
             if (timeout == 0)
             {
                // wait for ever potentially
-               while (!closed && buffer.isEmpty())
+               while (!closed && buffer.isEmpty() && (!abortReceive))
                {
                   if (trace) { log.trace(this + " waiting on main lock, no timeout"); }
 
                   mainLock.wait();
+                  
+                  if (buffer.isEmpty() && abortReceive)
+                  {
+                     break;
+                  }
 
                   if (trace) { log.trace(this + " done waiting on main lock"); }
                }
@@ -1183,7 +1189,19 @@
                   
          if (trace) { log.trace("Exiting run()"); }
       }
-   }         
+   }
+
+   // Sets the abortReceive flag and wakes every thread waiting on mainLock, so that the
+   // no-timeout receive wait loop, which checks this flag, stops waiting.
+   public void abortReceive()
+   {
+      synchronized (mainLock)
+      {
+         log.error(this + " The connection is about to be aborted, stop the blocking receiving.");
+         abortReceive = true;
+         mainLock.notifyAll();
+      }
+   }
 }
 
 

Modified: branches/Branch_1_4/src/main/org/jboss/jms/client/remoting/ConsolidatedRemotingConnectionListener.java
===================================================================
--- branches/Branch_1_4/src/main/org/jboss/jms/client/remoting/ConsolidatedRemotingConnectionListener.java	2011-04-06 14:38:13 UTC (rev 8254)
+++ branches/Branch_1_4/src/main/org/jboss/jms/client/remoting/ConsolidatedRemotingConnectionListener.java	2011-04-06 14:48:23 UTC (rev 8255)
@@ -68,6 +68,9 @@
             
       // forward the exception to delegate listener and JMS ExceptionListeners;
       boolean forwardToJMSListener = true;
+      
+      // Save a copy, because the listener will be cleaned up if the failover fails.
+      ExceptionListener jmsListenerCopy = jmsExceptionListener;
 
       if (remotingListener != null)
       {
@@ -112,9 +115,9 @@
                jmsException = new JMSException(msg + ": " + throwable.getMessage());
             }
             
-            if (jmsExceptionListener != null)
+            if (jmsListenerCopy != null)
             {
-               jmsExceptionListener.onException(jmsException);
+               jmsListenerCopy.onException(jmsException);
                jmsException = null;
             }
          }

Modified: branches/Branch_1_4/src/main/org/jboss/jms/client/remoting/JMSRemotingConnection.java
===================================================================
--- branches/Branch_1_4/src/main/org/jboss/jms/client/remoting/JMSRemotingConnection.java	2011-04-06 14:38:13 UTC (rev 8254)
+++ branches/Branch_1_4/src/main/org/jboss/jms/client/remoting/JMSRemotingConnection.java	2011-04-06 14:48:23 UTC (rev 8255)
@@ -560,7 +560,10 @@
          return null;
       }
 
-      client.removeConnectionListener(remotingConnectionListener);
+      if (client != null)
+      {
+         client.removeConnectionListener(remotingConnectionListener);
+      }
 
       log.trace(this + " removed consolidated connection listener from " + client);
       ConsolidatedRemotingConnectionListener toReturn = remotingConnectionListener;

Modified: branches/Branch_1_4/src/main/org/jboss/jms/client/state/ConnectionState.java
===================================================================
--- branches/Branch_1_4/src/main/org/jboss/jms/client/state/ConnectionState.java	2011-04-06 14:38:13 UTC (rev 8254)
+++ branches/Branch_1_4/src/main/org/jboss/jms/client/state/ConnectionState.java	2011-04-06 14:48:23 UTC (rev 8255)
@@ -23,6 +23,7 @@
 
 import java.util.HashSet;
 import java.util.Iterator;
+import java.util.Set;
 
 import org.jboss.jms.client.FailoverCommandCenter;
 import org.jboss.jms.client.delegate.ClientConnectionDelegate;
@@ -325,6 +326,18 @@
       return retryChangeRateInterval;
    }
 
+   // Invoked when failover is aborted: forwards beforeAborting() to each child, cast to SessionState.
+   public void beforeAborting()
+   {
+      Iterator sessStates = this.getChildren().iterator();
+      while (sessStates.hasNext())
+      {
+         SessionState sstate = (SessionState)sessStates.next();
+         sstate.beforeAborting();
+      }
+      
+   }
+
    // Package protected ----------------------------------------------------------------------------
 
    // Protected ------------------------------------------------------------------------------------

Modified: branches/Branch_1_4/src/main/org/jboss/jms/client/state/ConsumerState.java
===================================================================
--- branches/Branch_1_4/src/main/org/jboss/jms/client/state/ConsumerState.java	2011-04-06 14:38:13 UTC (rev 8254)
+++ branches/Branch_1_4/src/main/org/jboss/jms/client/state/ConsumerState.java	2011-04-06 14:48:23 UTC (rev 8255)
@@ -235,6 +235,11 @@
    {
       return this.retryChangeRateInterval;
    }
+   // Passes an abort notification on to this consumer by calling clientConsumer.abortReceive().
+   public void beforeAborting()
+   {
+      clientConsumer.abortReceive();
+   }
 
    // Package protected ----------------------------------------------------------------------------
 

Modified: branches/Branch_1_4/src/main/org/jboss/jms/client/state/HierarchicalStateSupport.java
===================================================================
--- branches/Branch_1_4/src/main/org/jboss/jms/client/state/HierarchicalStateSupport.java	2011-04-06 14:38:13 UTC (rev 8254)
+++ branches/Branch_1_4/src/main/org/jboss/jms/client/state/HierarchicalStateSupport.java	2011-04-06 14:48:23 UTC (rev 8255)
@@ -73,6 +73,10 @@
    {
       return children;
    }
+   // Abort hook with an empty body here; does nothing unless a state class supplies its own version.
+   public void beforeAborting()
+   {
+   }
 
    // Public ---------------------------------------------------------------------------------------
 

Modified: branches/Branch_1_4/src/main/org/jboss/jms/client/state/SessionState.java
===================================================================
--- branches/Branch_1_4/src/main/org/jboss/jms/client/state/SessionState.java	2011-04-06 14:38:13 UTC (rev 8254)
+++ branches/Branch_1_4/src/main/org/jboss/jms/client/state/SessionState.java	2011-04-06 14:48:23 UTC (rev 8255)
@@ -579,6 +579,16 @@
    {
       return parent.getRetryChangeRateInterval();
    }
+   // Forwards beforeAborting() to every child of this session, casting each child to ConsumerState.
+   public void beforeAborting()
+   {
+      Iterator consumers = this.getChildren().iterator();
+      while (consumers.hasNext())
+      {
+         ConsumerState conState = (ConsumerState)consumers.next();
+         conState.beforeAborting();
+      }
+   }
 
 }
 



More information about the jboss-cvs-commits mailing list