[jboss-cvs] JBoss Messaging SVN: r1644 - in branches/Branch_Client_Failover_Experiment: src/etc src/main/org/jboss/jms/client/container tests/src/org/jboss/test/messaging/jms/clustering

jboss-cvs-commits at lists.jboss.org jboss-cvs-commits at lists.jboss.org
Mon Nov 27 14:31:41 EST 2006


Author: clebert.suconic at jboss.com
Date: 2006-11-27 14:31:37 -0500 (Mon, 27 Nov 2006)
New Revision: 1644

Added:
   branches/Branch_Client_Failover_Experiment/src/main/org/jboss/jms/client/container/FailoverAspect.java
Modified:
   branches/Branch_Client_Failover_Experiment/src/etc/aop-messaging-client.xml
   branches/Branch_Client_Failover_Experiment/src/main/org/jboss/jms/client/container/ConnectionAspect.java
   branches/Branch_Client_Failover_Experiment/tests/src/org/jboss/test/messaging/jms/clustering/HATest.java
Log:
Refactoring handleFailover to a separate aspect

Modified: branches/Branch_Client_Failover_Experiment/src/etc/aop-messaging-client.xml
===================================================================
--- branches/Branch_Client_Failover_Experiment/src/etc/aop-messaging-client.xml	2006-11-27 12:44:12 UTC (rev 1643)
+++ branches/Branch_Client_Failover_Experiment/src/etc/aop-messaging-client.xml	2006-11-27 19:31:37 UTC (rev 1644)
@@ -18,6 +18,7 @@
    <aspect class="org.jboss.jms.client.container.AsfAspect" scope="PER_INSTANCE"/>
    <aspect class="org.jboss.jms.client.container.BrowserAspect" scope="PER_INSTANCE"/>
    <aspect class="org.jboss.jms.client.container.ConnectionAspect" scope="PER_INSTANCE"/>
+   <aspect class="org.jboss.jms.client.container.FailoverAspect" scope="PER_VM"/>
    <aspect class="org.jboss.jms.client.container.FactoryAspect" scope="PER_INSTANCE"/>
 
    <!-- ConnectionFactory Stack-->        
@@ -42,7 +43,7 @@
        <advice name="handleStop" aspect="org.jboss.jms.client.container.ConnectionAspect"/>
     </bind>
     <bind pointcut="execution(* org.jboss.jms.client.delegate.ClientConnectionDelegate->failOver(..))">
-       <advice name="handleFailover" aspect="org.jboss.jms.client.container.ConnectionAspect"/>
+       <advice name="handleFailover" aspect="org.jboss.jms.client.container.FailoverAspect"/>
     </bind>
    <bind pointcut="execution(* org.jboss.jms.client.delegate.ClientConnectionDelegate->createConnectionConsumer(..))">
       <advice name="handleCreateConnectionConsumer" aspect="org.jboss.jms.client.container.AsfAspect"/>

Modified: branches/Branch_Client_Failover_Experiment/src/main/org/jboss/jms/client/container/ConnectionAspect.java
===================================================================
--- branches/Branch_Client_Failover_Experiment/src/main/org/jboss/jms/client/container/ConnectionAspect.java	2006-11-27 12:44:12 UTC (rev 1643)
+++ branches/Branch_Client_Failover_Experiment/src/main/org/jboss/jms/client/container/ConnectionAspect.java	2006-11-27 19:31:37 UTC (rev 1644)
@@ -21,27 +21,20 @@
   */
 package org.jboss.jms.client.container;
 
+import javax.jms.ExceptionListener;
+import javax.jms.IllegalStateException;
+import javax.jms.JMSException;
 import org.jboss.aop.joinpoint.Invocation;
 import org.jboss.aop.joinpoint.MethodInvocation;
 import org.jboss.jms.client.JBossConnectionMetaData;
-import org.jboss.jms.client.delegate.*;
-import org.jboss.jms.client.remoting.CallbackManager;
-import org.jboss.jms.client.remoting.MessageCallbackHandler;
-import org.jboss.jms.client.state.*;
-import org.jboss.jms.destination.JBossDestination;
+import org.jboss.jms.client.delegate.ClientConnectionDelegate;
+import org.jboss.jms.client.state.ConnectionState;
 import org.jboss.jms.message.MessageIdGeneratorFactory;
 import org.jboss.jms.tx.ResourceManagerFactory;
 import org.jboss.logging.Logger;
 import org.jboss.remoting.Client;
 import org.jboss.remoting.ConnectionListener;
 
-import javax.jms.ExceptionListener;
-import javax.jms.IllegalStateException;
-import javax.jms.JMSException;
-import java.util.ArrayList;
-import java.util.Iterator;
-import java.util.List;
-
 /**
  * Handles operations related to the connection
  *
@@ -206,81 +199,6 @@
    }
 
 
-   public Object handleFailover(Invocation invocation) throws Throwable
-   {
-      if (trace) { log.trace("calling handleFailover"); }
-
-      ClientConnectionDelegate failedDelegate =
-         ((ClientConnectionDelegate)invocation.getTargetObject());
-      ConnectionState failedState = (ConnectionState)failedDelegate.getState();
-
-      int oldServerID = failedState.getServerID();
-
-      ClientConnectionDelegate newDelegate =
-         (ClientConnectionDelegate)((MethodInvocation)invocation).getArguments()[0];
-      ConnectionState newState = (ConnectionState)newDelegate.getState();
-
-      failedState.copy(newState);
-
-      if (failedState.getClientID() != null)
-      {
-         newDelegate.setClientID(failedState.getClientID());
-      }
-
-      // Transfering state from newDelegate to currentDelegate
-      failedDelegate.copyState(newDelegate);
-
-      if (failedState.isStarted())
-      {
-         failedDelegate.start();
-      }
-
-      for(Iterator i = failedState.getChildren().iterator(); i.hasNext(); )
-      {
-         SessionState failedSessionState = (SessionState)i.next();
-
-         ClientSessionDelegate newSessionDelegate = (ClientSessionDelegate)newDelegate.
-            createSessionDelegate(failedSessionState.isTransacted(),
-                                  failedSessionState.getAcknowledgeMode(),
-                                  failedSessionState.isXA());
-
-         ClientSessionDelegate failedSessionDelegate =
-            (ClientSessionDelegate)failedSessionState.getDelegate();
-
-         failedSessionDelegate.copyState(newSessionDelegate);
-
-         if (trace) { log.trace("replacing session (" + failedSessionDelegate + ") with a new failover session " + newSessionDelegate); }
-
-         List children = new ArrayList();
-         children.addAll(failedSessionState.getChildren());
-
-         for(Iterator j = children.iterator(); j.hasNext(); )
-         {
-            HierarchicalStateSupport sessionChild = (HierarchicalStateSupport)j.next();
-
-            if (sessionChild instanceof ProducerState)
-            {
-               handleFailoverOnProducer((ProducerState)sessionChild, newSessionDelegate);
-            }
-            else if (sessionChild instanceof ConsumerState)
-            {
-               handleFailoverOnConsumer(failedDelegate,
-                                        failedState,
-                                        failedSessionState,
-                                        (ConsumerState)sessionChild,
-                                        failedSessionDelegate,
-                                        oldServerID);
-            }
-            else if (sessionChild instanceof BrowserState)
-            {
-                handleFailoverOnBrowser((BrowserState)sessionChild, newSessionDelegate);
-            }
-         }
-      }
-
-      return null;
-   }
-
    // ConnectionListener implementation -----------------------------------------------------------
 
    public void handleConnectionException(Throwable t, Client c)
@@ -334,84 +252,6 @@
       return state;
    }
 
-   private void handleFailoverOnConsumer(ClientConnectionDelegate connectionDelegate,
-                                         ConnectionState failedConnectionState,
-                                         SessionState failedSessionState,
-                                         ConsumerState failedConsumerState,
-                                         ClientSessionDelegate failedSessionDelegate,
-                                         int oldServerID)
-      throws JMSException
-   {
-      ClientConsumerDelegate failedConsumerDelegate =
-         (ClientConsumerDelegate)failedConsumerState.getDelegate();
 
-      if (trace) { log.trace("handleFailoverOnConsumer: creating alternate consumer"); }
-
-      ClientConsumerDelegate newConsumerDelegate = (ClientConsumerDelegate)failedSessionDelegate.
-         failOverConsumer((JBossDestination)failedConsumerState.getDestination(),
-                                failedConsumerState.getSelector(),
-                                failedConsumerState.isNoLocal(),                                
-                                failedConsumerState.getSubscriptionName(),
-                                failedConsumerState.isConnectionConsumer(),
-                                failedConsumerDelegate.getChannelId(),
-                                oldServerID);
-
-      if (trace) { log.trace("handleFailoverOnConsumer: alternate consumer created"); }
-
-      failedConsumerDelegate.copyState(newConsumerDelegate);
-
-      int oldConsumerID = failedConsumerState.getConsumerID();
-
-      ConsumerState newState = (ConsumerState)newConsumerDelegate.getState();
-      failedConsumerState.copy(newState);
-
-      failedConnectionState.getResourceManager().
-         handleFailover(failedSessionState.getCurrentTxId(),
-                        oldConsumerID,
-                        failedConsumerState.getConsumerID());
-
-      CallbackManager cm = failedConnectionState.getRemotingConnection().getCallbackManager();
-
-      MessageCallbackHandler handler = cm.unregisterHandler(oldServerID, oldConsumerID);
-      handler.setConsumerId(failedConsumerState.getConsumerID());
-
-      cm.registerHandler(failedConnectionState.getServerID(),
-                         failedConsumerState.getConsumerID(),
-                         handler);
-      failedSessionState.addCallbackHandler(handler);
-
-   }
-
-   private void handleFailoverOnProducer(ProducerState failedProducerState,
-                                         ClientSessionDelegate failedSessionDelegate)
-      throws JMSException
-   {
-      ClientProducerDelegate newProducerDelegate = (ClientProducerDelegate)failedSessionDelegate.
-         createProducerDelegate((JBossDestination)failedProducerState.getDestination());
-
-      ClientProducerDelegate failedProducerDelegate =
-         (ClientProducerDelegate)failedProducerState.getDelegate();
-
-      failedProducerDelegate.copyState(newProducerDelegate);
-
-      if (trace) { log.trace("handling fail over on producerDelegate " + failedProducerDelegate + " destination=" + failedProducerState.getDestination()); }
-   }
-
-   private void handleFailoverOnBrowser(BrowserState failedBrowserState,
-                                         ClientSessionDelegate failedSessionDelegate)
-      throws JMSException
-   {
-      ClientBrowserDelegate newBrowserDelegate = (ClientBrowserDelegate)failedSessionDelegate.
-          createBrowserDelegate(failedBrowserState.getJmsDestination(),failedBrowserState.getMessageSelector());
-
-      ClientBrowserDelegate failedBrowserDelegate =
-         (ClientBrowserDelegate)failedBrowserState.getDelegate();
-
-      failedBrowserDelegate.copyState(newBrowserDelegate);
-
-      if (trace) { log.trace("handling fail over on browserDelegate " + failedBrowserDelegate + " destination=" + failedBrowserState.getJmsDestination() + " selector=" + failedBrowserState.getMessageSelector()); }
-
-   }
-
    // Inner classes -------------------------------------------------
 }

Added: branches/Branch_Client_Failover_Experiment/src/main/org/jboss/jms/client/container/FailoverAspect.java
===================================================================
--- branches/Branch_Client_Failover_Experiment/src/main/org/jboss/jms/client/container/FailoverAspect.java	2006-11-27 12:44:12 UTC (rev 1643)
+++ branches/Branch_Client_Failover_Experiment/src/main/org/jboss/jms/client/container/FailoverAspect.java	2006-11-27 19:31:37 UTC (rev 1644)
@@ -0,0 +1,263 @@
+/*
+   * JBoss, Home of Professional Open Source
+   * Copyright 2005, JBoss Inc., and individual contributors as indicated
+   * by the @authors tag. See the copyright.txt in the distribution for a
+   * full listing of individual contributors.
+   *
+   * This is free software; you can redistribute it and/or modify it
+   * under the terms of the GNU Lesser General Public License as
+   * published by the Free Software Foundation; either version 2.1 of
+   * the License, or (at your option) any later version.
+   *
+   * This software is distributed in the hope that it will be useful,
+   * but WITHOUT ANY WARRANTY; without even the implied warranty of
+   * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+   * Lesser General Public License for more details.
+   *
+   * You should have received a copy of the GNU Lesser General Public
+   * License along with this software; if not, write to the Free
+   * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
+   * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
+   */
+
+package org.jboss.jms.client.container;
+
+import java.util.ArrayList;
+import java.util.Iterator;
+import java.util.List;
+import javax.jms.JMSException;
+import org.jboss.aop.joinpoint.Invocation;
+import org.jboss.aop.joinpoint.MethodInvocation;
+import org.jboss.jms.client.delegate.ClientBrowserDelegate;
+import org.jboss.jms.client.delegate.ClientConnectionDelegate;
+import org.jboss.jms.client.delegate.ClientConsumerDelegate;
+import org.jboss.jms.client.delegate.ClientProducerDelegate;
+import org.jboss.jms.client.delegate.ClientSessionDelegate;
+import org.jboss.jms.client.remoting.CallbackManager;
+import org.jboss.jms.client.remoting.MessageCallbackHandler;
+import org.jboss.jms.client.state.BrowserState;
+import org.jboss.jms.client.state.ConnectionState;
+import org.jboss.jms.client.state.ConsumerState;
+import org.jboss.jms.client.state.HierarchicalStateSupport;
+import org.jboss.jms.client.state.ProducerState;
+import org.jboss.jms.client.state.SessionState;
+import org.jboss.jms.destination.JBossDestination;
+import org.jboss.logging.Logger;
+
+/**
+ * Client-side aspect that performs connection failover.
+ * <p/>
+ * Its <tt>handleFailover</tt> advice is bound to <tt>ClientConnectionDelegate.failOver(..)</tt>
+ * in aop-messaging-client.xml. It copies the state of the new connection into the failed one and
+ * creates replacement sessions, producers, consumers and browsers, copying their state into the
+ * existing (failed) delegates. The aspect keeps no instance state.
+ *
+ * @author <a href="mailto:clebert.suconic at jboss.org">Clebert Suconic</a>
+ * @version <tt>$Revision:$</tt>
+ *          <p/>
+ *          $Id:$
+ */
+public class FailoverAspect
+{
+
+   // Log under this aspect's own category rather than ConnectionAspect's.
+   private static final Logger log = Logger.getLogger(FailoverAspect.class);
+   private static boolean trace = log.isTraceEnabled();
+
+   /**
+    * Advice for <tt>ClientConnectionDelegate.failOver(..)</tt>.
+    * <p/>
+    * The invocation target is the failed connection delegate; the first invocation argument is
+    * the delegate of the new connection.
+    *
+    * @param invocation the intercepted call; must be a MethodInvocation
+    * @return always <tt>null</tt>; the invocation is not passed down the interceptor chain
+    * @throws Throwable anything thrown while failing over the connection or its children
+    */
+   public Object handleFailover(Invocation invocation) throws Throwable
+   {
+      if (trace) { log.trace("calling handleFailover"); }
+
+      ClientConnectionDelegate failedDelegate =
+         ((ClientConnectionDelegate)invocation.getTargetObject());
+      ConnectionState failedState = (ConnectionState)failedDelegate.getState();
+
+      // Captured before failedState.copy(newState); used later to fail over the consumers.
+      int oldServerID = failedState.getServerID();
+
+      ClientConnectionDelegate newDelegate =
+         (ClientConnectionDelegate)((MethodInvocation)invocation).getArguments()[0];
+      ConnectionState newState = (ConnectionState)newDelegate.getState();
+
+      failedState.copy(newState);
+
+      if (failedState.getClientID() != null)
+      {
+         newDelegate.setClientID(failedState.getClientID());
+      }
+
+      // Transferring state from newDelegate to the failed delegate
+      failedDelegate.copyState(newDelegate);
+
+      if (failedState.isStarted())
+      {
+         failedDelegate.start();
+      }
+
+      for(Iterator i = failedState.getChildren().iterator(); i.hasNext(); )
+      {
+         SessionState failedSessionState = (SessionState)i.next();
+
+         ClientSessionDelegate newSessionDelegate = (ClientSessionDelegate)newDelegate.
+            createSessionDelegate(failedSessionState.isTransacted(),
+                                  failedSessionState.getAcknowledgeMode(),
+                                  failedSessionState.isXA());
+
+         ClientSessionDelegate failedSessionDelegate =
+            (ClientSessionDelegate)failedSessionState.getDelegate();
+
+         failedSessionDelegate.copyState(newSessionDelegate);
+
+         if (trace) { log.trace("replacing session (" + failedSessionDelegate + ") with a new failover session " + newSessionDelegate); }
+
+         // Work on a snapshot of the session's children while they are failed over.
+         List children = new ArrayList(failedSessionState.getChildren());
+
+         for(Iterator j = children.iterator(); j.hasNext(); )
+         {
+            HierarchicalStateSupport sessionChild = (HierarchicalStateSupport)j.next();
+
+            if (sessionChild instanceof ProducerState)
+            {
+               handleFailoverOnProducer((ProducerState)sessionChild, newSessionDelegate);
+            }
+            else if (sessionChild instanceof ConsumerState)
+            {
+               handleFailoverOnConsumer(failedDelegate,
+                                        failedState,
+                                        failedSessionState,
+                                        (ConsumerState)sessionChild,
+                                        failedSessionDelegate,
+                                        oldServerID);
+            }
+            else if (sessionChild instanceof BrowserState)
+            {
+               handleFailoverOnBrowser((BrowserState)sessionChild, newSessionDelegate);
+            }
+         }
+      }
+
+      return null;
+   }
+
+   /**
+    * Fails over a single consumer: asks the session delegate for an alternate consumer, copies
+    * its state into the failed consumer, and moves the message callback handler from the old
+    * (server id, consumer id) registration to the new one.
+    *
+    * @param connectionDelegate the failed connection delegate (currently unused)
+    * @param failedConnectionState state of the failed connection
+    * @param failedSessionState state of the session owning the consumer
+    * @param failedConsumerState state of the consumer being failed over
+    * @param failedSessionDelegate session delegate used to create the alternate consumer
+    * @param oldServerID server id read from the connection state before failover
+    * @throws JMSException if any of the delegate calls made here fails
+    */
+   private void handleFailoverOnConsumer(ClientConnectionDelegate connectionDelegate,
+                                         ConnectionState failedConnectionState,
+                                         SessionState failedSessionState,
+                                         ConsumerState failedConsumerState,
+                                         ClientSessionDelegate failedSessionDelegate,
+                                         int oldServerID)
+      throws JMSException
+   {
+      ClientConsumerDelegate failedConsumerDelegate =
+         (ClientConsumerDelegate)failedConsumerState.getDelegate();
+
+      if (trace) { log.trace("handleFailoverOnConsumer: creating alternate consumer"); }
+
+      ClientConsumerDelegate newConsumerDelegate = (ClientConsumerDelegate)failedSessionDelegate.
+         failOverConsumer((JBossDestination)failedConsumerState.getDestination(),
+                                failedConsumerState.getSelector(),
+                                failedConsumerState.isNoLocal(),
+                                failedConsumerState.getSubscriptionName(),
+                                failedConsumerState.isConnectionConsumer(),
+                                failedConsumerDelegate.getChannelId(),
+                                oldServerID);
+
+      if (trace) { log.trace("handleFailoverOnConsumer: alternate consumer created"); }
+
+      failedConsumerDelegate.copyState(newConsumerDelegate);
+
+      // Capture the old consumer id before failedConsumerState.copy(newState) below.
+      int oldConsumerID = failedConsumerState.getConsumerID();
+
+      ConsumerState newState = (ConsumerState)newConsumerDelegate.getState();
+      failedConsumerState.copy(newState);
+
+      failedConnectionState.getResourceManager().
+         handleFailover(failedSessionState.getCurrentTxId(),
+                        oldConsumerID,
+                        failedConsumerState.getConsumerID());
+
+      CallbackManager cm = failedConnectionState.getRemotingConnection().getCallbackManager();
+
+      // Re-register the existing handler under the new server/consumer ids.
+      // NOTE(review): assumes a handler was registered for the old ids - confirm that
+      // unregisterHandler cannot return null here.
+      MessageCallbackHandler handler = cm.unregisterHandler(oldServerID, oldConsumerID);
+      handler.setConsumerId(failedConsumerState.getConsumerID());
+
+      cm.registerHandler(failedConnectionState.getServerID(),
+                         failedConsumerState.getConsumerID(),
+                         handler);
+      failedSessionState.addCallbackHandler(handler);
+   }
+
+   /**
+    * Fails over a single producer: creates a producer delegate for the same destination on the
+    * new session delegate and copies its state into the failed producer's delegate.
+    *
+    * @param failedProducerState state of the producer being failed over
+    * @param newSessionDelegate session delegate created on the new connection
+    * @throws JMSException if the new producer delegate cannot be created
+    */
+   private void handleFailoverOnProducer(ProducerState failedProducerState,
+                                         ClientSessionDelegate newSessionDelegate)
+      throws JMSException
+   {
+      ClientProducerDelegate newProducerDelegate = (ClientProducerDelegate)newSessionDelegate.
+         createProducerDelegate((JBossDestination)failedProducerState.getDestination());
+
+      ClientProducerDelegate failedProducerDelegate =
+         (ClientProducerDelegate)failedProducerState.getDelegate();
+
+      failedProducerDelegate.copyState(newProducerDelegate);
+
+      if (trace) { log.trace("handling fail over on producerDelegate " + failedProducerDelegate + " destination=" + failedProducerState.getDestination()); }
+   }
+
+   /**
+    * Fails over a single browser: creates a browser delegate for the same destination and
+    * selector on the new session delegate and copies its state into the failed browser's delegate.
+    *
+    * @param failedBrowserState state of the browser being failed over
+    * @param newSessionDelegate session delegate created on the new connection
+    * @throws JMSException if the new browser delegate cannot be created
+    */
+   private void handleFailoverOnBrowser(BrowserState failedBrowserState,
+                                        ClientSessionDelegate newSessionDelegate)
+      throws JMSException
+   {
+      ClientBrowserDelegate newBrowserDelegate = (ClientBrowserDelegate)newSessionDelegate.
+          createBrowserDelegate(failedBrowserState.getJmsDestination(),failedBrowserState.getMessageSelector());
+
+      ClientBrowserDelegate failedBrowserDelegate =
+         (ClientBrowserDelegate)failedBrowserState.getDelegate();
+
+      failedBrowserDelegate.copyState(newBrowserDelegate);
+
+      if (trace) { log.trace("handling fail over on browserDelegate " + failedBrowserDelegate + " destination=" + failedBrowserState.getJmsDestination() + " selector=" + failedBrowserState.getMessageSelector()); }
+   }
+
+}

Modified: branches/Branch_Client_Failover_Experiment/tests/src/org/jboss/test/messaging/jms/clustering/HATest.java
===================================================================
--- branches/Branch_Client_Failover_Experiment/tests/src/org/jboss/test/messaging/jms/clustering/HATest.java	2006-11-27 12:44:12 UTC (rev 1643)
+++ branches/Branch_Client_Failover_Experiment/tests/src/org/jboss/test/messaging/jms/clustering/HATest.java	2006-11-27 19:31:37 UTC (rev 1644)
@@ -1,6 +1,12 @@
 package org.jboss.test.messaging.jms.clustering;
 
-import javax.jms.*;
+import javax.jms.Destination;
+import javax.jms.Message;
+import javax.jms.MessageConsumer;
+import javax.jms.MessageProducer;
+import javax.jms.Session;
+import javax.jms.TextMessage;
+import javax.jms.Topic;
 import org.jboss.jms.client.JBossConnection;
 import org.jboss.jms.client.JBossMessageConsumer;
 import org.jboss.jms.client.JBossSession;
@@ -47,7 +53,7 @@
          assertEquals(3,ServerManagement.getServer(1).getNumberOfNodesOnCluster());
          ServerManagement.stop(0,true);
          System.out.println("Number of nodes on Cluster = " + ServerManagement.getServer(1).getNumberOfNodesOnCluster());
-         assertEquals(2,ServerManagement.getServer(0).getNumberOfNodesOnCluster());
+         assertEquals(2,ServerManagement.getServer(1).getNumberOfNodesOnCluster());
       }
       finally
       {




More information about the jboss-cvs-commits mailing list