[jboss-cvs] JBoss Messaging SVN: r1790 - in trunk/src/main/org/jboss/jms/client: container delegate remoting state

jboss-cvs-commits at lists.jboss.org jboss-cvs-commits at lists.jboss.org
Thu Dec 14 03:58:45 EST 2006


Author: ovidiu.feodorov at jboss.com
Date: 2006-12-14 03:58:42 -0500 (Thu, 14 Dec 2006)
New Revision: 1790

Added:
   trunk/src/main/org/jboss/jms/client/remoting/ConsolidatedRemotingConnectionListener.java
Modified:
   trunk/src/main/org/jboss/jms/client/container/ConnectionAspect.java
   trunk/src/main/org/jboss/jms/client/container/HAAspect.java
   trunk/src/main/org/jboss/jms/client/container/StateCreationAspect.java
   trunk/src/main/org/jboss/jms/client/delegate/ClientConnectionFactoryDelegate.java
   trunk/src/main/org/jboss/jms/client/delegate/ClusteredClientConnectionFactoryDelegate.java
   trunk/src/main/org/jboss/jms/client/state/ConnectionState.java
Log:
Fixed bug: HAAspect used to add a remoting connection failure listener without 
actually realizing that a connection listener might have already been installed
by the ConnectionAspect (the connection listener is the ConnectionAspect instance
itself). More than that, on connection closing it did not cleanly remove the failure 
listener, leading to all kind of undeterministic behavior on failover.


Modified: trunk/src/main/org/jboss/jms/client/container/ConnectionAspect.java
===================================================================
--- trunk/src/main/org/jboss/jms/client/container/ConnectionAspect.java	2006-12-14 06:47:18 UTC (rev 1789)
+++ trunk/src/main/org/jboss/jms/client/container/ConnectionAspect.java	2006-12-14 08:58:42 UTC (rev 1790)
@@ -23,17 +23,17 @@
 
 import javax.jms.ExceptionListener;
 import javax.jms.IllegalStateException;
-import javax.jms.JMSException;
 
 import org.jboss.aop.joinpoint.Invocation;
 import org.jboss.aop.joinpoint.MethodInvocation;
 import org.jboss.jms.client.JBossConnectionMetaData;
+import org.jboss.jms.client.remoting.ConsolidatedRemotingConnectionListener;
+import org.jboss.jms.client.remoting.JMSRemotingConnection;
 import org.jboss.jms.client.delegate.ClientConnectionDelegate;
 import org.jboss.jms.client.state.ConnectionState;
 import org.jboss.jms.message.MessageIdGeneratorFactory;
 import org.jboss.logging.Logger;
 import org.jboss.remoting.Client;
-import org.jboss.remoting.ConnectionListener;
 
 /**
  * Handles operations related to the connection
@@ -47,22 +47,23 @@
  *
  * $Id$
  */
-public class ConnectionAspect implements ConnectionListener
+public class ConnectionAspect
 {
    // Constants -----------------------------------------------------
-   
-   // Static --------------------------------------------------------
-   
+
    private static final Logger log = Logger.getLogger(ConnectionAspect.class);
-   private static boolean trace = log.isTraceEnabled();
    
+   // Static --------------------------------------------------------
+
    // Attributes ----------------------------------------------------
-   
-   
+
    protected JBossConnectionMetaData connMetaData;
    
    protected ConnectionState state;
-   
+
+   // The identity of the delegate this interceptor is associated with
+   private Integer id;
+
    // Constructors --------------------------------------------------
    
    // Public --------------------------------------------------------
@@ -108,40 +109,21 @@
    
    public Object handleGetExceptionListener(Invocation invocation) throws Throwable
    {
-      ConnectionState currentState = getConnectionState(invocation);
-      currentState.setJustCreated(false);
+      ConnectionState state = getConnectionState(invocation);
+      state.setJustCreated(false);
       
-      return currentState.getExceptionListener();
+      return state.getRemotingConnectionListener().getJMSExceptionListener();
    }
    
    public Object handleSetExceptionListener(Invocation invocation) throws Throwable
    {
-      ConnectionState currentState = getConnectionState(invocation);
-      currentState.setJustCreated(false);
+      ConnectionState state = getConnectionState(invocation);
+      state.setJustCreated(false);
       
       MethodInvocation mi = (MethodInvocation)invocation;
-      
-      currentState.setExceptionListener((ExceptionListener)mi.getArguments()[0]);
-      
-      Client client = getConnectionState(invocation).getRemotingConnection().getInvokingClient();
-      
-      if (client == null)
-      {
-         throw new java.lang.IllegalStateException("Cannot find remoting client");
-      }
-      
-      if (currentState.getExceptionListener() != null)
-      {
-         client.addConnectionListener(this);
-         currentState.setListenerAdded(true);
-      }
-      else
-      {
-         if (currentState.isListenerAdded())
-         {
-            client.removeConnectionListener(this);
-         }
-      }
+      ExceptionListener exceptionListener = (ExceptionListener)mi.getArguments()[0];
+      state.getRemotingConnectionListener().addJMSExceptionListener(exceptionListener);
+
       return null;
    }
    
@@ -184,63 +166,44 @@
    {
       Object ret = invocation.invokeNext();
       
-      // Remove any exception listener
-      ConnectionState currentState = getConnectionState(invocation);
-      
-      Client client = getConnectionState(invocation).getRemotingConnection().getInvokingClient();
-      
-      if (currentState.isListenerAdded())
-      {
-         client.removeConnectionListener(this);
-      }
-            
       ConnectionState state = getConnectionState(invocation);
-      
+
+      JMSRemotingConnection remotingConnection = state.getRemotingConnection();
+
+      // remove the consolidated remoting connection listener
+      ConsolidatedRemotingConnectionListener listener = state.getRemotingConnectionListener();
+      listener.clear();
+
+      Client client = remotingConnection.getInvokingClient();
+      boolean removed = client.removeConnectionListener(listener);
+
+      log.debug(this + (removed ? " removed " : " failed to remove ") +
+                "the consolidated remoting connection listener from " + client);
+
       // Finished with the connection - we need to shutdown callback server
-      state.getRemotingConnection().stop();
+      remotingConnection.stop();
        
-      // Remove reference to message id generator
+      // Remove reference to message ID generator
       MessageIdGeneratorFactory.instance.checkInGenerator(state.getServerID());
-      
+
       return ret;
    }
-   
-   
-   // ConnectionListener implementation -----------------------------------------------------------
-   
-   public void handleConnectionException(Throwable t, Client c)
+
+   public String toString()
    {
-      log.error("Caught exception from connection", t);
-      
-      if (state.getExceptionListener()!= null)
+      StringBuffer sb = new StringBuffer("ConnectionAspect[");
+
+      if (id == null)
       {
-         JMSException j = null;
-         if (t instanceof Error)
-         {
-            final String msg = "Caught Error on underlying connection";
-            log.error(msg, t);
-            j = new JMSException(msg + ": " + t.getMessage());
-         }
-         else if (t instanceof Exception)
-         {
-            Exception e =(Exception)t;
-            j = new JMSException("Throwable received from underlying connection");
-            j.setLinkedException(e);
-         }
-         else
-         {
-            //Some other Throwable subclass
-            final String msg = "Caught Throwable on underlying connection";
-            log.error(msg, t);
-            j = new JMSException(msg + ": " + t.getMessage());
-         }
-         synchronized (state.getExceptionListener())
-         {
-            state.getExceptionListener().onException(j);
-         }
+         sb.append("UNINITIALIZED]");
       }
+      else
+      {
+         sb.append(id).append("]");
+      }
+      return sb.toString();
    }
-   
+
    // Package protected ---------------------------------------------
    
    // Protected -----------------------------------------------------
@@ -249,12 +212,14 @@
    
    private ConnectionState getConnectionState(Invocation invocation)
    {
-      if (state==null)
+      if (state == null)
       {
          ClientConnectionDelegate currentDelegate =
             ((ClientConnectionDelegate)invocation.getTargetObject());
          
          state = (ConnectionState)currentDelegate.getState();
+         id = new Integer(state.getDelegate().getID());
+
       }
       return state;
    }

Modified: trunk/src/main/org/jboss/jms/client/container/HAAspect.java
===================================================================
--- trunk/src/main/org/jboss/jms/client/container/HAAspect.java	2006-12-14 06:47:18 UTC (rev 1789)
+++ trunk/src/main/org/jboss/jms/client/container/HAAspect.java	2006-12-14 08:58:42 UTC (rev 1790)
@@ -147,13 +147,14 @@
 
       if(trace) { log.trace(this + " got local connection delegate " + cd); }
 
-      // Add a connection listener to detect failure
+      // Add a connection listener to detect failure; the consolidated remoting connection listener
+      // must be already in place and configured
 
       ConnectionListener listener = new ConnectionFailureListener(cd);
-      ConnectionState state = (ConnectionState)((DelegateSupport)cd).getState();
-      if(trace) { log.trace(this + " registering " + listener + " on " + cd); }
-      state.getRemotingConnection().getInvokingClient().addConnectionListener(listener);
 
+      ((ConnectionState)((DelegateSupport)cd).getState()).
+         getRemotingConnectionListener().addDelegateListener(listener);
+
       return new CreateConnectionResult(cd);
    }
 

Modified: trunk/src/main/org/jboss/jms/client/container/StateCreationAspect.java
===================================================================
--- trunk/src/main/org/jboss/jms/client/container/StateCreationAspect.java	2006-12-14 06:47:18 UTC (rev 1789)
+++ trunk/src/main/org/jboss/jms/client/container/StateCreationAspect.java	2006-12-14 08:58:42 UTC (rev 1790)
@@ -30,6 +30,7 @@
 import org.jboss.jms.client.delegate.ClientProducerDelegate;
 import org.jboss.jms.client.delegate.DelegateSupport;
 import org.jboss.jms.client.remoting.JMSRemotingConnection;
+import org.jboss.jms.client.remoting.ConsolidatedRemotingConnectionListener;
 import org.jboss.jms.client.state.BrowserState;
 import org.jboss.jms.client.state.ConnectionState;
 import org.jboss.jms.client.state.ConsumerState;
@@ -98,18 +99,26 @@
 
          int serverID = connectionDelegate.getServerID();
          Version versionToUse = connectionDelegate.getVersionToUse();
-         JMSRemotingConnection remotingConn = connectionDelegate.getRemotingConnection();
+         JMSRemotingConnection remotingConnection = connectionDelegate.getRemotingConnection();
 
+         // install the consolidated remoting connection listener; it will be de-installed on
+         // connection closing by ConnectionAspect
+
+         ConsolidatedRemotingConnectionListener listener = new ConsolidatedRemotingConnectionListener();
+         remotingConnection.getInvokingClient().addConnectionListener(listener);
+
          if (versionToUse == null)
          {
             throw new IllegalStateException("Connection version is null");
          }
 
          // We have one message id generator per unique server
-         MessageIdGenerator g = MessageIdGeneratorFactory.instance.checkOutGenerator(serverID, cfd);
+         MessageIdGenerator idGenerator =
+            MessageIdGeneratorFactory.instance.checkOutGenerator(serverID, cfd);
 
          ConnectionState connectionState =
-            new ConnectionState(serverID, connectionDelegate, remotingConn, versionToUse, g);
+            new ConnectionState(serverID, connectionDelegate, remotingConnection,
+                                listener, versionToUse, idGenerator);
 
          connectionDelegate.setState(connectionState);
 

Modified: trunk/src/main/org/jboss/jms/client/delegate/ClientConnectionFactoryDelegate.java
===================================================================
--- trunk/src/main/org/jboss/jms/client/delegate/ClientConnectionFactoryDelegate.java	2006-12-14 06:47:18 UTC (rev 1789)
+++ trunk/src/main/org/jboss/jms/client/delegate/ClientConnectionFactoryDelegate.java	2006-12-14 08:58:42 UTC (rev 1790)
@@ -294,7 +294,7 @@
 
    public String toString()
    {
-      return "ClientConnectionFactoryDelegate[" + id + ", " + serverID + "]";
+      return "ConnectionFactoryDelegate[" + id + ", " + serverID + "]";
    }
    
    public String getServerLocatorURI()

Modified: trunk/src/main/org/jboss/jms/client/delegate/ClusteredClientConnectionFactoryDelegate.java
===================================================================
--- trunk/src/main/org/jboss/jms/client/delegate/ClusteredClientConnectionFactoryDelegate.java	2006-12-14 06:47:18 UTC (rev 1789)
+++ trunk/src/main/org/jboss/jms/client/delegate/ClusteredClientConnectionFactoryDelegate.java	2006-12-14 08:58:42 UTC (rev 1790)
@@ -119,7 +119,7 @@
 
    public String toString()
    {
-      StringBuffer sb = new StringBuffer("ClusteredConnFactoryDelegate[");
+      StringBuffer sb = new StringBuffer("ClusteredConnectionFactoryDelegate[");
       sb.append(id).append("][");
       if (delegates == null)
       {

Added: trunk/src/main/org/jboss/jms/client/remoting/ConsolidatedRemotingConnectionListener.java
===================================================================
--- trunk/src/main/org/jboss/jms/client/remoting/ConsolidatedRemotingConnectionListener.java	2006-12-14 06:47:18 UTC (rev 1789)
+++ trunk/src/main/org/jboss/jms/client/remoting/ConsolidatedRemotingConnectionListener.java	2006-12-14 08:58:42 UTC (rev 1790)
@@ -0,0 +1,170 @@
+/**
+ * JBoss, Home of Professional Open Source
+ *
+ * Distributable under LGPL license.
+ * See terms of license at gnu.org.
+ */
+package org.jboss.jms.client.remoting;
+
+import org.jboss.remoting.ConnectionListener;
+import org.jboss.remoting.Client;
+import org.jboss.logging.Logger;
+import org.jboss.jms.client.state.ConnectionState;
+
+import javax.jms.ExceptionListener;
+import javax.jms.JMSException;
+import java.util.List;
+import java.util.ArrayList;
+import java.util.Iterator;
+
+/**
+ * The ONLY remoting connection listener for a JMS connection's underlying remoting connection.
+ * Added to the remoting connection when the JMS connection is created, and removed when the
+ * JMS connection is closed. Any second tier listeners (the JMS connection ExceptionListener,
+ * and the HA's connection failure detector) are registered with this consolidated listener and not
+ * with the remoting connection directly.
+ *
+ * @author <a href="mailto:ovidiu at jboss.org">Ovidiu Feodorov</a>
+ * @version <tt>$Revision$</tt>
+ *
+ * $Id$
+ */
+public class ConsolidatedRemotingConnectionListener implements ConnectionListener
+{
+   // Constants -----------------------------------------------------
+
+   private static final Logger log = Logger.getLogger(ConsolidatedRemotingConnectionListener.class);
+
+   // Static --------------------------------------------------------
+
+   // Attributes ----------------------------------------------------
+
+   private ConnectionState state;
+
+   private ExceptionListener jmsExceptionListener;
+
+   // List<ConnectionListener>
+   private List delegateListeners;
+
+   // Constructors --------------------------------------------------
+
+   public ConsolidatedRemotingConnectionListener()
+   {
+      delegateListeners = new ArrayList();
+   }
+
+   // ConnectionListener implementation -----------------------------
+
+   public void handleConnectionException(Throwable throwable, Client client)
+   {
+      // forward the exception to delegate listeners and JMS ExceptionListeners; synchronize and
+      // copy to avoid race conditions
+
+      ExceptionListener jmsExceptionListenerCopy;
+      List delegateListenersCopy = new ArrayList();
+
+      synchronized(this)
+      {
+         jmsExceptionListenerCopy = jmsExceptionListener;
+
+         for(Iterator i = delegateListeners.iterator(); i.hasNext(); )
+         {
+            delegateListenersCopy.add(i.next());
+         }
+      }
+
+      for(Iterator i = delegateListenersCopy.iterator(); i.hasNext(); )
+      {
+         ConnectionListener l = (ConnectionListener)i.next();
+
+         try
+         {
+            log.debug(this + " forwarding remoting failure " + throwable + " to " + l);
+            l.handleConnectionException(throwable, client);
+         }
+         catch(Exception e)
+         {
+            log.warn("Failed to forward " + throwable + " to " + l, e);
+         }
+      }
+
+      if (jmsExceptionListenerCopy != null)
+      {
+         JMSException jmsException = null;
+
+         if (throwable instanceof Error)
+         {
+            final String msg = "Caught Error on underlying remoting connection";
+            log.error(this + ": " + msg, throwable);
+            jmsException = new JMSException(msg + ": " + throwable.getMessage());
+         }
+         else if (throwable instanceof Exception)
+         {
+            Exception e = (Exception)throwable;
+            jmsException = new JMSException("Failure on underlying remoting connection");
+            jmsException.setLinkedException(e);
+         }
+         else
+         {
+            // Some other Throwable subclass
+            final String msg = "Caught Throwable on underlying remoting connection";
+            log.error(this + ": " + msg, throwable);
+            jmsException = new JMSException(msg + ": " + throwable.getMessage());
+         }
+
+         jmsExceptionListenerCopy.onException(jmsException);
+      }
+   }
+
+   // Public --------------------------------------------------------
+
+   public synchronized boolean addDelegateListener(ConnectionListener l)
+   {
+      log.debug(this + " adding delegate listener " + l);
+      return delegateListeners.add(l);
+   }
+
+   public synchronized void addJMSExceptionListener(ExceptionListener jmsExceptionListener)
+   {
+      log.debug(this + " adding JMS exception listener " + jmsExceptionListener);
+      this.jmsExceptionListener = jmsExceptionListener;
+   }
+
+   public synchronized ExceptionListener getJMSExceptionListener()
+   {
+      return jmsExceptionListener;
+   }
+
+   /**
+    * Clears all delegate listeners
+    */
+   public synchronized void clear()
+   {
+      jmsExceptionListener = null;
+      delegateListeners.clear();
+      log.debug(this + " cleared");
+   }
+
+   public void setConnectionState(ConnectionState state)
+   {
+      this.state = state;
+   }
+
+   public String toString()
+   {
+      if (state == null)
+      {
+         return "ConsolidatedRemotingConnectionListener(UNINITIALIZED)";
+      }
+      return state + ".ConsolidatedRemotingConnectionListener";
+   }
+
+   // Package protected ---------------------------------------------
+
+   // Protected -----------------------------------------------------
+
+   // Private -------------------------------------------------------
+
+   // Inner classes -------------------------------------------------
+
+}

Modified: trunk/src/main/org/jboss/jms/client/state/ConnectionState.java
===================================================================
--- trunk/src/main/org/jboss/jms/client/state/ConnectionState.java	2006-12-14 06:47:18 UTC (rev 1789)
+++ trunk/src/main/org/jboss/jms/client/state/ConnectionState.java	2006-12-14 08:58:42 UTC (rev 1790)
@@ -23,11 +23,10 @@
 
 import java.util.HashSet;
 
-import javax.jms.ExceptionListener;
-
 import org.jboss.jms.client.delegate.DelegateSupport;
 import org.jboss.jms.client.delegate.ClientConnectionDelegate;
 import org.jboss.jms.client.remoting.JMSRemotingConnection;
+import org.jboss.jms.client.remoting.ConsolidatedRemotingConnectionListener;
 import org.jboss.jms.delegate.ConnectionDelegate;
 import org.jboss.jms.message.MessageIdGenerator;
 import org.jboss.jms.server.Version;
@@ -53,6 +52,8 @@
 
    private JMSRemotingConnection remotingConnection;
 
+   private ConsolidatedRemotingConnectionListener remotingConnectionListener;
+
    private ResourceManager resourceManager;
 
    private MessageIdGenerator idGenerator;
@@ -75,16 +76,12 @@
    private String clientID;
 
     /** This property used to be delcared on ConnectionAspect */
-   private ExceptionListener exceptionListener;
-
-    /** This property used to be delcared on ConnectionAspect */
    private boolean justCreated = true;
 
-    /** This property used to be delcared on ConnectionAspect */
-   private boolean listenerAdded;
-   
    public ConnectionState(int serverID, ConnectionDelegate delegate,
-                          JMSRemotingConnection remotingConnection, Version versionToUse,
+                          JMSRemotingConnection remotingConnection,
+                          ConsolidatedRemotingConnectionListener remotingConnectionListener,
+                          Version versionToUse,
                           MessageIdGenerator gen)
       throws Exception
    {
@@ -97,6 +94,9 @@
       this.remotingConnection = remotingConnection;
       this.versionToUse = versionToUse;
 
+      this.remotingConnectionListener = remotingConnectionListener;
+      remotingConnectionListener.setConnectionState(this);
+
       // Each connection has its own resource manager. If we can failover all connections with the
       // same server id at the same time then we can maintain one rm per unique server as opposed to
       // per connection.
@@ -121,9 +121,9 @@
       return remotingConnection;
    }
 
-   public void setRemotingConnection(JMSRemotingConnection remotingConnection)
+   public ConsolidatedRemotingConnectionListener getRemotingConnectionListener()
    {
-       this.remotingConnection=remotingConnection;
+      return remotingConnectionListener;
    }
 
    public Version getVersionToUse()
@@ -193,16 +193,6 @@
       this.clientID = clientID;
    }
 
-   public ExceptionListener getExceptionListener()
-   {
-      return exceptionListener;
-   }
-
-   public void setExceptionListener(ExceptionListener exceptionListener)
-   {
-      this.exceptionListener = exceptionListener;
-   }
-
    public boolean isJustCreated()
    {
       return justCreated;
@@ -213,17 +203,9 @@
       this.justCreated = justCreated;
    }
 
-   public boolean isListenerAdded()
-   {
-      return listenerAdded;
-   }
-
-   public void setListenerAdded(boolean listenerAdded)
-   {
-      this.listenerAdded = listenerAdded;
-   }
-
-   /** Connection doesn't have a parent */
+   /**
+    * Connection doesn't have a parent
+    */
    public HierarchicalState getParent()
    {
       return null;




More information about the jboss-cvs-commits mailing list