[jboss-cvs] JBoss Messaging SVN: r1783 - in trunk: src/main/org/jboss/jms/client/container src/main/org/jboss/jms/client/delegate tests/src/org/jboss/test/messaging/jms/clustering

jboss-cvs-commits at lists.jboss.org jboss-cvs-commits at lists.jboss.org
Wed Dec 13 04:46:44 EST 2006


Author: ovidiu.feodorov at jboss.com
Date: 2006-12-13 04:46:39 -0500 (Wed, 13 Dec 2006)
New Revision: 1783

Added:
   trunk/src/main/org/jboss/jms/client/container/DelegateIdentity.java
Modified:
   trunk/src/main/org/jboss/jms/client/container/ClosedInterceptor.java
   trunk/src/main/org/jboss/jms/client/container/HAAspect.java
   trunk/src/main/org/jboss/jms/client/delegate/ClientConnectionFactoryDelegate.java
   trunk/src/main/org/jboss/jms/client/delegate/ClusteredClientConnectionFactoryDelegate.java
   trunk/tests/src/org/jboss/test/messaging/jms/clustering/FailoverTest.java
Log:
HAAspect delegate refactoring. Added a new failover test (that currently fails)

Modified: trunk/src/main/org/jboss/jms/client/container/ClosedInterceptor.java
===================================================================
--- trunk/src/main/org/jboss/jms/client/container/ClosedInterceptor.java	2006-12-13 07:05:09 UTC (rev 1782)
+++ trunk/src/main/org/jboss/jms/client/container/ClosedInterceptor.java	2006-12-13 09:46:39 UTC (rev 1783)
@@ -72,8 +72,7 @@
    private int inUseCount;
 
    // The identity of the delegate this interceptor is associated with
-   private Integer id;
-   private String delegateType;
+   private DelegateIdentity id;
 
    // Static --------------------------------------------------------
 
@@ -106,7 +105,7 @@
       }
       else
       {
-         sb.append(delegateType).append("[").append(id.intValue()).append("]");
+         sb.append(id.getType()).append("[").append(id.getID()).append("]");
       }
       return sb.toString();
    }
@@ -129,7 +128,7 @@
       // logging purposes. It makes sense, since it's an PER_INSTANCE interceptor
       if (id == null)
       {
-         getIdentity(invocation);
+         id = DelegateIdentity.getIdentity(invocation);
       }
 
       String methodName = ((MethodInvocation) invocation).getMethod().getName();
@@ -319,14 +318,6 @@
 
    // Private --------------------------------------------------------
 
-   private void getIdentity(Invocation i)
-   {
-      DelegateSupport ds = (DelegateSupport)i.getTargetObject();
-      id = new Integer(ds.getID());
-      delegateType = ds.getClass().getName();
-      delegateType = delegateType.substring(delegateType.lastIndexOf('.') + 1);
-   }
-
    // Inner Classes --------------------------------------------------
 
 }

Added: trunk/src/main/org/jboss/jms/client/container/DelegateIdentity.java
===================================================================
--- trunk/src/main/org/jboss/jms/client/container/DelegateIdentity.java	2006-12-13 07:05:09 UTC (rev 1782)
+++ trunk/src/main/org/jboss/jms/client/container/DelegateIdentity.java	2006-12-13 09:46:39 UTC (rev 1783)
@@ -0,0 +1,67 @@
+/**
+ * JBoss, Home of Professional Open Source
+ *
+ * Distributable under LGPL license.
+ * See terms of license at gnu.org.
+ */
+package org.jboss.jms.client.container;
+
+import org.jboss.aop.joinpoint.Invocation;
+import org.jboss.jms.client.delegate.DelegateSupport;
+
+/**
+ * @author <a href="mailto:ovidiu at jboss.org">Ovidiu Feodorov</a>
+ * @version <tt>$Revision$</tt>
+ * $Id$
+ */
+public class DelegateIdentity
+{
+   // Constants -----------------------------------------------------
+
+   // Static --------------------------------------------------------
+
+   public static DelegateIdentity getIdentity(Invocation i)
+   {
+      DelegateSupport ds = (DelegateSupport)i.getTargetObject();
+
+      Integer id = new Integer(ds.getID());
+      String type = ds.getClass().getName();
+
+      type = type.substring(type.lastIndexOf('.') + 1);
+
+      return new DelegateIdentity(id, type);
+   }
+
+   // Attributes ----------------------------------------------------
+
+   private Integer id;
+   private String type;
+
+   // Constructors --------------------------------------------------
+
+   public DelegateIdentity(Integer id, String type)
+   {
+      this.id = id;
+      this.type = type;
+   }
+
+   // Public --------------------------------------------------------
+
+   public Integer getID()
+   {
+      return id;
+   }
+
+   public String getType()
+   {
+      return type;
+   }
+
+   // Package protected ---------------------------------------------
+
+   // Protected -----------------------------------------------------
+
+   // Private -------------------------------------------------------
+
+   // Inner classes -------------------------------------------------
+}

Modified: trunk/src/main/org/jboss/jms/client/container/HAAspect.java
===================================================================
--- trunk/src/main/org/jboss/jms/client/container/HAAspect.java	2006-12-13 07:05:09 UTC (rev 1782)
+++ trunk/src/main/org/jboss/jms/client/container/HAAspect.java	2006-12-13 09:46:39 UTC (rev 1783)
@@ -66,287 +66,288 @@
  *
  * @author <a href="mailto:tim.fox at jboss.com">Tim Fox</a>
  * @author <a href="mailto:clebert.suconic at jboss.org">Clebert Suconic</a>
+ * @author <a href="mailto:ovidiu at jboss.org">Ovidiu Feodorov</a>
  * 
  * @version <tt>$Revision: 1.1 $</tt>
  *
  * $Id$
- *
- * TODO: Make some log.info as log.trace after the code is final
- *
  */
 public class HAAspect
 {
+   // Constants -----------------------------------------------------
+
    private static final Logger log = Logger.getLogger(HAAspect.class);
-   
+
+   public static final int MAX_RECONNECT_HOP_COUNT = 10;
+
+   // Static --------------------------------------------------------
+
    private static boolean trace = log.isTraceEnabled();
-   
-   //Cache this here
+
+   // Attributes ----------------------------------------------------
+
    private ClientConnectionFactoryDelegate[] delegates;
-   
-   //Cache this here
+
    private Map failoverMap;
-   
+
    private int currentRobinIndex;
-   
+
+   // The identity of the delegate this interceptor is associated with
+   private DelegateIdentity id;
+
+   // Constructors --------------------------------------------------
+
+   public HAAspect()
+   {
+      id = null;
+   }
+
+   // Public --------------------------------------------------------
+
    public Object handleCreateConnectionDelegate(Invocation invocation) throws Throwable
    {
-      if (getServers(invocation) != null)
+      // maintain the identity of the delegate that sends invocation through this aspect, for
+      // logging purposes. It makes sense, since it's an PER_INSTANCE aspect.
+      if (id == null)
       {
-         // TODO: this should be in loop while we get exceptions creating connections, always trying
-         //       the next Delegate when we get an exception.
-         log.info("Clustered createConnection");
-                  
-         //In a clustered configuration we create connections in a round-robin fashion
-         //from the available servers
-         
-         ClientConnectionFactoryDelegate cfDelegate = getDelegateRoundRobin();
-         
-         //Now create a connection delegate for this
-         
-         MethodInvocation mi = (MethodInvocation)invocation;
-         
-         String username = (String)mi.getArguments()[0];
-         
-         String password = (String)mi.getArguments()[1];
-         
-         ClientConnectionDelegate connDelegate = createConnection(cfDelegate, username, password);
-         
-         return new CreateConnectionResult(connDelegate);
+         id = DelegateIdentity.getIdentity(invocation);
       }
-      else
+
+      cacheLocalDelegates(invocation);
+
+      if (delegates == null)
       {
-         //Non clustered
-
-         log.info("Assumed non clustered");
-         
+         // not clustered, pass the invocation through
          return invocation.invokeNext();
       }
-   }   
-   
-   //TODO this is currently hardcoded as round-robin, this should be made pluggable
-   private synchronized ClientConnectionFactoryDelegate getDelegateRoundRobin()
+
+      // clustered
+
+      // TODO: this should be in loop while we get exceptions creating connections, always trying
+      //       the next Delegate when we get an exception.
+
+      // In a clustered configuration we create connections in a round-robin fashion, contacting
+      // successively all available servers.
+
+      ClientConnectionFactoryDelegate cfDelegate = getDelegateRoundRobin();
+
+      // Now create a connection delegate for this
+
+      MethodInvocation mi = (MethodInvocation)invocation;
+      String username = (String)mi.getArguments()[0];
+      String password = (String)mi.getArguments()[1];
+
+      ClientConnectionDelegate cd = createConnection(cfDelegate, username, password);
+
+      return new CreateConnectionResult(cd);
+   }
+
+   public String toString()
    {
-      ClientConnectionFactoryDelegate currentDelegate = delegates[currentRobinIndex++];
-      
-      if (currentRobinIndex >= delegates.length)
+      StringBuffer sb = new StringBuffer("HAAspect.");
+      if (id == null)
       {
-         currentRobinIndex = 0;
+         sb.append("UNINITIALIZED");
       }
-      
-      return currentDelegate;
+      else
+      {
+         sb.append(id.getType()).append("[").append(id.getID()).append("]");
+      }
+      return sb.toString();
    }
-   
-   private synchronized ClientConnectionFactoryDelegate[] getServers(Invocation invocation)
+
+   // Package protected ---------------------------------------------
+
+   // Protected -----------------------------------------------------
+
+   // Private -------------------------------------------------------
+
+   private synchronized void cacheLocalDelegates(Invocation invocation)
    {
-      if (delegates == null)
+      if (delegates != null)
       {
-         log.info("Looking for delegates");
+         // the first set of delegates is already cached, return
+         return;
+      }
 
-         MethodInvocation methodInvoke = (MethodInvocation)invocation;
+      MethodInvocation mi = (MethodInvocation)invocation;
+      Object target = mi.getTargetObject();
 
-         Object target = methodInvoke.getTargetObject();
-         
-         if (target instanceof ClusteredClientConnectionFactoryDelegate)
-         {
-            delegates = ((ClusteredClientConnectionFactoryDelegate)target).getDelegates();
-         }
+      if (target instanceof ClusteredClientConnectionFactoryDelegate)
+      {
+         ClusteredClientConnectionFactoryDelegate cccfd =
+            (ClusteredClientConnectionFactoryDelegate)target;
 
+         delegates = cccfd.getDelegates();
+
          if (delegates != null)
          {
-            failoverMap = ((ClusteredClientConnectionFactoryDelegate)target).getFailoverMap();
+            failoverMap = cccfd.getFailoverMap();
 
             if (failoverMap == null)
             {
-               throw new IllegalStateException("Cannot find failoverMap!");
+               throw new IllegalStateException("HAAspect cannot find the failover map!");
             }
          }
       }
-      
-      return delegates;
    }
-   
-   private ClientConnectionDelegate createConnection(ClientConnectionFactoryDelegate cf, String username, String password)
-      throws Exception
+
+   //TODO this is currently hardcoded as round-robin, this should be made pluggable
+   private synchronized ClientConnectionFactoryDelegate getDelegateRoundRobin()
    {
-      log.info("createConnection");
-      
-      CreateConnectionResult res = (CreateConnectionResult)cf.createConnectionDelegate(username, password, -1);
-      
-      ClientConnectionDelegate connDelegate = (ClientConnectionDelegate)res.getDelegate();
-      
-      addListener(connDelegate);
-      
-      return connDelegate;   
+      ClientConnectionFactoryDelegate currentDelegate = delegates[currentRobinIndex++];
+
+      if (currentRobinIndex >= delegates.length)
+      {
+         currentRobinIndex = 0;
+      }
+
+      return currentDelegate;
    }
-   
-   private void addListener(ClientConnectionDelegate connDelegate)
+
+   private ClientConnectionDelegate createConnection(ClientConnectionFactoryDelegate cfd,
+                                                     String username,
+                                                     String password)
+      throws Exception
    {
-      //Add a connection listener
-      
-      ConnectionState state = (ConnectionState)((DelegateSupport)connDelegate).getState();
-      
-      ConnectionListener listener = new Listener(connDelegate);
-      
+      CreateConnectionResult res =
+         (CreateConnectionResult)cfd.createConnectionDelegate(username, password, -1);
+
+      ClientConnectionDelegate cd = (ClientConnectionDelegate)res.getDelegate();
+
+      // Add a connection listener to detect failure
+
+      ConnectionListener listener = new ConnectionFailureListener(cd);
+      ConnectionState state = (ConnectionState)((DelegateSupport)cd).getState();
       state.getRemotingConnection().getInvokingClient().addConnectionListener(listener);
+
+      return cd;
    }
 
-   //The connection has failed
-   private void handleFailure(ClientConnectionDelegate failedConnection) throws Exception
+   /**
+    * @return a failover ClientConnectionFactoryDelegate or null if a suitable delegate cannot be
+    *         found.
+    */
+   private ClientConnectionFactoryDelegate getFailoverDelegate(int failedServerID)
    {
-      log.info("Handling failure");
-      
-      //Get the connection factory we are going to failover onto
-      ClientConnectionFactoryDelegate newCF = getFailoverDelegate(failedConnection);
-  
-      ConnectionState state = (ConnectionState)((DelegateSupport)failedConnection).getState();
-      
-      log.info("calling createFailoverConnectionDelegate");
-      
-      int tries = 0;
-      
-      //We try a maximum of 10 hops
-      final int MAX_TRIES = 10;
-      
-      while (tries < MAX_TRIES)
-      {         
-         //Create a connection using that connection factory
-         CreateConnectionResult res =
-            newCF.createConnectionDelegate(state.getUser(), state.getPassword(), state.getServerID());
-         
-         log.info("returned from createFailoverConnectionDelegate");
-                     
-         if (res.getDelegate() != null)
+      // Look up the server to failover onto in the failover map
+
+      Integer failoverServerID = (Integer)failoverMap.get(new Integer(failedServerID));
+
+      for (int i = 0; i < delegates.length; i++)
+      {
+         if (delegates[i].getServerId() == failoverServerID.intValue())
          {
-            log.info("Got connection");
-            
-            //We got the right server and created a new connection ok
-            
-            ClientConnectionDelegate newConnection = (ClientConnectionDelegate)res.getDelegate();
-            
-            log.info("newconnection is " + newConnection);
-            
-            failover(failedConnection, newConnection);
-            
-            break;
+            return delegates[i];
          }
-         else
-         {
-            if (res.getActualFailoverNode() == -1)
-            {
-               //No trace of failover was detected on the server side - this might happen if the client side
-               //network fails temporarily so the client connection breaks but the server side network is still
-               //up and running - in this case we don't failover 
-               
-               //TODO Is this the right thing to do?
-               
-               log.trace("Client attempted to failover, but no failover had occurred on the server side");
-               
-               break;
-                           
-            }
-            else
-            {
-               //Server side failover has occurred / is occurring but we tried the wrong node
-               //Now we must try the correct node
-               
-               newCF = null;
-               
-               tries++;
-               
-               for (int i = 0; i < delegates.length; i++)
-               {
-                  ClientConnectionFactoryDelegate del = delegates[i];
-                  
-                  if (del.getServerId() == res.getActualFailoverNode())
-                  {
-                     newCF = del;
-                     
-                     break;
-                  }
-               }
-               
-               if (newCF == null)
-               {
-                  //Houston, we have a problem
-                  
-                  //TODO Could this ever happen? Should we send back the cf, or update it instead of just the id??
-                  throw new JMSException("Cannot find server with id " + res.getActualFailoverNode());
-               }               
-            }
-         }
       }
-      
-      if (tries == MAX_TRIES)
-      {
-         throw new JMSException("Cannot find correct server to failover onto");
-      }
+
+      return null;
    }
-   
-   private ClientConnectionFactoryDelegate getFailoverDelegate(ClientConnectionDelegate currentDelegate) throws JMSException
+
+   private void handleFailure(ClientConnectionDelegate failedConnDelegate) throws Exception
    {
-      //We need to choose which delegate to fail over to
-      
-      ConnectionState currentState = (ConnectionState)((DelegateSupport)currentDelegate).getState();
-      
-      int currentServerID = currentState.getServerID();
-      
-      //Lookup in the failover map to see which server to fail over onto
-      
-      Integer failoverServerID = (Integer)failoverMap.get(new Integer(currentServerID));
-      
-      if (failoverServerID == null)
+      ConnectionState failedConnState =
+         (ConnectionState)((DelegateSupport)failedConnDelegate).getState();
+
+      int failedServerID = failedConnState.getServerID();
+
+      log.debug(this + " handling failure on connection to node " + failedServerID);
+
+      // Get the default connection factory delegate we are going to failover onto
+
+      ClientConnectionFactoryDelegate failoverDelegate = getFailoverDelegate(failedServerID);
+
+      if (failoverDelegate == null)
       {
-         throw new IllegalStateException("Cannot find failover node for node " + currentServerID);
+         throw new IllegalStateException("Cannot find default failover node for server " +
+                                         failedServerID);
       }
-      
-      //Now find the actual delegate
-      
-      ClientConnectionFactoryDelegate del = null;
-      
-      for (int i = 0; i < delegates.length; i++)
+
+      // We attempt to connect to the failover node in a loop, since we might need to go through
+      // multiple hops
+
+      int attemptCount = 0;
+
+      outer: while (attemptCount < MAX_RECONNECT_HOP_COUNT)
       {
-         if (delegates[i].getServerId() == failoverServerID.intValue())
+         // Create a connection using that connection factory
+         CreateConnectionResult r = failoverDelegate.
+            createConnectionDelegate(failedConnState.getUser(),
+                                     failedConnState.getPassword(),
+                                     failedConnState.getServerID());
+
+         if (r.getDelegate() != null)
          {
-            del = delegates[i];
-            
-            break;
+            // We got the right server and created a new connection
+            performClientSideFailover(failedConnDelegate, (ClientConnectionDelegate)r.getDelegate());
+            return;
          }
+
+         // Did not get a valid connection to the node we've just tried
+
+         int actualServerID = r.getActualFailoverNode();
+
+         if (actualServerID == -1)
+         {
+            // No failover attempt was detected on the server side; this might happen if the client
+            // side network fails temporarily so the client connection breaks but the server cluster
+            // is still up and running - in this case we don't perform failover.
+
+            //TODO Is this the right thing to do?
+
+            log.warn("Client attempted failover, but no failover attempt " +
+                     "has been detected on the server side.");
+
+            return;
+         }
+
+         // Server side failover has occurred / is occurring but trying to go to the 'default'
+         // failover node did not succeed. Retry with the node suggested by the cluster.
+
+         attemptCount++;
+
+         for (int i = 0; i < delegates.length; i++)
+         {
+            if (delegates[i].getServerId() == actualServerID)
+            {
+               failoverDelegate = delegates[i];
+               continue outer;
+            }
+         }
+
+         // the delegate corresponding to the actualServerID not found among the cached delegates
+         //TODO Could this ever happen? Should we send back the cf, or update it instead of just the id??
+         throw new JMSException("Cannot find a cached connection factory delegate for " +
+                                "node " + actualServerID);
       }
-      
-      if (del == null)
-      {
-         throw new IllegalStateException("Cannot find failover delegate for node " + failoverServerID.intValue());
-      }
-           
-      return del;
+
+      throw new JMSException("Maximum number of failover attempts exceeded. " +
+                             "Cannot find a server to failover onto.");
    }
-   
-   private void failover(ClientConnectionDelegate failedConnection, ClientConnectionDelegate newConnection) throws Exception
+
+   private void performClientSideFailover(ClientConnectionDelegate failedConnDelegate,
+                                          ClientConnectionDelegate newConnDelegate) throws Exception
    {
-      if (trace) { log.trace("calling handleFailover"); }
-      
-      log.info("performing failover");
+      log.debug(this + " performing client side failover");
 
-      ConnectionState failedState = (ConnectionState)failedConnection.getState();
+      ConnectionState failedState = (ConnectionState)failedConnDelegate.getState();
+      ConnectionState newState = (ConnectionState)newConnDelegate.getState();
 
-      ConnectionState newState = (ConnectionState)newConnection.getState();
-      
       if (failedState.getClientID() != null)
       {
-         newConnection.setClientID(failedState.getClientID());
+         newConnDelegate.setClientID(failedState.getClientID());
       }
 
       // Transfer attributes from newDelegate to failedDelegate
-      failedConnection.copyAttributes(newConnection);
-      
+      failedConnDelegate.copyAttributes(newConnDelegate);
+
       int oldServerId = failedState.getServerID();
-      
+
       CallbackManager oldCallbackManager = failedState.getRemotingConnection().getCallbackManager();
-      
+
       //We need to update some of the attributes on the state
       failedState.copyState(newState);
-      
-      log.info("failing over children");
 
       for(Iterator i = failedState.getChildren().iterator(); i.hasNext(); )
       {
@@ -354,26 +355,26 @@
 
          ClientSessionDelegate failedSessionDelegate =
             (ClientSessionDelegate)failedSessionState.getDelegate();
-                  
-         ClientSessionDelegate newSessionDelegate = (ClientSessionDelegate)newConnection.
+
+         ClientSessionDelegate newSessionDelegate = (ClientSessionDelegate)newConnDelegate.
             createSessionDelegate(failedSessionState.isTransacted(),
                                   failedSessionState.getAcknowledgeMode(),
                                   failedSessionState.isXA());
-         
+
          SessionState newSessionState = (SessionState)newSessionDelegate.getState();
 
          failedSessionDelegate.copyAttributes(newSessionDelegate);
-         
+
          //We need to update some of the attributes on the state
-         newSessionState.copyState(newSessionState);                                  
-         
+         newSessionState.copyState(newSessionState);
+
          if (trace) { log.trace("replacing session (" + failedSessionDelegate + ") with a new failover session " + newSessionDelegate); }
-         
+
          List children = new ArrayList();
-         
+
          // TODO Why is this clone necessary?
          children.addAll(failedSessionState.getChildren());
-         
+
          Set consumerIds = new HashSet();
 
          for (Iterator j = children.iterator(); j.hasNext(); )
@@ -385,84 +386,84 @@
                handleFailoverOnProducer((ProducerState)sessionChild, newSessionDelegate);
             }
             else if (sessionChild instanceof ConsumerState)
-            {               
-               handleFailoverOnConsumer(failedConnection,
+            {
+               handleFailoverOnConsumer(failedConnDelegate,
                                         failedState,
                                         failedSessionState,
                                         (ConsumerState)sessionChild,
                                         failedSessionDelegate,
                                         oldServerId,
-                                        oldCallbackManager);       
-               
+                                        oldCallbackManager);
+
                // We add the new consumer id to the list of old ids
-               consumerIds.add(new Integer(((ConsumerState)sessionChild).getConsumerID()));               
+               consumerIds.add(new Integer(((ConsumerState)sessionChild).getConsumerID()));
             }
             else if (sessionChild instanceof BrowserState)
             {
                 handleFailoverOnBrowser((BrowserState)sessionChild, newSessionDelegate);
             }
          }
-                           
+
          /* Now we must sent the list of unacked AckInfos to the server - so the consumers
           * delivery lists can be repopulated
           */
          List ackInfos = null;
-         
+
          if (!failedSessionState.isTransacted() && !failedSessionState.isXA())
          {
             /*
             Now we remove any unacked np messages - this is because we don't want to ack them
             since the server won't know about them and will barf
             */
-            
+
             Iterator iter = newSessionState.getToAck().iterator();
-            
+
             while (iter.hasNext())
             {
                AckInfo info = (AckInfo)iter.next();
-               
+
                if (!info.getMessage().getMessage().isReliable())
                {
                   iter.remove();
-               }            
+               }
             }
-            
+
             //Get the ack infos from the list in the session state
             ackInfos = failedSessionState.getToAck();
          }
          else
-         {            
+         {
             //Transacted session - we need to get the acks from the resource manager
             //btw we have kept the old resource manager
             ResourceManager rm = failedState.getResourceManager();
-            
+
             // Remove any non persistent acks - so server doesn't barf on commit
-            
+
             rm.removeNonPersistentAcks(consumerIds);
-            
-            ackInfos = rm.getAckInfosForConsumerIds(consumerIds);            
+
+            ackInfos = rm.getAckInfosForConsumerIds(consumerIds);
          }
-         
+
          if (!ackInfos.isEmpty())
          {
             log.info("Sending " + ackInfos.size() + " unacked");
             newSessionDelegate.sendUnackedAckInfos(ackInfos);
-         }                 
+         }
       }
-            
+
       //TODO
       //If the session had consumers which are now closed then there is no way to recreate them on the server
       //we need to store with session id
-      
-      //We must not start the connection until the end
+
+      // We must not start the connection until the end
       if (failedState.isStarted())
       {
-         failedConnection.start();
+         failedConnDelegate.start();
       }
-      
-      log.info("Failover done");
+
+      log.info(this + " completed client-side failover");
    }
-   
+
    private void handleFailoverOnConsumer(ClientConnectionDelegate failedConnectionDelegate,
                                          ConnectionState failedConnectionState,
                                          SessionState failedSessionState,
@@ -472,12 +473,12 @@
                                          CallbackManager oldCallbackManager)
       throws JMSException
    {
-      log.info("Failing over consumer");
-      
+      log.debug(this + " failing over consumer " + failedConsumerState);
+
       ClientConsumerDelegate failedConsumerDelegate =
          (ClientConsumerDelegate)failedConsumerState.getDelegate();
 
-      if (trace) { log.trace("handleFailoverOnConsumer: creating alternate consumer"); }
+      if (trace) { log.trace(this + " creating alternate consumer"); }
 
       ClientConsumerDelegate newConsumerDelegate = (ClientConsumerDelegate)failedSessionDelegate.
          createConsumerDelegate((JBossDestination)failedConsumerState.getDestination(),
@@ -487,60 +488,59 @@
                                  failedConsumerState.isConnectionConsumer(),
                                  failedConsumerState.getChannelId());
 
-      if (trace) { log.trace("handleFailoverOnConsumer: alternate consumer created"); }
-            
-      //Copy the attributes from the new consumer to the old consumer
+      if (trace) { log.trace(this + " alternate consumer created"); }
+
+      // Copy the attributes from the new consumer to the old consumer
       failedConsumerDelegate.copyAttributes(newConsumerDelegate);
 
       ConsumerState newState = (ConsumerState)newConsumerDelegate.getState();
-      
+
       int oldConsumerID = failedConsumerState.getConsumerID();
-      
-      //Update attributes on the old state
+
+      // Update attributes on the old state
       failedConsumerState.copyState(newState);
 
       if (failedSessionState.isTransacted() || failedSessionState.isXA())
       {
-         //Replace the old consumer id with the new consumer id
-         
+         // Replace the old consumer id with the new consumer id
+
          ResourceManager rm = failedConnectionState.getResourceManager();
-         
+
          rm.handleFailover(oldConsumerID, failedConsumerState.getConsumerID());
       }
-            
-      //We need to re-use the existing message callback handler
-            
-      log.info("Old server id:" + oldServerID + " old consumer id:" + oldConsumerID);
-      MessageCallbackHandler oldHandler = oldCallbackManager.unregisterHandler(oldServerID, oldConsumerID);
-      
+
+      // We need to re-use the existing message callback handler
+
+      MessageCallbackHandler oldHandler =
+         oldCallbackManager.unregisterHandler(oldServerID, oldConsumerID);
+
       ConnectionState newConnectionState = (ConnectionState)failedConnectionDelegate.getState();
-      
-      CallbackManager newCallbackManager = newConnectionState.getRemotingConnection().getCallbackManager();
-      
-      log.info("New server id:" + newConnectionState.getServerID() + " new consuer id:" + newState.getConsumerID());
-      
-      //Remove the new handler
-      MessageCallbackHandler newHandler = newCallbackManager.unregisterHandler(newConnectionState.getServerID(),
-                                                                               newState.getConsumerID());
-      
-      log.info("New handler is " + System.identityHashCode(newHandler));
-      
+
+      CallbackManager newCallbackManager =
+         newConnectionState.getRemotingConnection().getCallbackManager();
+
+      // Remove the new handler
+      MessageCallbackHandler newHandler = newCallbackManager.
+         unregisterHandler(newConnectionState.getServerID(), newState.getConsumerID());
+
+      log.debug("New handler is " + System.identityHashCode(newHandler));
+
       //But we need to update some fields from the new one
       oldHandler.copyState(newHandler);
-      
+
       //Now we re-register the old handler with the new callback manager
-            
+
       newCallbackManager.registerHandler(newConnectionState.getServerID(),
                                          newState.getConsumerID(),
                                          oldHandler);
-      
-      //We don't need to add the handler to the session state since it is already there - we
-      //are re-using the old handler
-      
-      log.info("failed over consumer");
+
+      // We don't need to add the handler to the session state since it is already there - we
+      // are re-using the old handler
+
+      log.debug(this + " failed over consumer");
    }
-   
 
+
    private void handleFailoverOnProducer(ProducerState failedProducerState,
                                          ClientSessionDelegate failedSessionDelegate)
       throws JMSException
@@ -552,9 +552,8 @@
          (ClientProducerDelegate)failedProducerState.getDelegate();
 
       failedProducerDelegate.copyAttributes(newProducerDelegate);
-      
       failedProducerState.copyState((ProducerState)newProducerDelegate.getState());
-      
+
       if (trace) { log.trace("handling fail over on producerDelegate " + failedProducerDelegate + " destination=" + failedProducerState.getDestination()); }
    }
 
@@ -563,56 +562,59 @@
       throws JMSException
    {
       ClientBrowserDelegate newBrowserDelegate = (ClientBrowserDelegate)failedSessionDelegate.
-          createBrowserDelegate(failedBrowserState.getJmsDestination(),failedBrowserState.getMessageSelector());
+         createBrowserDelegate(failedBrowserState.getJmsDestination(),
+                               failedBrowserState.getMessageSelector());
 
       ClientBrowserDelegate failedBrowserDelegate =
          (ClientBrowserDelegate)failedBrowserState.getDelegate();
 
       failedBrowserDelegate.copyAttributes(newBrowserDelegate);
-      
       failedBrowserState.copyState((BrowserState)newBrowserDelegate.getState());
-      
+
       if (trace) { log.trace("handling fail over on browserDelegate " + failedBrowserDelegate + " destination=" + failedBrowserState.getJmsDestination() + " selector=" + failedBrowserState.getMessageSelector()); }
 
    }
-   
-   private class Listener implements ConnectionListener
+
+   // Inner classes -------------------------------------------------
+
+   private class ConnectionFailureListener implements ConnectionListener
    {
-      private ClientConnectionDelegate connection;
+      private ClientConnectionDelegate cd;
       private Valve valve;
-      
-      Listener(ClientConnectionDelegate connection)
+
+      ConnectionFailureListener(ClientConnectionDelegate cd)
       {
-         this.connection = connection;
+         this.cd = cd;
          this.valve = new Valve();
-         
-         log.info("************* CREATING LISTENER");
       }
-      
+
+      // ConnectionListener implementation ---------------------------
+
       public void handleConnectionException(Throwable throwable, Client client)
       {
          try
          {
-            log.info("********* EXCEPTION DETECTED", throwable);
+            log.debug(this + " is being notified of connection failure: " + throwable);
 
-            // it references Valve to a local variable.
-            // Since we reset the valve at the end, we need to guarantee we will have the same Valve instance
-            // from the moment we enter this method.
+            // It references Valve to a local variable. Since we reset the valve at the end, we need
+            // to guarantee we will have the same Valve instance from the moment we entered this
+            // method.
+
             Valve localValve = null;
             synchronized (this)
             {
                localValve = valve;
             }
 
-            // We can't have more than one exception being caught at the same time.
-            // On that case we will open the valve and any other thread opening the valve
-            // will wait until its completion
+            // We can't have more than one exception being caught at the same time. On that case we
+            // will open the valve and any other thread opening the valve will wait until
+            // its completion.
+
             if (localValve.open())
             {
                try
                {
-                  log.info("********* HANDLING FAILOVER");
-                  handleFailure(connection);
+                  handleFailure(cd);
                }
                finally
                {
@@ -625,15 +627,19 @@
             }
             else
             {
-               log.info("********* ANOTHER THREAD WAS RESPONSIBLE FOR FAILOVER AS THE VALVE WAS CLOSED");
+               log.debug(this + ": Another thread was responsible for failover as the valve was closed");
             }
          }
          catch (Throwable e)
          {
             log.error("Caught exception in handling failure", e);
-            e.printStackTrace();
          }
       }
+
+      public String toString()
+      {
+         return "ConnectionFailureListener[" + Integer.toHexString(hashCode()) + "]";
+      }
    }
 }
 

Modified: trunk/src/main/org/jboss/jms/client/delegate/ClientConnectionFactoryDelegate.java
===================================================================
--- trunk/src/main/org/jboss/jms/client/delegate/ClientConnectionFactoryDelegate.java	2006-12-13 07:05:09 UTC (rev 1782)
+++ trunk/src/main/org/jboss/jms/client/delegate/ClientConnectionFactoryDelegate.java	2006-12-13 09:46:39 UTC (rev 1783)
@@ -294,7 +294,7 @@
 
    public String toString()
    {
-      return "ClientConnectionFactoryDelegate[ID=" + id + "]";
+      return "ClientConnectionFactoryDelegate[" + id + "]";
    }
    
    public String getServerLocatorURI()

Modified: trunk/src/main/org/jboss/jms/client/delegate/ClusteredClientConnectionFactoryDelegate.java
===================================================================
--- trunk/src/main/org/jboss/jms/client/delegate/ClusteredClientConnectionFactoryDelegate.java	2006-12-13 07:05:09 UTC (rev 1782)
+++ trunk/src/main/org/jboss/jms/client/delegate/ClusteredClientConnectionFactoryDelegate.java	2006-12-13 09:46:39 UTC (rev 1783)
@@ -118,7 +118,7 @@
 
    public String toString()
    {
-      StringBuffer sb = new StringBuffer("ClusteredConnFactoryDelegate[ID=");
+      StringBuffer sb = new StringBuffer("ClusteredConnFactoryDelegate[");
       sb.append(id).append("][");
       if (delegates == null)
       {

Modified: trunk/tests/src/org/jboss/test/messaging/jms/clustering/FailoverTest.java
===================================================================
--- trunk/tests/src/org/jboss/test/messaging/jms/clustering/FailoverTest.java	2006-12-13 07:05:09 UTC (rev 1782)
+++ trunk/tests/src/org/jboss/test/messaging/jms/clustering/FailoverTest.java	2006-12-13 09:46:39 UTC (rev 1783)
@@ -82,9 +82,9 @@
          log.info("######## KILLED NODE 1");
          log.info("########");
 
-         // TODO - this shouldn't be necessary if we have the client valve in place         
+         // TODO - this shouldn't be necessary if we have the client valve in place
          log.info("Sleeping for 1 min");
-         Thread.sleep(60000);
+         Thread.sleep(30000);
 
          // we must receive the message
 




More information about the jboss-cvs-commits mailing list