[jboss-cvs] JBoss Messaging SVN: r2550 - in trunk/src/main/org/jboss/jms: client/container and 4 other directories.

jboss-cvs-commits at lists.jboss.org jboss-cvs-commits at lists.jboss.org
Thu Mar 15 19:18:19 EDT 2007


Author: timfox
Date: 2007-03-15 19:18:18 -0400 (Thu, 15 Mar 2007)
New Revision: 2550

Modified:
   trunk/src/main/org/jboss/jms/client/FailoverCommandCenter.java
   trunk/src/main/org/jboss/jms/client/container/ClusteringAspect.java
   trunk/src/main/org/jboss/jms/client/container/ConnectionFailureListener.java
   trunk/src/main/org/jboss/jms/client/remoting/ConsolidatedRemotingConnectionListener.java
   trunk/src/main/org/jboss/jms/client/remoting/JMSRemotingConnection.java
   trunk/src/main/org/jboss/jms/server/ServerPeer.java
   trunk/src/main/org/jboss/jms/server/endpoint/ServerSessionEndpoint.java
   trunk/src/main/org/jboss/jms/server/remoting/JMSServerInvocationHandler.java
Log:
Partial fix for http://jira.jboss.org/jira/browse/JBMESSAGING-886 and fix for http://jira.jboss.org/jira/browse/JBMESSAGING-920



Modified: trunk/src/main/org/jboss/jms/client/FailoverCommandCenter.java
===================================================================
--- trunk/src/main/org/jboss/jms/client/FailoverCommandCenter.java	2007-03-15 22:34:09 UTC (rev 2549)
+++ trunk/src/main/org/jboss/jms/client/FailoverCommandCenter.java	2007-03-15 23:18:18 UTC (rev 2550)
@@ -63,9 +63,12 @@
    /**
     * Method called by failure detection components (FailoverValveInterceptors and
     * ConnectionListeners) when they have reason to believe that a server failure occurred.
+    * 
+    * Returns true if the failover command centre handled the exception gracefully and failover completed
+    * or false if it didn't and failover did not occur
     */
-   public void failureDetected(Throwable reason, FailureDetector source,
-                               JMSRemotingConnection remotingConnection)
+   public boolean failureDetected(Throwable reason, FailureDetector source,
+                                  JMSRemotingConnection remotingConnection)
       throws Exception
    {
       log.debug("failure detected by " + source);
@@ -94,7 +97,9 @@
             {
                log.debug(this + " ignoring failure detection notification, as failover was " +
                   "already (or is in process of being) performed on this connection");
-               return;
+               
+               //Return true since failover already completed ok
+               return true;
             }
 
             remotingConnection.setFailed();
@@ -118,9 +123,7 @@
          
          if (res == null)
          {
-            // No failover attempt was detected on the server side; this might happen if the
-            // client side network fails temporarily so the client connection breaks but the
-            // server cluster is still up and running - in this case we don't perform failover.
+            // Failover did not occur
             failoverSuccessful = false;
          }
          else
@@ -144,6 +147,8 @@
             
             failoverSuccessful = true;                        
          }
+         
+         return failoverSuccessful;
       }
       catch (Exception e)
       {

Modified: trunk/src/main/org/jboss/jms/client/container/ClusteringAspect.java
===================================================================
--- trunk/src/main/org/jboss/jms/client/container/ClusteringAspect.java	2007-03-15 22:34:09 UTC (rev 2549)
+++ trunk/src/main/org/jboss/jms/client/container/ClusteringAspect.java	2007-03-15 23:18:18 UTC (rev 2550)
@@ -99,7 +99,7 @@
 
       while (attemptCount < MAX_RECONNECT_HOP_COUNT)
       {
-         // since an exceptiong might be captured during an attempt, this has to be the first
+         // since an exception might be captured during an attempt, this has to be the first
          // operation
          attemptCount++;
          try
@@ -142,7 +142,7 @@
                // add a connection listener to detect failure; the consolidated remoting connection
                // listener must be already in place and configured
                state.getRemotingConnection().getConnectionListener().
-                  addDelegateListener(new ConnectionFailureListener(fcc, state.getRemotingConnection()));
+                  setDelegateListener(new ConnectionFailureListener(fcc, state.getRemotingConnection()));
 
                log.debug(this + " installed failure listener on " + cd);
 

Modified: trunk/src/main/org/jboss/jms/client/container/ConnectionFailureListener.java
===================================================================
--- trunk/src/main/org/jboss/jms/client/container/ConnectionFailureListener.java	2007-03-15 22:34:09 UTC (rev 2549)
+++ trunk/src/main/org/jboss/jms/client/container/ConnectionFailureListener.java	2007-03-15 23:18:18 UTC (rev 2550)
@@ -6,23 +6,23 @@
  */
 package org.jboss.jms.client.container;
 
-import org.jboss.remoting.ConnectionListener;
-import org.jboss.remoting.Client;
-import org.jboss.logging.Logger;
 import org.jboss.jms.client.FailoverCommandCenter;
 import org.jboss.jms.client.FailureDetector;
 import org.jboss.jms.client.remoting.JMSRemotingConnection;
+import org.jboss.logging.Logger;
+import org.jboss.remoting.Client;
 
 /**
  * The listener that detects a connection failure and initiates the failover process. Each physical
  * connection created under the supervision of ClusteredAspect has one of these.
  *
  * @author <a href="mailto:ovidiu at jboss.org">Ovidiu Feodorov</a>
+ * @author <a href="mailto:tim.fox at jboss.com">Tim Fox</a>
  * @version <tt>$Revision$</tt>
  *
  * $Id$
  */
-public class ConnectionFailureListener implements ConnectionListener, FailureDetector
+public class ConnectionFailureListener implements FailureDetector
 {
    // Constants ------------------------------------------------------------------------------------
 
@@ -48,18 +48,24 @@
 
    // ConnectionListener implementation ------------------------------------------------------------
 
-   public void handleConnectionException(Throwable throwable, Client client)
+   /*
+    * Returns true if failover handled the exception gracefully
+    * Returns false if failover was unable to handle the exception and it should be passed
+    * on to any JMS exception listener
+    */
+   public boolean handleConnectionException(Throwable throwable, Client client)
    {
       try
       {
          log.debug(this + " is being notified of connection failure: " + throwable);
 
-         fcc.failureDetected(throwable, this, remotingConnection);
-
+         return fcc.failureDetected(throwable, this, remotingConnection);
       }
       catch (Throwable e)
       {
          log.error("Caught exception in handling failure", e);
+         
+         return false;
       }
    }
 

Modified: trunk/src/main/org/jboss/jms/client/remoting/ConsolidatedRemotingConnectionListener.java
===================================================================
--- trunk/src/main/org/jboss/jms/client/remoting/ConsolidatedRemotingConnectionListener.java	2007-03-15 22:34:09 UTC (rev 2549)
+++ trunk/src/main/org/jboss/jms/client/remoting/ConsolidatedRemotingConnectionListener.java	2007-03-15 23:18:18 UTC (rev 2550)
@@ -6,17 +6,15 @@
  */
 package org.jboss.jms.client.remoting;
 
-import org.jboss.remoting.ConnectionListener;
-import org.jboss.remoting.Client;
-import org.jboss.logging.Logger;
-import org.jboss.jms.client.state.ConnectionState;
-
 import javax.jms.ExceptionListener;
 import javax.jms.JMSException;
-import java.util.List;
-import java.util.ArrayList;
-import java.util.Iterator;
 
+import org.jboss.jms.client.container.ConnectionFailureListener;
+import org.jboss.jms.client.state.ConnectionState;
+import org.jboss.logging.Logger;
+import org.jboss.remoting.Client;
+import org.jboss.remoting.ConnectionListener;
+
 /**
  * The ONLY remoting connection listener for a JMS connection's underlying remoting connection.
  * Added to the remoting connection when the JMS connection is created, and removed when the
@@ -44,51 +42,56 @@
    private ExceptionListener jmsExceptionListener;
 
    // List<ConnectionListener>
-   private List delegateListeners;
+   //private List delegateListeners;
+   
+   private ConnectionFailureListener remotingListener;
 
    // Constructors ---------------------------------------------------------------------------------
 
    public ConsolidatedRemotingConnectionListener()
    {
-      delegateListeners = new ArrayList();
    }
 
    // ConnectionListener implementation ------------------------------------------------------------
 
    public void handleConnectionException(Throwable throwable, Client client)
    {
-      // forward the exception to delegate listeners and JMS ExceptionListeners; synchronize and
-      // copy to avoid race conditions
+      // forward the exception to delegate listener and JMS ExceptionListeners; synchronize
+      // to avoid race conditions
 
       ExceptionListener jmsExceptionListenerCopy;
-      List delegateListenersCopy = new ArrayList();
+  
+      ConnectionFailureListener remotingListenerCopy;
 
       synchronized(this)
       {
          jmsExceptionListenerCopy = jmsExceptionListener;
 
-         for(Iterator i = delegateListeners.iterator(); i.hasNext(); )
-         {
-            delegateListenersCopy.add(i.next());
-         }
+         remotingListenerCopy = remotingListener;
       }
+      
+      boolean forwardToJMSListener = true;
 
-      for(Iterator i = delegateListenersCopy.iterator(); i.hasNext(); )
+      if (remotingListenerCopy != null)
       {
-         ConnectionListener l = (ConnectionListener)i.next();
-
          try
          {
-            log.debug(this + " forwarding remoting failure \"" + throwable + "\" to " + l);
-            l.handleConnectionException(throwable, client);
+            log.debug(this + " forwarding remoting failure \"" + throwable + "\" to " + remotingListenerCopy);
+            
+            //We only forward to the JMS listener if failover did not successfully handle the exception
+            //If failover handled the exception transparently then there is effectively no problem
+            //with the logical connection that the client needs to be aware of
+            forwardToJMSListener = !remotingListenerCopy.handleConnectionException(throwable, client);
          }
          catch(Exception e)
          {
-            log.warn("Failed to forward " + throwable + " to " + l, e);
+            log.warn("Failed to forward " + throwable + " to " + remotingListenerCopy, e);
          }
       }
-
-      if (jmsExceptionListenerCopy != null)
+      
+      log.info("DISPATCHING TO JMSLISTENER " + forwardToJMSListener);
+      
+      if (forwardToJMSListener && jmsExceptionListenerCopy != null)
       {
          JMSException jmsException = null;
 
@@ -118,10 +121,16 @@
 
    // Public ---------------------------------------------------------------------------------------
 
-   public synchronized boolean addDelegateListener(ConnectionListener l)
+   public synchronized void setDelegateListener(ConnectionFailureListener l)
    {
-      log.debug(this + " adding delegate listener " + l);
-      return delegateListeners.add(l);
+      log.debug(this + " setting delegate listener " + l);
+      
+      if (remotingListener != null)
+      {
+         throw new IllegalStateException("There is already a connection listener for the connection");
+      }
+      
+      remotingListener = l;
    }
 
    public synchronized void addJMSExceptionListener(ExceptionListener jmsExceptionListener)
@@ -141,7 +150,7 @@
    public synchronized void clear()
    {
       jmsExceptionListener = null;
-      delegateListeners.clear();
+      remotingListener = null;
       log.debug(this + " cleared");
    }
 

Modified: trunk/src/main/org/jboss/jms/client/remoting/JMSRemotingConnection.java
===================================================================
--- trunk/src/main/org/jboss/jms/client/remoting/JMSRemotingConnection.java	2007-03-15 22:34:09 UTC (rev 2549)
+++ trunk/src/main/org/jboss/jms/client/remoting/JMSRemotingConnection.java	2007-03-15 23:18:18 UTC (rev 2550)
@@ -343,6 +343,11 @@
       {
          // very unlikely to get an exception on a local remove (I suspect badly designed API),
          // but we're failed anyway, so we don't care too much
+         
+         // Actually an exception will always be thrown here if the failure was detected by the connection
+         // validator since the validator will disconnect the client before calling the connection
+         // listener.
+
          log.debug(this + " failed to cleanly remove callback manager from the client", t);
       }
 

Modified: trunk/src/main/org/jboss/jms/server/ServerPeer.java
===================================================================
--- trunk/src/main/org/jboss/jms/server/ServerPeer.java	2007-03-15 22:34:09 UTC (rev 2549)
+++ trunk/src/main/org/jboss/jms/server/ServerPeer.java	2007-03-15 23:18:18 UTC (rev 2550)
@@ -72,7 +72,6 @@
 import org.jboss.messaging.core.tx.TransactionRepository;
 import org.jboss.messaging.util.Util;
 import org.jboss.mx.loading.UnifiedClassLoader3;
-import org.jboss.remoting.ServerInvocationHandler;
 import org.jboss.remoting.marshal.MarshalFactory;
 import org.jboss.system.ServiceCreator;
 import org.jboss.system.ServiceMBeanSupport;
@@ -169,10 +168,6 @@
    protected ObjectName defaultExpiryQueueObjectName;
    protected Queue defaultExpiryQueue;
 
-   //Other stuff
-
-   private JMSServerInvocationHandler handler;
-
    // Constructors ---------------------------------------------------------------------------------
 
    public ServerPeer(int serverPeerID,
@@ -272,6 +267,10 @@
          txRepository.loadPreparedTransactions();
          
          initializeRemoting(mbeanServer);
+         
+         //Now everything is started we can tell the invocation handler to start handling invocations
+         //We do this right at the end otherwise it can start handling invocations before we are properly started
+         JMSServerInvocationHandler.setClosed(false);
 
          started = true;
 
@@ -296,6 +295,11 @@
          log.debug(this + " stopping");
 
          started = false;
+         
+         //Tell the invocation handler we are closed - this is so we don't attempt to handle
+         //any invocations when we are in a partial closing down state - which can give strange
+         //"object not found with id" exceptions and stuff like that
+         JMSServerInvocationHandler.setClosed(true);
 
          // Stop the wired components
 
@@ -927,11 +931,6 @@
    {
       return channelIDManager;
    }
-
-   public ServerInvocationHandler getInvocationHandler()
-   {
-      return handler;
-   }
    
    public ServerSessionEndpoint getSession(Integer sessionID)
    {
@@ -1277,8 +1276,6 @@
       JMSWireFormat wf = new JMSWireFormat();
 
       MarshalFactory.addMarshaller("jms", wf, wf);
-
-      handler = new JMSServerInvocationHandler();
    }
 
    private void loadServerAOPConfig() throws Exception

Modified: trunk/src/main/org/jboss/jms/server/endpoint/ServerSessionEndpoint.java
===================================================================
--- trunk/src/main/org/jboss/jms/server/endpoint/ServerSessionEndpoint.java	2007-03-15 22:34:09 UTC (rev 2549)
+++ trunk/src/main/org/jboss/jms/server/endpoint/ServerSessionEndpoint.java	2007-03-15 23:18:18 UTC (rev 2550)
@@ -198,18 +198,8 @@
    {
       try
       {
-//         if (!connectionEndpoint.isFailoverConnection())
-//         {
-            // regular consumer
-            return createConsumerDelegateInternal(jmsDestination, selector,
-                                                  noLocal, subscriptionName);
-  //       }
-
-//         // we're child of a failover connection. Favor failover channels when creating new
-//         // consumers
-//         return createFailoverConsumerDelegateInternal(jmsDestination, selector,
-//                                                       noLocal, subscriptionName,
-//                                                       failoverChannelID);
+         return createConsumerDelegateInternal(jmsDestination, selector,
+                                               noLocal, subscriptionName);
       }
       catch (Throwable t)
       {
@@ -223,15 +213,7 @@
 	{
       try
       {
-//         if (!connectionEndpoint.isFailoverConnection())
-//         {
-            // regular browser
-            return createBrowserDelegateInternal(jmsDestination, selector);
-//         }
-//
-//         // we're child of a failover connection. Favor failover channels when creating new
-//         // browsers
-//         return createFailoverBrowserDelegateInternal(jmsDestination, selector, failoverChannelID);
+         return createBrowserDelegateInternal(jmsDestination, selector);
       }
       catch (Throwable t)
       {
@@ -850,6 +832,9 @@
       
       promptDelivery(channels);
       
+      //Close down the executor
+      executor.shutdownAfterProcessingCurrentTask();
+      
       deliveries.clear();
       
       sp.removeSession(new Integer(id));

Modified: trunk/src/main/org/jboss/jms/server/remoting/JMSServerInvocationHandler.java
===================================================================
--- trunk/src/main/org/jboss/jms/server/remoting/JMSServerInvocationHandler.java	2007-03-15 22:34:09 UTC (rev 2549)
+++ trunk/src/main/org/jboss/jms/server/remoting/JMSServerInvocationHandler.java	2007-03-15 23:18:18 UTC (rev 2550)
@@ -28,6 +28,7 @@
 
 import javax.management.MBeanServer;
 
+import org.jboss.jms.util.MessagingJMSException;
 import org.jboss.jms.wireformat.ConnectionFactoryCreateConnectionDelegateRequest;
 import org.jboss.jms.wireformat.RequestSupport;
 import org.jboss.logging.Logger;
@@ -62,14 +63,22 @@
    
    private boolean trace;
    
+   //We need some way the server peer can call the invocation handler to make it open/closed
+   private static boolean closed = true;
+   
+   public static synchronized void setClosed(boolean closed)
+   {
+      JMSServerInvocationHandler.closed = closed;
+   }
+   
    // Constructors ---------------------------------------------------------------------------------
 
    public JMSServerInvocationHandler()
    {
       callbackHandlers = new HashMap();
       trace = log.isTraceEnabled();
-   }
-
+   }   
+     
    // ServerInvocationHandler ----------------------------------------------------------------------
 
    public void setMBeanServer(MBeanServer server)
@@ -92,37 +101,45 @@
    public Object invoke(InvocationRequest invocation) throws Throwable
    {      
       if (trace) { log.trace("invoking " + invocation); }
-        
-      RequestSupport request = (RequestSupport)invocation.getParameter();
       
-      if (request instanceof ConnectionFactoryCreateConnectionDelegateRequest)
-      {
-         //Create connection request
-         
-         ConnectionFactoryCreateConnectionDelegateRequest cReq = 
-            (ConnectionFactoryCreateConnectionDelegateRequest)request;
-         
-         String remotingSessionId = cReq.getRemotingSessionID();
-         
-         ServerInvokerCallbackHandler callbackHandler = null;
-         synchronized(callbackHandlers)
+      synchronized (JMSServerInvocationHandler.class)
+      {         
+         if (closed)
          {
-            callbackHandler = (ServerInvokerCallbackHandler)callbackHandlers.get(remotingSessionId);
+            throw new MessagingJMSException("Cannot handle invocation since server is not active (it is either starting up or shutting down)");
          }
-         if (callbackHandler != null)
+           
+         RequestSupport request = (RequestSupport)invocation.getParameter();
+         
+         if (request instanceof ConnectionFactoryCreateConnectionDelegateRequest)
          {
-            log.debug("found calllback handler for remoting session " + Util.guidToString(remotingSessionId));
+            //Create connection request
             
-            cReq.setCallbackHandler(callbackHandler);
+            ConnectionFactoryCreateConnectionDelegateRequest cReq = 
+               (ConnectionFactoryCreateConnectionDelegateRequest)request;
+            
+            String remotingSessionId = cReq.getRemotingSessionID();
+            
+            ServerInvokerCallbackHandler callbackHandler = null;
+            synchronized(callbackHandlers)
+            {
+               callbackHandler = (ServerInvokerCallbackHandler)callbackHandlers.get(remotingSessionId);
+            }
+            if (callbackHandler != null)
+            {
+               log.debug("found calllback handler for remoting session " + Util.guidToString(remotingSessionId));
+               
+               cReq.setCallbackHandler(callbackHandler);
+            }
+            else
+            {
+               throw new IllegalStateException("Cannot find callback handler " +
+                                               "for session id " + remotingSessionId);
+            }
          }
-         else
-         {
-            throw new IllegalStateException("Cannot find callback handler " +
-                                            "for session id " + remotingSessionId);
-         }
+      
+         return request.serverInvoke();
       }
-      
-      return request.serverInvoke();
    }
 
    public void addListener(InvokerCallbackHandler callbackHandler)
@@ -193,5 +210,10 @@
    
    // Private --------------------------------------------------------------------------------------
    
+   private synchronized void doSetClosed(boolean closed)
+   {
+      this.closed = true;
+   }
+   
    // Inner classes --------------------------------------------------------------------------------
 }




More information about the jboss-cvs-commits mailing list