[jboss-cvs] JBoss Messaging SVN: r1486 - in branches/Branch_Client_Failover_Experiment/src/main/org/jboss/jms/client: container remoting state

jboss-cvs-commits at lists.jboss.org jboss-cvs-commits at lists.jboss.org
Tue Oct 17 12:41:07 EDT 2006


Author: clebert.suconic at jboss.com
Date: 2006-10-17 12:41:04 -0400 (Tue, 17 Oct 2006)
New Revision: 1486

Modified:
   branches/Branch_Client_Failover_Experiment/src/main/org/jboss/jms/client/container/ConnectionAspect.java
   branches/Branch_Client_Failover_Experiment/src/main/org/jboss/jms/client/remoting/CallbackManager.java
   branches/Branch_Client_Failover_Experiment/src/main/org/jboss/jms/client/remoting/MessageCallbackHandler.java
   branches/Branch_Client_Failover_Experiment/src/main/org/jboss/jms/client/state/ConsumerState.java
Log:
http://jira.jboss.org/jira/browse/JBMESSAGING-519 - MessageConsumers

Modified: branches/Branch_Client_Failover_Experiment/src/main/org/jboss/jms/client/container/ConnectionAspect.java
===================================================================
--- branches/Branch_Client_Failover_Experiment/src/main/org/jboss/jms/client/container/ConnectionAspect.java	2006-10-17 14:49:19 UTC (rev 1485)
+++ branches/Branch_Client_Failover_Experiment/src/main/org/jboss/jms/client/container/ConnectionAspect.java	2006-10-17 16:41:04 UTC (rev 1486)
@@ -27,6 +27,8 @@
 import org.jboss.aop.joinpoint.Invocation;
 import org.jboss.aop.joinpoint.MethodInvocation;
 import org.jboss.jms.client.JBossConnectionMetaData;
+import org.jboss.jms.client.remoting.CallbackManager;
+import org.jboss.jms.client.remoting.MessageCallbackHandler;
 import org.jboss.jms.client.delegate.ClientConnectionDelegate;
 import org.jboss.jms.client.delegate.ClientSessionDelegate;
 import org.jboss.jms.client.delegate.ClientProducerDelegate;
@@ -40,6 +42,7 @@
 import org.jboss.remoting.ConnectionListener;
 
 import java.util.Iterator;
+import java.util.ArrayList;
 
 /**
  * Handles operations related to the connection
@@ -60,6 +63,7 @@
    // Static --------------------------------------------------------
 
    private static final Logger log = Logger.getLogger(ConnectionAspect.class);
+   private static boolean trace = log.isTraceEnabled();
 
    // Attributes ----------------------------------------------------
 
@@ -212,24 +216,31 @@
         }
 
         ClientConnectionDelegate currentDelegate = ((ClientConnectionDelegate)invocation.getTargetObject());
-        ConnectionState currentState = (ConnectionState)currentDelegate.getState();
+        ConnectionState currentConnectionState = (ConnectionState)currentDelegate.getState();
 
+        int oldServerId = currentConnectionState.getServerID();
 
         ClientConnectionDelegate otherConnection = (ClientConnectionDelegate)((MethodInvocation)invocation).getArguments()[0];
-        ConnectionState otherConnectionState = (ConnectionState)((ClientConnectionDelegate)otherConnection).getState();
+        ConnectionState newConnectionState = (ConnectionState)((ClientConnectionDelegate)otherConnection).getState();
 
-        currentState.failOver(otherConnectionState);
+        currentConnectionState.failOver(newConnectionState);
 
-        if (currentState.getClientID()!=null)
+        if (currentConnectionState.getClientID()!=null)
         {
-            otherConnection.setClientID(currentState.getClientID());
+            otherConnection.setClientID(currentConnectionState.getClientID());
         }
 
         // Transfering state from newDelegate to currentDelegate
         currentDelegate.transferHAState(otherConnection);
 
-        Iterator sessionsIterator = currentState.getChildren().iterator();
+        if (currentConnectionState.isStarted())
+        {
+            currentDelegate.start();
+        }
 
+
+        Iterator sessionsIterator = currentConnectionState.getChildren().iterator();
+
         while(sessionsIterator.hasNext())
         {
             SessionState sessionState = (SessionState)sessionsIterator.next();
@@ -246,7 +257,10 @@
                 log.trace("Replacing session (" + currentSessionDelegate + ") by a new session created on the new failed over connection (" + newSessionDelegate + ")");
             }
 
-            Iterator sessionObjectsIterator = sessionState.getChildren().iterator();
+
+            ArrayList children = new ArrayList();
+            children.addAll(sessionState.getChildren());
+            Iterator sessionObjectsIterator = children.iterator();
             while (sessionObjectsIterator.hasNext())
             {
                 HierarchicalStateSupport sessionChild = (HierarchicalStateSupport)sessionObjectsIterator.next();
@@ -257,7 +271,7 @@
                 }
                 else if (sessionChild instanceof ConsumerState)
                 {
-                    handleFailoverOnConsumer((ConsumerState)sessionChild,currentSessionDelegate);
+                    handleFailoverOnConsumer(currentConnectionState,sessionState,(ConsumerState)sessionChild,currentSessionDelegate,oldServerId);
                 }
             }
         }
@@ -267,16 +281,34 @@
         return null;
     }
 
-    private void handleFailoverOnConsumer(ConsumerState consumerState, ClientSessionDelegate sessionDelegate) throws JMSException
+    private void handleFailoverOnConsumer(ConnectionState connectionState,SessionState sessionState, ConsumerState consumerState, ClientSessionDelegate sessionDelegate, int oldServerId) throws JMSException
     {
+        if (trace)
+        {
+            log.trace("handleFailoverOnConsumer: creating alternate consumer");
+        }
         ClientConsumerDelegate newConsumerDelegate = (ClientConsumerDelegate)sessionDelegate.createConsumerDelegate((JBossDestination) consumerState.getDestination(),consumerState.getSelector(),consumerState.isNoLocal(),consumerState.getSubscriptionName(),false);
+        if (trace)
+        {
+            log.trace("handleFailoverOnConsumer: alternate consumer created");
+        }
 
         ClientConsumerDelegate currentConsumerDelegate = (ClientConsumerDelegate)consumerState.getDelegate();
         currentConsumerDelegate.transferHAState(newConsumerDelegate);
 
+        int oldConsumerId = consumerState.getConsumerID();
+
         ConsumerState newState = (ConsumerState)newConsumerDelegate.getState();
-        consumerState.setConsumerID(newState.getConsumerID());
+        consumerState.failOver(newState);
 
+        CallbackManager cm = connectionState.getRemotingConnection().getCallbackManager();
+
+        MessageCallbackHandler handler = cm.unregisterHandler(oldServerId,oldConsumerId);
+        handler.setConsumerId(consumerState.getConsumerID());
+
+        cm.registerHandler(connectionState.getServerID(),consumerState.getConsumerID(),handler);
+        sessionState.addCallbackHandler(handler);
+
     }
 
     private void handleFailoverOnProducer(ProducerState producerState, ClientSessionDelegate sessionDelegate) throws JMSException

Modified: branches/Branch_Client_Failover_Experiment/src/main/org/jboss/jms/client/remoting/CallbackManager.java
===================================================================
--- branches/Branch_Client_Failover_Experiment/src/main/org/jboss/jms/client/remoting/CallbackManager.java	2006-10-17 14:49:19 UTC (rev 1485)
+++ branches/Branch_Client_Failover_Experiment/src/main/org/jboss/jms/client/remoting/CallbackManager.java	2006-10-17 16:41:04 UTC (rev 1486)
@@ -63,11 +63,11 @@
       callbackHandlers.put(lookup, handler);
    }
    
-   public void unregisterHandler(int serverId, int consumerId)
+   public MessageCallbackHandler unregisterHandler(int serverId, int consumerId)
    {
       Long lookup = calcLookup(serverId, consumerId);
       
-      callbackHandlers.remove(lookup);
+      return (MessageCallbackHandler)callbackHandlers.remove(lookup);
    }
    
    private Long calcLookup(int serverId, int consumerId)

Modified: branches/Branch_Client_Failover_Experiment/src/main/org/jboss/jms/client/remoting/MessageCallbackHandler.java
===================================================================
--- branches/Branch_Client_Failover_Experiment/src/main/org/jboss/jms/client/remoting/MessageCallbackHandler.java	2006-10-17 14:49:19 UTC (rev 1485)
+++ branches/Branch_Client_Failover_Experiment/src/main/org/jboss/jms/client/remoting/MessageCallbackHandler.java	2006-10-17 16:41:04 UTC (rev 1486)
@@ -506,6 +506,11 @@
    {
       return consumerID;
    }
+
+   public void setConsumerId(int consumerId)
+   {
+       this.consumerID=consumerId;
+   }
    
    public void addToFrontOfBuffer(MessageProxy proxy)
    {

Modified: branches/Branch_Client_Failover_Experiment/src/main/org/jboss/jms/client/state/ConsumerState.java
===================================================================
--- branches/Branch_Client_Failover_Experiment/src/main/org/jboss/jms/client/state/ConsumerState.java	2006-10-17 14:49:19 UTC (rev 1485)
+++ branches/Branch_Client_Failover_Experiment/src/main/org/jboss/jms/client/state/ConsumerState.java	2006-10-17 16:41:04 UTC (rev 1486)
@@ -107,11 +107,6 @@
       return consumerID;
    }
 
-   public void setConsumerID(int consumerID)
-   {
-       this.consumerID=consumerID;
-   }
-   
    public boolean isConnectionConsumer()
    {
       return isConnectionConsumer;
@@ -151,6 +146,11 @@
     public void setSubscriptionName(String subscriptionName) {
         this.subscriptionName = subscriptionName;
     }
+
+    public void failOver(ConsumerState newState)
+    {
+        this.consumerID=newState.consumerID;
+    }
 }
 
 




More information about the jboss-cvs-commits mailing list