[jboss-cvs] JBoss Messaging SVN: r1763 - in branches/Branch_Client_Failover_Experiment: src/etc src/etc/server/default/deploy src/main/org/jboss/jms/client src/main/org/jboss/jms/client/container src/main/org/jboss/jms/client/delegate src/main/org/jboss/jms/client/remoting src/main/org/jboss/jms/server/connectionfactory src/main/org/jboss/jms/server/container src/main/org/jboss/jms/server/endpoint src/main/org/jboss/jms/server/endpoint/advised src/main/org/jboss/messaging/core/plugin/contract src/main/org/jboss/messaging/core/plugin/postoffice src/main/org/jboss/messaging/core/plugin/postoffice/cluster tests/etc tests/src/org/jboss/test/messaging/core/plugin/postoffice/cluster tests/src/org/jboss/test/messaging/jms/clustering

jboss-cvs-commits at lists.jboss.org jboss-cvs-commits at lists.jboss.org
Mon Dec 11 10:27:44 EST 2006


Author: timfox
Date: 2006-12-11 10:27:26 -0500 (Mon, 11 Dec 2006)
New Revision: 1763

Added:
   branches/Branch_Client_Failover_Experiment/src/main/org/jboss/jms/server/endpoint/CreateConnectionResult.java
Removed:
   branches/Branch_Client_Failover_Experiment/src/main/org/jboss/jms/server/endpoint/ConnectionStatus.java
Modified:
   branches/Branch_Client_Failover_Experiment/src/etc/aop-messaging-client.xml
   branches/Branch_Client_Failover_Experiment/src/etc/aop-messaging-server.xml
   branches/Branch_Client_Failover_Experiment/src/etc/server/default/deploy/clustered-mysql-persistence-service.xml
   branches/Branch_Client_Failover_Experiment/src/main/org/jboss/jms/client/JBossConnectionFactory.java
   branches/Branch_Client_Failover_Experiment/src/main/org/jboss/jms/client/container/HAAspect.java
   branches/Branch_Client_Failover_Experiment/src/main/org/jboss/jms/client/container/StateCreationAspect.java
   branches/Branch_Client_Failover_Experiment/src/main/org/jboss/jms/client/delegate/ClientConnectionFactoryDelegate.java
   branches/Branch_Client_Failover_Experiment/src/main/org/jboss/jms/client/remoting/JMSRemotingConnection.java
   branches/Branch_Client_Failover_Experiment/src/main/org/jboss/jms/server/connectionfactory/ConnectionFactoryJNDIMapper.java
   branches/Branch_Client_Failover_Experiment/src/main/org/jboss/jms/server/container/InjectionAspect.java
   branches/Branch_Client_Failover_Experiment/src/main/org/jboss/jms/server/container/SecurityAspect.java
   branches/Branch_Client_Failover_Experiment/src/main/org/jboss/jms/server/endpoint/ConnectionFactoryEndpoint.java
   branches/Branch_Client_Failover_Experiment/src/main/org/jboss/jms/server/endpoint/ServerConnectionEndpoint.java
   branches/Branch_Client_Failover_Experiment/src/main/org/jboss/jms/server/endpoint/ServerConnectionFactoryEndpoint.java
   branches/Branch_Client_Failover_Experiment/src/main/org/jboss/jms/server/endpoint/advised/ConnectionFactoryAdvised.java
   branches/Branch_Client_Failover_Experiment/src/main/org/jboss/messaging/core/plugin/contract/FailoverMapper.java
   branches/Branch_Client_Failover_Experiment/src/main/org/jboss/messaging/core/plugin/postoffice/DefaultPostOffice.java
   branches/Branch_Client_Failover_Experiment/src/main/org/jboss/messaging/core/plugin/postoffice/cluster/DefaultClusteredPostOffice.java
   branches/Branch_Client_Failover_Experiment/src/main/org/jboss/messaging/core/plugin/postoffice/cluster/DefaultFailoverMapper.java
   branches/Branch_Client_Failover_Experiment/tests/etc/log4j.xml
   branches/Branch_Client_Failover_Experiment/tests/src/org/jboss/test/messaging/core/plugin/postoffice/cluster/FailoverMapperTest.java
   branches/Branch_Client_Failover_Experiment/tests/src/org/jboss/test/messaging/jms/clustering/HATest.java
Log:
More HA fixes. 



Modified: branches/Branch_Client_Failover_Experiment/src/etc/aop-messaging-client.xml
===================================================================
--- branches/Branch_Client_Failover_Experiment/src/etc/aop-messaging-client.xml	2006-12-11 11:52:22 UTC (rev 1762)
+++ branches/Branch_Client_Failover_Experiment/src/etc/aop-messaging-client.xml	2006-12-11 15:27:26 UTC (rev 1763)
@@ -28,7 +28,7 @@
    </bind>   
    <bind pointcut="execution(* org.jboss.jms.client.delegate.ClientConnectionFactoryDelegate->createConnectionDelegate(..))">
       <advice name="handleCreateConnectionDelegate" aspect="org.jboss.jms.client.container.StateCreationAspect"/>
-   </bind>
+   </bind>  
    <bind pointcut="execution(* org.jboss.jms.client.delegate.ClientConnectionFactoryDelegate->createConnectionDelegate(..))">
       <advice name="handleCreateConnectionDelegate" aspect="org.jboss.jms.client.container.HAAspect"/>
    </bind>

Modified: branches/Branch_Client_Failover_Experiment/src/etc/aop-messaging-server.xml
===================================================================
--- branches/Branch_Client_Failover_Experiment/src/etc/aop-messaging-server.xml	2006-12-11 11:52:22 UTC (rev 1762)
+++ branches/Branch_Client_Failover_Experiment/src/etc/aop-messaging-server.xml	2006-12-11 15:27:26 UTC (rev 1763)
@@ -37,13 +37,8 @@
    <bind pointcut="execution(* org.jboss.jms.server.endpoint.advised.ConsumerAdvised->$implementing{org.jboss.jms.server.endpoint.ConsumerEndpoint}(..))">
       <interceptor-ref name="org.jboss.jms.server.container.ServerLogInterceptor"/>
    </bind>
-   <bind pointcut="execution(* org.jboss.jms.server.endpoint.advised.ConsumerAdvised->activate(..))">
-      <advice name="handleActivate" aspect="org.jboss.jms.server.container.SecurityAspect"/>
-   </bind>
-   <bind pointcut="execution(* org.jboss.jms.server.endpoint.advised.ConsumerAdvised->getMessageNow(..))">
-      <advice name="handleGetMessageNow" aspect="org.jboss.jms.server.container.SecurityAspect"/>
-   </bind>
 
+
    <!-- Browser -->
    <bind pointcut="execution(* org.jboss.jms.server.endpoint.advised.BrowserAdvised->$implementing{org.jboss.jms.server.endpoint.BrowserEndpoint}(..))">
       <interceptor-ref name="org.jboss.jms.server.container.ServerLogInterceptor"/>

Modified: branches/Branch_Client_Failover_Experiment/src/etc/server/default/deploy/clustered-mysql-persistence-service.xml
===================================================================
--- branches/Branch_Client_Failover_Experiment/src/etc/server/default/deploy/clustered-mysql-persistence-service.xml	2006-12-11 11:52:22 UTC (rev 1762)
+++ branches/Branch_Client_Failover_Experiment/src/etc/server/default/deploy/clustered-mysql-persistence-service.xml	2006-12-11 15:27:26 UTC (rev 1763)
@@ -87,7 +87,8 @@
             <AUTOCONF down_thread="false" up_thread="false"/>
             <PING timeout="2000" down_thread="false" num_initial_members="3" up_thread="false"/>
             <MERGE2 max_interval="10000" down_thread="false" min_interval="5000" up_thread="false"/>
-            <FD timeout="2000" max_tries="3" down_thread="false" up_thread="false" shun="true"/>
+            <FD_SOCK down_thread="false" up_thread="false"/>
+            <FD timeout="20000" max_tries="3" down_thread="false" up_thread="false" shun="true"/>            
             <VERIFY_SUSPECT timeout="1500" down_thread="false" up_thread="false"/>
             <pbcast.NAKACK max_xmit_size="8192" down_thread="false" use_mcast_xmit="true" gc_lag="50" up_thread="false"
                          retransmit_timeout="100,200,600,1200,2400,4800"/>
@@ -107,7 +108,8 @@
             <AUTOCONF down_thread="false" up_thread="false"/>
             <PING timeout="2000" down_thread="false" num_initial_members="3" up_thread="false"/>
             <MERGE2 max_interval="10000" down_thread="false" min_interval="5000" up_thread="false"/>
-            <FD timeout="2000" max_tries="3" down_thread="false" up_thread="false" shun="true"/>
+            <FD_SOCK down_thread="false" up_thread="false"/>
+            <FD timeout="20000" max_tries="3" down_thread="false" up_thread="false" shun="true"/>            
             <VERIFY_SUSPECT timeout="1500" down_thread="false" up_thread="false"/>
             <pbcast.NAKACK max_xmit_size="8192" down_thread="false" use_mcast_xmit="true" gc_lag="50" up_thread="false"
                          retransmit_timeout="100,200,600,1200,2400,4800"/>

Modified: branches/Branch_Client_Failover_Experiment/src/main/org/jboss/jms/client/JBossConnectionFactory.java
===================================================================
--- branches/Branch_Client_Failover_Experiment/src/main/org/jboss/jms/client/JBossConnectionFactory.java	2006-12-11 11:52:22 UTC (rev 1762)
+++ branches/Branch_Client_Failover_Experiment/src/main/org/jboss/jms/client/JBossConnectionFactory.java	2006-12-11 15:27:26 UTC (rev 1763)
@@ -42,9 +42,9 @@
 import org.jboss.aop.Advised;
 import org.jboss.jms.client.container.JmsClientAspectXMLLoader;
 import org.jboss.jms.client.delegate.ClientConnectionFactoryDelegate;
-import org.jboss.jms.delegate.ConnectionDelegate;
 import org.jboss.jms.delegate.ConnectionFactoryDelegate;
 import org.jboss.jms.referenceable.SerializableObjectRefAddr;
+import org.jboss.jms.server.endpoint.CreateConnectionResult;
 import org.jboss.jms.util.ThreadContextClassLoaderChanger;
 import org.jboss.logging.Logger;
 
@@ -199,9 +199,9 @@
          // The version used by the connection is the minimum of the server version for the
          // connection factory and the client code version
          
-         ConnectionDelegate cd = delegate.createConnectionDelegate(username, password);        
+         CreateConnectionResult res = delegate.createConnectionDelegate(username, password, -1);        
          
-         return new JBossConnection(cd, type);
+         return new JBossConnection(res.getDelegate(), type);
       }
       finally
       {

Modified: branches/Branch_Client_Failover_Experiment/src/main/org/jboss/jms/client/container/HAAspect.java
===================================================================
--- branches/Branch_Client_Failover_Experiment/src/main/org/jboss/jms/client/container/HAAspect.java	2006-12-11 11:52:22 UTC (rev 1762)
+++ branches/Branch_Client_Failover_Experiment/src/main/org/jboss/jms/client/container/HAAspect.java	2006-12-11 15:27:26 UTC (rev 1763)
@@ -48,7 +48,7 @@
 import org.jboss.jms.client.state.ProducerState;
 import org.jboss.jms.client.state.SessionState;
 import org.jboss.jms.destination.JBossDestination;
-import org.jboss.jms.server.endpoint.ConnectionStatus;
+import org.jboss.jms.server.endpoint.CreateConnectionResult;
 import org.jboss.logging.Logger;
 import org.jboss.remoting.Client;
 import org.jboss.remoting.ConnectionListener;
@@ -103,7 +103,7 @@
          
          ClientConnectionDelegate connDelegate = createConnection(cfDelegate, username, password);
          
-         return connDelegate;   
+         return new CreateConnectionResult(connDelegate);
       }
       else
       {
@@ -163,8 +163,11 @@
       throws Exception
    {
       log.info("createConnection");
+      
+      CreateConnectionResult res = (CreateConnectionResult)cf.createConnectionDelegate(username, password, -1);
+      
       ClientConnectionDelegate connDelegate =
-         (ClientConnectionDelegate)cf.createConnectionDelegate(username, password);
+         (ClientConnectionDelegate)res.getDelegate();
       
       initialiseConnection(connDelegate);
       
@@ -194,26 +197,28 @@
       
       ConnectionState state = (ConnectionState)((DelegateSupport)failedConnection).getState();
       
-      log.info("Failing over connection");
+      log.info("calling createFailoverConnectionDelegate");
       
-      ConnectionStatus status =
-         newCF.createFailoverConnectionDelegate(state.getUser(), state.getPassword(), state.getServerID());
+      CreateConnectionResult res =
+         newCF.createConnectionDelegate(state.getUser(), state.getPassword(), state.getServerID());
       
       log.info("returned from createFailoverConnectionDelegate");
                   
-      if (status.getDelegate() != null)
+      if (res.getDelegate() != null)
       {
          log.info("Got connection");
          
-         ClientConnectionDelegate newConnection = (ClientConnectionDelegate)state.getDelegate();
+         ClientConnectionDelegate newConnection = (ClientConnectionDelegate)res.getDelegate();
          
+         log.info("newconnection is " + newConnection);
+         
          //We got the right server and created a new connection
          
          failover(failedConnection, newConnection);
       }
       else
       {
-         if (status.getActualFailoverNode() == -1)
+         if (res.getActualFailoverNode() == -1)
          {
             //No trace of failover was detected on the server side - this might happen if the client side
             //network fails temporarily so the client connection breaks but the server side network is still
@@ -272,32 +277,6 @@
          throw new IllegalStateException("Cannot find failover delegate for node " + failoverServerID.intValue());
       }
            
-      // Redirect connection routine.
-      // Verify the failureMap on the server and if out of sync find the correct delegate
-      
-      
-      //THIS IS WRONG - cannot use getFailoverNode method since failover node might change
-      //between getting the result and actually failing over
-      
-//      int failoverNode = delegateFound.getFailoverNode(currentState.getServerID());
-//      if (failoverNode!=delegateFound.getServerId())
-//      {
-//         delegateFound = null;
-//         for (int i = 0; i < delegates.length; i++)
-//         {
-//            if (delegates[i].getServerId() == failoverNode)
-//            {
-//               delegateFound = delegates[i];
-//            }
-//         }
-//
-//         if (delegateFound==null)
-//         {
-//            throw new IllegalStateException("Cannot find failover node on current map for nodeId=" + failoverNode);
-//         }
-//
-//      }
-
       return del;
    }
    
@@ -312,12 +291,14 @@
       int oldServerID = failedState.getServerID();
 
       ConnectionState newState = (ConnectionState)newConnection.getState();
+      
+      log.info("new state is: " + newState);
 
       failedState.copy(newState);
 
       // this is necessary so the connection will start "talking" to the new server instead
       failedState.setRemotingConnection(newState.getRemotingConnection());
-
+      
       if (failedState.getClientID() != null)
       {
          newConnection.setClientID(failedState.getClientID());
@@ -325,26 +306,30 @@
 
       // Transfering state from newDelegate to currentDelegate
       failedConnection.copyState(newConnection);
+      
+      log.info("failing over children");
 
-      if (failedState.isStarted())
-      {
-         failedConnection.start();
-      }
-
       for(Iterator i = failedState.getChildren().iterator(); i.hasNext(); )
       {
          SessionState failedSessionState = (SessionState)i.next();
 
+         log.info("Creating session");
+         
          ClientSessionDelegate newSessionDelegate = (ClientSessionDelegate)newConnection.
             createSessionDelegate(failedSessionState.isTransacted(),
                                   failedSessionState.getAcknowledgeMode(),
                                   failedSessionState.isXA());
-
+         
+         log.info("Created session");
+                  
          ClientSessionDelegate failedSessionDelegate =
             (ClientSessionDelegate)failedSessionState.getDelegate();
 
          failedSessionDelegate.copyState(newSessionDelegate);
+         
+         log.info("copied state");
 
+
          if (trace) { log.trace("replacing session (" + failedSessionDelegate + ") with a new failover session " + newSessionDelegate); }
 
          //TODO Clebert please add comment as to why this clone is necessary
@@ -375,6 +360,13 @@
             }
          }
       }
+      
+      //We must not start the connection until the end
+      if (failedState.isStarted())
+      {
+         failedConnection.start();
+      }
+      
       log.info("Failover done");
    }
 

Modified: branches/Branch_Client_Failover_Experiment/src/main/org/jboss/jms/client/container/StateCreationAspect.java
===================================================================
--- branches/Branch_Client_Failover_Experiment/src/main/org/jboss/jms/client/container/StateCreationAspect.java	2006-12-11 11:52:22 UTC (rev 1762)
+++ branches/Branch_Client_Failover_Experiment/src/main/org/jboss/jms/client/container/StateCreationAspect.java	2006-12-11 15:27:26 UTC (rev 1763)
@@ -28,7 +28,6 @@
 import org.jboss.aop.joinpoint.MethodInvocation;
 import org.jboss.aop.metadata.SimpleMetaData;
 import org.jboss.jms.client.delegate.ClientConnectionDelegate;
-import org.jboss.jms.client.delegate.ClientConnectionFactoryDelegate;
 import org.jboss.jms.client.delegate.ClientProducerDelegate;
 import org.jboss.jms.client.delegate.DelegateSupport;
 import org.jboss.jms.client.remoting.JMSRemotingConnection;
@@ -39,6 +38,7 @@
 import org.jboss.jms.client.state.ProducerState;
 import org.jboss.jms.client.state.SessionState;
 import org.jboss.jms.delegate.BrowserDelegate;
+import org.jboss.jms.delegate.ConnectionFactoryDelegate;
 import org.jboss.jms.delegate.ConsumerDelegate;
 import org.jboss.jms.delegate.ProducerDelegate;
 import org.jboss.jms.delegate.SessionDelegate;
@@ -46,6 +46,7 @@
 import org.jboss.jms.message.MessageIdGenerator;
 import org.jboss.jms.message.MessageIdGeneratorFactory;
 import org.jboss.jms.server.Version;
+import org.jboss.jms.server.endpoint.CreateConnectionResult;
 import org.jboss.jms.server.remoting.MetaDataConstants;
 import org.jboss.jms.tx.ResourceManager;
 import org.jboss.jms.tx.ResourceManagerFactory;
@@ -81,40 +82,49 @@
 
    public Object handleCreateConnectionDelegate(Invocation inv) throws Throwable
    {
-      ClientConnectionFactoryDelegate cfd = (ClientConnectionFactoryDelegate)inv.getTargetObject();
-      ClientConnectionDelegate connectionDelegate = (ClientConnectionDelegate)inv.invokeNext();
-      connectionDelegate.init();
-
-      SimpleMetaData md = ((Advised)connectionDelegate)._getInstanceAdvisor().getMetaData();
+      ConnectionFactoryDelegate cfd = (ConnectionFactoryDelegate)inv.getTargetObject();
       
-      int serverID =
-         ((Integer)md.getMetaData(MetaDataConstants.JMS, MetaDataConstants.SERVER_ID)).intValue();
+      CreateConnectionResult res = (CreateConnectionResult)inv.invokeNext();
       
-      Version connectionVersion = 
-         (Version)md.getMetaData(MetaDataConstants.JMS, MetaDataConstants.CONNECTION_VERSION);
+      ClientConnectionDelegate connectionDelegate = (ClientConnectionDelegate)res.getDelegate();
       
-      JMSRemotingConnection connection = 
-         (JMSRemotingConnection)md.getMetaData(MetaDataConstants.JMS, MetaDataConstants.REMOTING_CONNECTION);
-      
-      if (connectionVersion == null)
+      if (connectionDelegate != null)
       {
-         throw new IllegalStateException("Connection version is null");
+         
+         connectionDelegate.init();
+         
+         SimpleMetaData md = ((Advised)connectionDelegate)._getInstanceAdvisor().getMetaData();
+         
+         int serverID =
+            ((Integer)md.getMetaData(MetaDataConstants.JMS, MetaDataConstants.SERVER_ID)).intValue();
+         
+         Version connectionVersion = 
+            (Version)md.getMetaData(MetaDataConstants.JMS, MetaDataConstants.CONNECTION_VERSION);
+         
+         JMSRemotingConnection connection = 
+            (JMSRemotingConnection)md.getMetaData(MetaDataConstants.JMS, MetaDataConstants.REMOTING_CONNECTION);
+         
+         if (connectionVersion == null)
+         {
+            throw new IllegalStateException("Connection version is null");
+         }
+         
+         // We have one resource manager per unique server
+         ResourceManager rm = ResourceManagerFactory.instance.checkOutResourceManager(serverID);
+         
+         //We have one message id generator per unique server
+         MessageIdGenerator gen =
+            MessageIdGeneratorFactory.instance.checkOutGenerator(serverID, cfd);
+         
+         ConnectionState connectionState =
+            new ConnectionState(serverID, connectionDelegate,
+                     connection,
+                     connectionVersion, rm, gen);
+         
+         connectionDelegate.setState(connectionState);
       }
-           
-      //We have one resource manager per unique server
-      ResourceManager rm = ResourceManagerFactory.instance.checkOutResourceManager(serverID);
       
-      //We have one message id generator per unique server
-      MessageIdGenerator gen =
-         MessageIdGeneratorFactory.instance.checkOutGenerator(serverID, cfd);
-
-      ConnectionState connectionState =
-         new ConnectionState(serverID, connectionDelegate,
-                             connection,
-                             connectionVersion, rm, gen);
-
-      connectionDelegate.setState(connectionState);
-      return connectionDelegate;
+      return res;
    }
    
    public Object handleCreateSessionDelegate(Invocation invocation) throws Throwable

Modified: branches/Branch_Client_Failover_Experiment/src/main/org/jboss/jms/client/delegate/ClientConnectionFactoryDelegate.java
===================================================================
--- branches/Branch_Client_Failover_Experiment/src/main/org/jboss/jms/client/delegate/ClientConnectionFactoryDelegate.java	2006-12-11 11:52:22 UTC (rev 1762)
+++ branches/Branch_Client_Failover_Experiment/src/main/org/jboss/jms/client/delegate/ClientConnectionFactoryDelegate.java	2006-12-11 15:27:26 UTC (rev 1763)
@@ -23,7 +23,9 @@
 
 import java.util.HashMap;
 import java.util.Map;
+
 import javax.jms.JMSException;
+
 import org.jboss.aop.Advised;
 import org.jboss.aop.Dispatcher;
 import org.jboss.aop.joinpoint.Invocation;
@@ -32,11 +34,10 @@
 import org.jboss.aop.util.PayloadKey;
 import org.jboss.jms.client.container.JMSClientVMIdentifier;
 import org.jboss.jms.client.remoting.JMSRemotingConnection;
-import org.jboss.jms.delegate.ConnectionDelegate;
 import org.jboss.jms.delegate.ConnectionFactoryDelegate;
 import org.jboss.jms.server.ServerPeer;
 import org.jboss.jms.server.Version;
-import org.jboss.jms.server.endpoint.ConnectionStatus;
+import org.jboss.jms.server.endpoint.CreateConnectionResult;
 import org.jboss.jms.server.remoting.JMSWireFormat;
 import org.jboss.jms.server.remoting.MessagingMarshallable;
 import org.jboss.jms.server.remoting.MetaDataConstants;
@@ -117,28 +118,16 @@
    }
 
    // ConnectionFactoryDelegateImplementation -----------------------
-
+ 
    /**
     * This invocation should either be handled by the client-side interceptor chain or by the
     * server-side endpoint.
-    * @see org.jboss.jms.client.container.StateCreationAspect#handleCreateConnectionDelegate(org.jboss.aop.joinpoint.Invocation)
-    * @see org.jboss.jms.server.endpoint.ServerConnectionFactoryEndpoint#createConnectionDelegate(String, String) 
     */
-   public ConnectionDelegate createConnectionDelegate(String username, String password)
+   public CreateConnectionResult createConnectionDelegate(String username, String password, int failedNodeId)
       throws JMSException
    {
       throw new IllegalStateException("This invocation should not be handled here!");
    }
-   
-   /**
-    * This invocation should either be handled by the client-side interceptor chain or by the
-    * server-side endpoint.
-    */
-   public ConnectionStatus createFailoverConnectionDelegate(String username, String password, int failedNodeId)
-      throws JMSException
-   {
-      throw new IllegalStateException("This invocation should not be handled here!");
-   }
 
    /**
     * This invocation should either be handled by the client-side interceptor chain or by the
@@ -268,19 +257,37 @@
       if (remotingConnection != null)
       {
          // It was a call to createConnectionDelegate - set the remoting connection to use
-         ClientConnectionDelegate connectionDelegate = (ClientConnectionDelegate)ret;
+         
+         CreateConnectionResult res = (CreateConnectionResult)ret;
+         
+         ClientConnectionDelegate connectionDelegate = (ClientConnectionDelegate)res.getDelegate();
+         
+         if (connectionDelegate != null)
+         {
+            //We set the version for the connection and the remoting connection on the meta-data
+            //this is so the StateCreationAspect can pick it up
+   
+            SimpleMetaData metaData = ((Advised)connectionDelegate)._getInstanceAdvisor().getMetaData();
+   
+            metaData.addMetaData(MetaDataConstants.JMS, MetaDataConstants.REMOTING_CONNECTION,
+                                 remotingConnection, PayloadKey.TRANSIENT);
+   
+            metaData.addMetaData(MetaDataConstants.JMS, MetaDataConstants.CONNECTION_VERSION,
+                                 version, PayloadKey.TRANSIENT);
+         }
+         else
+         {
+            //Wrong server redirect on failure
+            //close the remoting connection
+            try
+            {
+               remotingConnection.stop();
+            }
+            catch (Throwable ignore)
+            {
+            }
+         }
 
-         //We set the version for the connection and the remoting connection on the meta-data
-         //this is so the StateCreationAspect can pick it up
-
-         SimpleMetaData metaData = ((Advised)connectionDelegate)._getInstanceAdvisor().getMetaData();
-
-         metaData.addMetaData(MetaDataConstants.JMS, MetaDataConstants.REMOTING_CONNECTION,
-                              remotingConnection, PayloadKey.TRANSIENT);
-
-         metaData.addMetaData(MetaDataConstants.JMS, MetaDataConstants.CONNECTION_VERSION,
-                              version, PayloadKey.TRANSIENT);
-
       }
 
       return ret;

Modified: branches/Branch_Client_Failover_Experiment/src/main/org/jboss/jms/client/remoting/JMSRemotingConnection.java
===================================================================
--- branches/Branch_Client_Failover_Experiment/src/main/org/jboss/jms/client/remoting/JMSRemotingConnection.java	2006-12-11 11:52:22 UTC (rev 1762)
+++ branches/Branch_Client_Failover_Experiment/src/main/org/jboss/jms/client/remoting/JMSRemotingConnection.java	2006-12-11 15:27:26 UTC (rev 1763)
@@ -83,6 +83,7 @@
       // Enable client pinging. Server leasing is enabled separately on the server side
 
       Map config = new HashMap();
+      
       config.put(Client.ENABLE_LEASE, String.valueOf(clientPing));
 
       client = new Client(serverLocator, config);

Modified: branches/Branch_Client_Failover_Experiment/src/main/org/jboss/jms/server/connectionfactory/ConnectionFactoryJNDIMapper.java
===================================================================
--- branches/Branch_Client_Failover_Experiment/src/main/org/jboss/jms/server/connectionfactory/ConnectionFactoryJNDIMapper.java	2006-12-11 11:52:22 UTC (rev 1762)
+++ branches/Branch_Client_Failover_Experiment/src/main/org/jboss/jms/server/connectionfactory/ConnectionFactoryJNDIMapper.java	2006-12-11 15:27:26 UTC (rev 1763)
@@ -22,14 +22,15 @@
 package org.jboss.jms.server.connectionfactory;
 
 import java.io.Serializable;
-import java.util.ArrayList;
 import java.util.HashMap;
 import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
+
 import javax.naming.Context;
 import javax.naming.InitialContext;
 import javax.naming.NamingException;
+
 import org.jboss.jms.client.JBossConnectionFactory;
 import org.jboss.jms.client.delegate.ClientConnectionFactoryDelegate;
 import org.jboss.jms.client.delegate.ClusteredClientConnectionFactoryDelegate;
@@ -382,11 +383,9 @@
 
    private void recalculateFailoverMap(Map nodeAddressMap) throws Exception
    {     
-      List nodes = new ArrayList(nodeAddressMap.keySet());
-      
       FailoverMapper mapper = replicator.getFailoverMapper();
       
-      failoverMap = mapper.generateMapping(nodes);
+      failoverMap = mapper.generateMapping(nodeAddressMap.keySet());
    }
    
    /**

Modified: branches/Branch_Client_Failover_Experiment/src/main/org/jboss/jms/server/container/InjectionAspect.java
===================================================================
--- branches/Branch_Client_Failover_Experiment/src/main/org/jboss/jms/server/container/InjectionAspect.java	2006-12-11 11:52:22 UTC (rev 1762)
+++ branches/Branch_Client_Failover_Experiment/src/main/org/jboss/jms/server/container/InjectionAspect.java	2006-12-11 15:27:26 UTC (rev 1763)
@@ -24,6 +24,7 @@
 import org.jboss.aop.joinpoint.Invocation;
 import org.jboss.aop.joinpoint.MethodInvocation;
 import org.jboss.jms.client.delegate.ClientConnectionDelegate;
+import org.jboss.jms.server.endpoint.CreateConnectionResult;
 import org.jboss.jms.server.endpoint.ServerConnectionEndpoint;
 import org.jboss.jms.server.endpoint.advised.ConnectionAdvised;
 import org.jboss.jms.server.remoting.JMSDispatcher;
@@ -66,51 +67,56 @@
           throw new IllegalStateException("Can't find handler");
        }
        
-       ClientConnectionDelegate del = (ClientConnectionDelegate)invocation.invokeNext();
+       CreateConnectionResult res = (CreateConnectionResult)invocation.invokeNext();
        
-       ConnectionAdvised advised = 
-          (ConnectionAdvised)JMSDispatcher.instance.getRegistered(new Integer(del.getID()));
+       ClientConnectionDelegate del = (ClientConnectionDelegate)res.getDelegate();
        
-       ServerConnectionEndpoint endpoint = (ServerConnectionEndpoint)advised.getEndpoint();
-       
-       endpoint.setCallbackClient(handler.getCallbackClient());
-       
-       // Then we inject the remoting session id of the client
-       String sessionId =
-          (String)mi.getMetaData(MetaDataConstants.JMS,
-                                 MetaDataConstants.REMOTING_SESSION_ID);
-       
-       if (sessionId == null)
-       {
-          throw new IllegalStateException("Can't find session id");
+       if (del != null)
+       {          
+          ConnectionAdvised advised = 
+             (ConnectionAdvised)JMSDispatcher.instance.getRegistered(new Integer(del.getID()));
+          
+          ServerConnectionEndpoint endpoint = (ServerConnectionEndpoint)advised.getEndpoint();
+          
+          endpoint.setCallbackClient(handler.getCallbackClient());
+          
+          // Then we inject the remoting session id of the client
+          String sessionId =
+             (String)mi.getMetaData(MetaDataConstants.JMS,
+                                    MetaDataConstants.REMOTING_SESSION_ID);
+          
+          if (sessionId == null)
+          {
+             throw new IllegalStateException("Can't find session id");
+          }
+          
+          // Then we inject the unique id of the client VM
+          String jmsClientVMID =
+             (String)mi.getMetaData(MetaDataConstants.JMS,
+                                    MetaDataConstants.JMS_CLIENT_VM_ID);
+          
+          if (jmsClientVMID == null)
+          {
+             throw new IllegalStateException("Can't find jms client id");
+          }
+          
+          endpoint.setRemotingInformation(jmsClientVMID, sessionId);       
+          
+          // Then we inject the version number from to be used
+          
+          Byte ver =
+             (Byte)mi.getMetaData(MetaDataConstants.JMS,
+                                  MetaDataConstants.VERSION_NUMBER);
+          
+          if (ver == null)
+          {
+             throw new IllegalStateException("Can't find version");
+          }
+          
+          endpoint.setUsingVersion(ver.byteValue());
        }
        
-       // Then we inject the unique id of the client VM
-       String jmsClientVMID =
-          (String)mi.getMetaData(MetaDataConstants.JMS,
-                                 MetaDataConstants.JMS_CLIENT_VM_ID);
-       
-       if (jmsClientVMID == null)
-       {
-          throw new IllegalStateException("Can't find jms client id");
-       }
-       
-       endpoint.setRemotingInformation(jmsClientVMID, sessionId);       
-       
-       // Then we inject the version number from to be used
-       
-       Byte ver =
-          (Byte)mi.getMetaData(MetaDataConstants.JMS,
-                               MetaDataConstants.VERSION_NUMBER);
-       
-       if (ver == null)
-       {
-          throw new IllegalStateException("Can't find version");
-       }
-       
-       endpoint.setUsingVersion(ver.byteValue());
-       
-       return del;
+       return res;
     }
     
     // Package protected ---------------------------------------------

Modified: branches/Branch_Client_Failover_Experiment/src/main/org/jboss/jms/server/container/SecurityAspect.java
===================================================================
--- branches/Branch_Client_Failover_Experiment/src/main/org/jboss/jms/server/container/SecurityAspect.java	2006-12-11 11:52:22 UTC (rev 1762)
+++ branches/Branch_Client_Failover_Experiment/src/main/org/jboss/jms/server/container/SecurityAspect.java	2006-12-11 15:27:26 UTC (rev 1763)
@@ -150,20 +150,6 @@
       return invocation.invokeNext();
    }   
    
-   public Object handleGetMessageNow(Invocation invocation) throws Throwable
-   {           
-      checkConsumerAccess(invocation);
-            
-      return invocation.invokeNext();
-   }   
-   
-   public Object handleActivate(Invocation invocation) throws Throwable
-   {           
-      checkConsumerAccess(invocation);
-      
-      return invocation.invokeNext();
-   } 
-   
    protected void checkConsumerAccess(Invocation invocation) throws Throwable
    {
       ConsumerAdvised del = (ConsumerAdvised)invocation.getTargetObject();

Modified: branches/Branch_Client_Failover_Experiment/src/main/org/jboss/jms/server/endpoint/ConnectionFactoryEndpoint.java
===================================================================
--- branches/Branch_Client_Failover_Experiment/src/main/org/jboss/jms/server/endpoint/ConnectionFactoryEndpoint.java	2006-12-11 11:52:22 UTC (rev 1762)
+++ branches/Branch_Client_Failover_Experiment/src/main/org/jboss/jms/server/endpoint/ConnectionFactoryEndpoint.java	2006-12-11 15:27:26 UTC (rev 1763)
@@ -22,7 +22,7 @@
 package org.jboss.jms.server.endpoint;
 
 import javax.jms.JMSException;
-import org.jboss.jms.delegate.ConnectionDelegate;
+
 import org.jboss.messaging.core.plugin.IdBlock;
 
 /**
@@ -36,15 +36,12 @@
  * $Id$
  */
 public interface ConnectionFactoryEndpoint
-{
-   ConnectionDelegate createConnectionDelegate(String username, String password)
+{ 
+   CreateConnectionResult createConnectionDelegate(String username,
+                                                   String password, 
+                                                   int failedNodeId)
       throws JMSException;
    
-   ConnectionStatus createFailoverConnectionDelegate(String username,
-                                                     String password, 
-                                                     int failedNodeId)
-      throws JMSException;
-   
    byte[] getClientAOPConfig() throws JMSException;
 
    IdBlock getIdBlock(int size) throws JMSException;

Deleted: branches/Branch_Client_Failover_Experiment/src/main/org/jboss/jms/server/endpoint/ConnectionStatus.java
===================================================================
--- branches/Branch_Client_Failover_Experiment/src/main/org/jboss/jms/server/endpoint/ConnectionStatus.java	2006-12-11 11:52:22 UTC (rev 1762)
+++ branches/Branch_Client_Failover_Experiment/src/main/org/jboss/jms/server/endpoint/ConnectionStatus.java	2006-12-11 15:27:26 UTC (rev 1763)
@@ -1,65 +0,0 @@
-/*
- * JBoss, Home of Professional Open Source
- * Copyright 2005, JBoss Inc., and individual contributors as indicated
- * by the @authors tag. See the copyright.txt in the distribution for a
- * full listing of individual contributors.
- *
- * This is free software; you can redistribute it and/or modify it
- * under the terms of the GNU Lesser General Public License as
- * published by the Free Software Foundation; either version 2.1 of
- * the License, or (at your option) any later version.
- *
- * This software is distributed in the hope that it will be useful,
- * but WITHOUT ANY WARRANTY; without even the implied warranty of
- * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
- * Lesser General Public License for more details.
- *
- * You should have received a copy of the GNU Lesser General Public
- * License along with this software; if not, write to the Free
- * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
- * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
- */
-package org.jboss.jms.server.endpoint;
-
-import java.io.Serializable;
-
-import org.jboss.jms.delegate.ConnectionDelegate;
-
-/**
- * A ConnectionStatus
- *
- * @author <a href="mailto:tim.fox at jboss.com">Tim Fox</a>
- * @version <tt>$Revision: 1.1 $</tt>
- *
- * $Id$
- *
- */
-public class ConnectionStatus implements Serializable
-{
-   private static final long serialVersionUID = 4311863642735135167L;
-   
-   private ConnectionDelegate delegate;
-   
-   private int actualFailoverNodeId;
-   
-   public ConnectionStatus(ConnectionDelegate del)
-   {
-      this.delegate = del;
-   }
-   
-   public ConnectionStatus(int actualFailoverNode)
-   {
-      this.actualFailoverNodeId = actualFailoverNode;
-   }
-   
-   public ConnectionDelegate getDelegate()
-   {
-      return delegate;
-   }
-   
-   public int getActualFailoverNode()
-   {
-      return actualFailoverNodeId;
-   }
-
-}

Copied: branches/Branch_Client_Failover_Experiment/src/main/org/jboss/jms/server/endpoint/CreateConnectionResult.java (from rev 1760, branches/Branch_Client_Failover_Experiment/src/main/org/jboss/jms/server/endpoint/ConnectionStatus.java)
===================================================================
--- branches/Branch_Client_Failover_Experiment/src/main/org/jboss/jms/server/endpoint/ConnectionStatus.java	2006-12-11 01:05:12 UTC (rev 1760)
+++ branches/Branch_Client_Failover_Experiment/src/main/org/jboss/jms/server/endpoint/CreateConnectionResult.java	2006-12-11 15:27:26 UTC (rev 1763)
@@ -0,0 +1,66 @@
+/*
+ * JBoss, Home of Professional Open Source
+ * Copyright 2005, JBoss Inc., and individual contributors as indicated
+ * by the @authors tag. See the copyright.txt in the distribution for a
+ * full listing of individual contributors.
+ *
+ * This is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU Lesser General Public License as
+ * published by the Free Software Foundation; either version 2.1 of
+ * the License, or (at your option) any later version.
+ *
+ * This software is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this software; if not, write to the Free
+ * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
+ * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
+ */
+package org.jboss.jms.server.endpoint;
+
+import java.io.Serializable;
+
+import org.jboss.jms.delegate.ConnectionDelegate;
+
+/**
+ * 
+ * A CreateConnectionResult
+ *
+ * @author <a href="mailto:tim.fox at jboss.com">Tim Fox</a>
+ * @version <tt>$Revision: 1.1 $</tt>
+ *
+ * $Id$
+ *
+ */
+public class CreateConnectionResult implements Serializable
+{
+   private static final long serialVersionUID = 4311863642735135167L;
+   
+   private ConnectionDelegate delegate;
+   
+   private int actualFailoverNodeId;
+   
+   public CreateConnectionResult(ConnectionDelegate del)
+   {
+      this.delegate = del;
+   }
+   
+   public CreateConnectionResult(int actualFailoverNode)
+   {
+      this.actualFailoverNodeId = actualFailoverNode;
+   }
+   
+   public ConnectionDelegate getDelegate()
+   {
+      return delegate;
+   }
+   
+   public int getActualFailoverNode()
+   {
+      return actualFailoverNodeId;
+   }
+
+}

Modified: branches/Branch_Client_Failover_Experiment/src/main/org/jboss/jms/server/endpoint/ServerConnectionEndpoint.java
===================================================================
--- branches/Branch_Client_Failover_Experiment/src/main/org/jboss/jms/server/endpoint/ServerConnectionEndpoint.java	2006-12-11 11:52:22 UTC (rev 1762)
+++ branches/Branch_Client_Failover_Experiment/src/main/org/jboss/jms/server/endpoint/ServerConnectionEndpoint.java	2006-12-11 15:27:26 UTC (rev 1763)
@@ -173,6 +173,8 @@
       {
          log.debug("creating session " + (transacted ? "transacted" :"non transacted")+ ", " + ToString.acknowledgmentMode(acknowledgmentMode) + ", " + (isXA ? "XA": "non XA"));
 
+         log.info("********** CREATING NEW SESSION");
+         
          if (closed)
          {
             throw new IllegalStateException("Connection is closed");

Modified: branches/Branch_Client_Failover_Experiment/src/main/org/jboss/jms/server/endpoint/ServerConnectionFactoryEndpoint.java
===================================================================
--- branches/Branch_Client_Failover_Experiment/src/main/org/jboss/jms/server/endpoint/ServerConnectionFactoryEndpoint.java	2006-12-11 11:52:22 UTC (rev 1762)
+++ branches/Branch_Client_Failover_Experiment/src/main/org/jboss/jms/server/endpoint/ServerConnectionFactoryEndpoint.java	2006-12-11 15:27:26 UTC (rev 1763)
@@ -94,30 +94,33 @@
 
    // ConnectionFactoryDelegate implementation ----------------------
    
-   /**
-    * Called when a new connection is to be created for failover
-    * The ConnectionStatus instance returns either contains the delegate or the node id of the actual node
-    * Note we cannot query the failover node in a separate call since that might change between
-    * querying it and actually creating the delegate
-    */
-   public ConnectionStatus createFailoverConnectionDelegate(String username, String password, int failedNodeId)
+   public CreateConnectionResult createConnectionDelegate(String username, String password, int failedNodeId)
       throws JMSException      
    {
       try
       {
-         //Wait for server side failover to complete
-         int failoverNodeId = serverPeer.waitForFailover(failedNodeId);
-         
-         if (failoverNodeId == -1 || failoverNodeId != serverPeer.getServerPeerID())
+         if (failedNodeId == -1)
          {
-            //We are on the wrong node - or no failover has occurred
-            return new ConnectionStatus(failoverNodeId);
+            //Just a standard createConnection
+            return new CreateConnectionResult(createConnectionDelegateInternal(username, password));            
          }
          else
          {
-            //We are on the right node, and failover has completed
-            //we can now create a connection delegate
-            return new ConnectionStatus(createConnectionDelegateInternal(username, password));
+            //Failover
+            //Wait for server side failover to complete
+            int failoverNodeId = serverPeer.waitForFailover(failedNodeId);
+            
+            if (failoverNodeId == -1 || failoverNodeId != serverPeer.getServerPeerID())
+            {
+               //We are on the wrong node - or no failover has occurred
+               return new CreateConnectionResult(failoverNodeId);
+            }
+            else
+            {
+               //We are on the right node, and failover has completed
+               //we can now create a connection delegate
+               return new CreateConnectionResult(createConnectionDelegateInternal(username, password));
+            }
          }
       }
       catch (Throwable t)
@@ -162,19 +165,6 @@
       return new ClientConnectionDelegate(connectionID, serverPeer.getServerPeerID());
    }
    
-   public ConnectionDelegate createConnectionDelegate(String username, String password)
-      throws JMSException
-   {
-      try
-      {
-         return createConnectionDelegateInternal(username, password);
-      }
-      catch (Throwable t)
-      {
-         throw ExceptionUtil.handleJMSInvocation(t, this + " createConnectionDelegate");
-      }
-   }
-
    public byte[] getClientAOPConfig() throws JMSException
    {
       try

Modified: branches/Branch_Client_Failover_Experiment/src/main/org/jboss/jms/server/endpoint/advised/ConnectionFactoryAdvised.java
===================================================================
--- branches/Branch_Client_Failover_Experiment/src/main/org/jboss/jms/server/endpoint/advised/ConnectionFactoryAdvised.java	2006-12-11 11:52:22 UTC (rev 1762)
+++ branches/Branch_Client_Failover_Experiment/src/main/org/jboss/jms/server/endpoint/advised/ConnectionFactoryAdvised.java	2006-12-11 15:27:26 UTC (rev 1763)
@@ -24,7 +24,7 @@
 import javax.jms.JMSException;
 import org.jboss.jms.delegate.ConnectionDelegate;
 import org.jboss.jms.server.endpoint.ConnectionFactoryEndpoint;
-import org.jboss.jms.server.endpoint.ConnectionStatus;
+import org.jboss.jms.server.endpoint.CreateConnectionResult;
 import org.jboss.messaging.core.plugin.IdBlock;
 
 /**
@@ -58,17 +58,11 @@
 
    // ConnectionFactoryEndpoint implementation -----------------------
 
-   public ConnectionDelegate createConnectionDelegate(String username, String password)
+   public CreateConnectionResult createConnectionDelegate(String username, String password, int failedNodeId)
       throws JMSException
    {
-      return endpoint.createConnectionDelegate(username, password);
+      return endpoint.createConnectionDelegate(username, password, failedNodeId);
    }
-   
-   public ConnectionStatus createFailoverConnectionDelegate(String username, String password, int failedNodeId)
-      throws JMSException
-   {
-      return endpoint.createFailoverConnectionDelegate(username, password, failedNodeId);
-   }
 
    public byte[] getClientAOPConfig() throws JMSException
    {

Modified: branches/Branch_Client_Failover_Experiment/src/main/org/jboss/messaging/core/plugin/contract/FailoverMapper.java
===================================================================
--- branches/Branch_Client_Failover_Experiment/src/main/org/jboss/messaging/core/plugin/contract/FailoverMapper.java	2006-12-11 11:52:22 UTC (rev 1762)
+++ branches/Branch_Client_Failover_Experiment/src/main/org/jboss/messaging/core/plugin/contract/FailoverMapper.java	2006-12-11 15:27:26 UTC (rev 1763)
@@ -21,8 +21,8 @@
  */
 package org.jboss.messaging.core.plugin.contract;
 
-import java.util.List;
 import java.util.Map;
+import java.util.Set;
 
 /**
  * A FailoverMapper
@@ -37,5 +37,5 @@
  */
 public interface FailoverMapper
 {
-   Map generateMapping(List nodes);
+   Map generateMapping(Set nodes);
 }

Modified: branches/Branch_Client_Failover_Experiment/src/main/org/jboss/messaging/core/plugin/postoffice/DefaultPostOffice.java
===================================================================
--- branches/Branch_Client_Failover_Experiment/src/main/org/jboss/messaging/core/plugin/postoffice/DefaultPostOffice.java	2006-12-11 11:52:22 UTC (rev 1762)
+++ branches/Branch_Client_Failover_Experiment/src/main/org/jboss/messaging/core/plugin/postoffice/DefaultPostOffice.java	2006-12-11 15:27:26 UTC (rev 1763)
@@ -627,7 +627,6 @@
            Map valuesOnNode = (Map)entry.getValue();
 
            out.println("<tr><td>Key</td><td>Value</td><td>Class of Value</td></tr>");
-           int bindingCount=0;
            for (Iterator valuesIterator=valuesOnNode.entrySet().iterator();valuesIterator.hasNext();)
            {
                Map.Entry entry2 = (Map.Entry)valuesIterator.next();

Modified: branches/Branch_Client_Failover_Experiment/src/main/org/jboss/messaging/core/plugin/postoffice/cluster/DefaultClusteredPostOffice.java
===================================================================
--- branches/Branch_Client_Failover_Experiment/src/main/org/jboss/messaging/core/plugin/postoffice/cluster/DefaultClusteredPostOffice.java	2006-12-11 11:52:22 UTC (rev 1762)
+++ branches/Branch_Client_Failover_Experiment/src/main/org/jboss/messaging/core/plugin/postoffice/cluster/DefaultClusteredPostOffice.java	2006-12-11 15:27:26 UTC (rev 1763)
@@ -168,6 +168,8 @@
    private ReplicationListener nodeAddressMapListener;
 
    private boolean started;
+   
+   private QueuedExecutor viewExecutor;         
 
    /*
     * Constructor using Element for configuration
@@ -283,6 +285,8 @@
       failoverMap = new LinkedHashMap();
 
       leftSet = new HashSet();
+      
+      viewExecutor = new QueuedExecutor();
    }
 
    // MessagingComponent overrides
@@ -664,21 +668,6 @@
       return listBindingsForConditionInternal(condition, false);
    }
 
-   public int getFailoverNodeID(int nodeId)
-   {
-      synchronized (failoverMap)
-      {
-         Integer failoverNode = (Integer)failoverMap.get(new Integer(nodeId));
-
-         if (failoverNode == null)
-         {
-            return nodeId;
-         }
-
-         return failoverNode.intValue();
-      }
-   }
-
    public FailoverMapper getFailoverMapper()
    {
       return failoverMapper;
@@ -762,8 +751,11 @@
    public void putReplicantLocally(int originatorNodeID, Serializable key, Serializable replicant)
       throws Exception
    {
+      log.info("##########putReplicantLocally received, before lock");
+      
       synchronized (replicatedData)
       {
+         log.info("putReplicantLocally received, after lock");
          Map m = (Map)replicatedData.get(key);
 
          if (m == null)
@@ -777,6 +769,8 @@
 
          notifyListeners(key, m, true, originatorNodeID);
       }
+      
+      log.info("putReplicantLocally, completed");
    }
 
    /**
@@ -862,6 +856,8 @@
       {
          lock.writeLock().release();
       }
+      
+      log.info("****** binding added");
    }
 
    /*
@@ -1430,7 +1426,7 @@
 
       try
       {
-         log.info("Preparing failover against node " + failedNodeId);
+         log.info(this.currentNodeId + " is performing failover for node " + failedNodeId);
 
          /*
          We make sure a FailoverStatus object is put in the replicated data for the node
@@ -1456,9 +1452,11 @@
 
          status.startFailingOverForNode(failedNodeId);
 
+         log.info("Putting state that failover is starting");
+         
          put(FAILED_OVER_FOR_KEY, status);
 
-         log.info("Put into failed over map that starting failover");
+         log.info("Put state that failover is starting");
 
          //Get the map of queues for the failed node
 
@@ -1523,13 +1521,11 @@
             this.deleteBinding(failedNodeId, queueName);
 
             log.info("deleted binding for " + queueName);
+            
+            //Note we do not need to send an unbind request across the cluster - this is because
+            //when the node crashes a view change will hit the other nodes and that will cause
+            //all binding data for that node to be removed anyway
 
-            //Then an unbind request is sent - this cause other nodes to also remove it from the in memory
-            //condition and name maps
-            UnbindRequest unbindRequest = new UnbindRequest(failedNodeId, queueName);
-
-            syncSendRequest(unbindRequest);
-
             //If there is already a queue registered with the same name, then we set a flag "failed" on the
             //binding and then the queue will go into a special list of failed bindings
             //otherwise we treat at as a normal queue
@@ -1586,7 +1582,9 @@
          //TODO - should this be in a finally? I'm not sure
          status.finishFailingOver();
 
+         log.info("Putting state that failover has completed");
          put(FAILED_OVER_FOR_KEY, status);
+         log.info("Put state that failover has completed");
       }
       finally
       {
@@ -2105,15 +2103,6 @@
       }
    }
 
-   /**
-    * Is the current node the failover node for node <nodeId>?
-    * @param nodeId
-    */
-   private boolean isFailoverNodeForNode(int nodeId)
-   {
-      return this.currentNodeId == getFailoverNodeID(nodeId);
-   }
-
    private byte[] getStateAsBytes() throws Exception
    {
       List bindings = new ArrayList();
@@ -2322,26 +2311,55 @@
       //Cleanup any hanging transactions - we do this irrespective of whether we crashed
       check(theNodeId);
 
-      //Need to evaluate this before we regenerate the failover map
-      boolean isFailover = isFailoverNodeForNode(theNodeId.intValue());
-
-      log.info("Am I failover node for node " + theNodeId + "? " + isFailover);
-
-      log.info("Crashed: " + crashed);
-
-      //Remove any replicant data and non durable bindings for the node - again we need to do this
-      //irrespective of whether we crashed
-      //This will notify any listeners which will recalculate the connection factory delegates and failover delegates
-      removeDataForNode(theNodeId);
-
-      if (crashed && isFailover)
-      {
-         //The node crashed and we are the failover node
-         //so let's perform failover
-
-         //TODO server side valve
-
-         failOver(theNodeId.intValue());
+      synchronized (failoverMap)
+      {         
+         //Need to evaluate this before we regenerate the failover map
+         Integer failoverNode = (Integer)failoverMap.get(theNodeId);                  
+    
+         if (failoverNode == null)
+         {
+            throw new IllegalStateException("Cannot find failover node for node " + theNodeId);
+         }
+         
+         //debug dump failover map
+         
+         Iterator iter = failoverMap.entrySet().iterator();
+         
+         log.info("Dumping failover map");
+         while (iter.hasNext())
+         {
+            Map.Entry entry = (Map.Entry)iter.next();
+            
+            Integer nodeId = (Integer)entry.getKey();
+            
+            Integer failoverNodeId = (Integer)entry.getValue();
+            
+            log.info("node->failover node: " + nodeId + " --> " + failoverNodeId);
+         }
+         log.info("end dump");
+         
+         //end debug
+         
+         boolean isFailover = failoverNode.intValue() == this.currentNodeId;
+            
+         log.info("Am I failover node for node " + theNodeId + "? " + isFailover);
+   
+         log.info("Crashed: " + crashed);
+   
+         //Remove any replicant data and non durable bindings for the node - again we need to do this
+         //irrespective of whether we crashed
+         //This will notify any listeners which will recalculate the connection factory delegates and failover delegates
+         removeDataForNode(theNodeId);
+   
+         if (crashed && isFailover)
+         {
+            //The node crashed and we are the failover node
+            //so let's perform failover
+   
+            //TODO server side valve
+   
+            failOver(theNodeId.intValue());
+         }
       }
    }
 
@@ -2442,26 +2460,17 @@
 
       public void viewAccepted(View newView)
       {
-         //TODO: (by Clebert) Most JBoss Services use info on viewAccepted,
-         //TODO:     can't we do the same since this is pretty useful?
-         log.info(currentNodeId  + " got new view: " + newView + " postOffice:"
-                  + DefaultClusteredPostOffice.this.getOfficeName());
-
-         // JGroups will make sure this method is never called by more than one thread concurrently
-
-         View oldView = currentView;
-         currentView = newView;
-
          try
          {
-            verifyMembership(oldView, newView);
+            //We queue up changes and execute them asynchronously.
+            //This is because JGroups will not let us do stuff like send synch messages
+            //using the same thread that delivered the view change and this is what we need to
+            //do in failover, for example.
+            viewExecutor.execute(new HandleViewAcceptedRunnable(newView));
          }
-         catch (Throwable e)
+         catch (InterruptedException e)
          {
-            log.error("Caught Exception in MembershipListener", e);
-            IllegalStateException e2 = new IllegalStateException(e.getMessage());
-            e2.setStackTrace(e.getStackTrace());
-            throw e2;
+            log.warn("Caught InterruptedException", e);
          }
       }
 
@@ -2471,9 +2480,47 @@
          return null;
       }
    }
+   
+   private void handleViewAccepted(View newView)
+   {
+      //TODO: (by Clebert) Most JBoss Services use info on viewAccepted,
+      //TODO:     can't we do the same since this is pretty useful?
+      log.info(currentNodeId  + " got new view: " + newView + " postOffice:"
+               + DefaultClusteredPostOffice.this.getOfficeName());
 
+      // JGroups will make sure this method is never called by more than one thread concurrently
 
+      View oldView = currentView;
+      currentView = newView;
 
+      try
+      {
+         verifyMembership(oldView, newView);
+      }
+      catch (Throwable e)
+      {
+         log.error("Caught Exception in MembershipListener", e);
+         IllegalStateException e2 = new IllegalStateException(e.getMessage());
+         e2.setStackTrace(e.getStackTrace());
+         throw e2;
+      }
+   }
+
+   private class HandleViewAcceptedRunnable implements Runnable
+   {
+      private View newView;
+      
+      HandleViewAcceptedRunnable(View newView)
+      {
+         this.newView = newView;
+      }
+      
+      public void run()
+      {
+         handleViewAccepted(newView);
+      }
+   }
+
    /*
     * This class is used to listen for messages on the async channel
     */
@@ -2585,7 +2632,7 @@
 
       private void generateFailoverMap(Map nodeAddressMap)
       {
-         failoverMap = failoverMapper.generateMapping(new ArrayList(nodeAddressMap.keySet()));
+         failoverMap = failoverMapper.generateMapping(nodeAddressMap.keySet());
       }      
    }
 }
\ No newline at end of file

Modified: branches/Branch_Client_Failover_Experiment/src/main/org/jboss/messaging/core/plugin/postoffice/cluster/DefaultFailoverMapper.java
===================================================================
--- branches/Branch_Client_Failover_Experiment/src/main/org/jboss/messaging/core/plugin/postoffice/cluster/DefaultFailoverMapper.java	2006-12-11 11:52:22 UTC (rev 1762)
+++ branches/Branch_Client_Failover_Experiment/src/main/org/jboss/messaging/core/plugin/postoffice/cluster/DefaultFailoverMapper.java	2006-12-11 15:27:26 UTC (rev 1763)
@@ -21,10 +21,10 @@
  */
 package org.jboss.messaging.core.plugin.postoffice.cluster;
 
-import java.util.ArrayList;
+import java.util.Arrays;
 import java.util.HashMap;
-import java.util.List;
 import java.util.Map;
+import java.util.Set;
 
 import org.jboss.logging.Logger;
 import org.jboss.messaging.core.plugin.contract.FailoverMapper;
@@ -45,15 +45,17 @@
 {
    private static final Logger log = Logger.getLogger(DefaultFailoverMapper.class);
 
-   /** This receives a List<Integer> of nodes and returns a Map<Integer,Integer> of nodes to be used on failover logic */
-   public Map generateMapping(List nodes)
+   /*
+    * Generate a mapping given a set of nodes - nodes will be sorted by the method
+    * @see org.jboss.messaging.core.plugin.contract.FailoverMapper#generateMapping(java.util.Set)
+    */
+   public Map generateMapping(Set nodes)
    {
-      if (!(nodes instanceof ArrayList))
-      {
-         //Convert to array list for fast index access
-         nodes = new ArrayList(nodes);
-      }
+      Integer[] nodesArr = (Integer[])nodes.toArray(new Integer[nodes.size()]);
       
+      //First sort them so every node has a consistent view
+      Arrays.sort(nodesArr);
+      
       int s = nodes.size();
       
       log.info("Generating failover mapping, node size= "+ s);
@@ -70,7 +72,7 @@
             j = 0;
          }
          
-         failoverNodes.put(nodes.get(i), nodes.get(j));
+         failoverNodes.put(nodesArr[i], nodesArr[j]);
       }
       
       return failoverNodes;

Modified: branches/Branch_Client_Failover_Experiment/tests/etc/log4j.xml
===================================================================
--- branches/Branch_Client_Failover_Experiment/tests/etc/log4j.xml	2006-12-11 11:52:22 UTC (rev 1762)
+++ branches/Branch_Client_Failover_Experiment/tests/etc/log4j.xml	2006-12-11 15:27:26 UTC (rev 1763)
@@ -37,9 +37,13 @@
    </category>
 
    <category name="org.jgroups">
-      <priority value="WARN"/>
+      <priority value="TRACE"/>
    </category>
 
+   <category name="org.jboss.remoting">
+         <priority value="TRACE"/>
+   </category>
+
    <category name="org.jboss">
       <priority value="INFO"/>
    </category>

Modified: branches/Branch_Client_Failover_Experiment/tests/src/org/jboss/test/messaging/core/plugin/postoffice/cluster/FailoverMapperTest.java
===================================================================
--- branches/Branch_Client_Failover_Experiment/tests/src/org/jboss/test/messaging/core/plugin/postoffice/cluster/FailoverMapperTest.java	2006-12-11 11:52:22 UTC (rev 1762)
+++ branches/Branch_Client_Failover_Experiment/tests/src/org/jboss/test/messaging/core/plugin/postoffice/cluster/FailoverMapperTest.java	2006-12-11 15:27:26 UTC (rev 1763)
@@ -22,9 +22,12 @@
 
 package org.jboss.test.messaging.core.plugin.postoffice.cluster;
 
-import java.util.ArrayList;
+import java.util.HashSet;
 import java.util.Map;
+import java.util.Set;
+
 import junit.framework.TestCase;
+
 import org.jboss.messaging.core.plugin.postoffice.cluster.DefaultFailoverMapper;
 
 /**
@@ -38,18 +41,18 @@
 
    public void testMapper()
    {
-      ArrayList list = new ArrayList();
+      Set set = new HashSet();
 
-      list.add(new Integer(50));
-      list.add(new Integer(25));
-      list.add(new Integer(15));
+      set.add(new Integer(50));
+      set.add(new Integer(25));
+      set.add(new Integer(15));
 
       DefaultFailoverMapper mapper = new DefaultFailoverMapper();
-      Map map = mapper.generateMapping(list);
+      Map map = mapper.generateMapping(set);
 
-      assertEquals(new Integer(15),map.get(new Integer(50)));
-      assertEquals(new Integer(50),map.get(new Integer(25)));
-      assertEquals(new Integer(25),map.get(new Integer(15)));
+      assertEquals(new Integer(15),map.get(new Integer(25)));
+      assertEquals(new Integer(50),map.get(new Integer(15)));
+      assertEquals(new Integer(25),map.get(new Integer(50)));
       
    }
 

Modified: branches/Branch_Client_Failover_Experiment/tests/src/org/jboss/test/messaging/jms/clustering/HATest.java
===================================================================
--- branches/Branch_Client_Failover_Experiment/tests/src/org/jboss/test/messaging/jms/clustering/HATest.java	2006-12-11 11:52:22 UTC (rev 1762)
+++ branches/Branch_Client_Failover_Experiment/tests/src/org/jboss/test/messaging/jms/clustering/HATest.java	2006-12-11 15:27:26 UTC (rev 1763)
@@ -434,27 +434,44 @@
       
       ClientConnectionFactoryDelegate cf2 = delegates[1];
       
+      ClientConnectionFactoryDelegate cf3 = delegates[2];
+      
       int server0Id = cf1.getServerId();
       
       int server1Id = cf2.getServerId();
+      
+      int server2Id = cf3.getServerId();
+      
+      log.info("server 0 id: " + server0Id);
+      
+      log.info("server 1 id: " + server1Id);
+      
+      log.info("server 2 id: " + server2Id);
                   
       Map failoverMap = delegate.getFailoverMap();
       
-      int server0FailoverId = ((Integer)failoverMap.get(new Integer(server0Id))).intValue();
+      log.info(failoverMap.get(new Integer(server0Id)));
+      log.info(failoverMap.get(new Integer(server1Id)));
+      log.info(failoverMap.get(new Integer(server2Id)));
       
-      // server 0 should failover onto server 1
+      int server1FailoverId = ((Integer)failoverMap.get(new Integer(server1Id))).intValue();
       
-      assertEquals(server1Id, server0FailoverId);
+      // server 1 should failover onto server 2
       
+      assertEquals(server2Id, server1FailoverId);
+      
       Connection conn = null;
       
       try
       {
       
-         //Send a message into a queue on server 0
+         //Get a connection on server 1
+         conn = factory.createConnection(); //connection on server 0
          
-         conn = factory.createConnection();
+         conn.close();
          
+         conn = factory.createConnection(); //connection on server 1
+         
          JBossConnection jbc = (JBossConnection)conn;
          
          ClientConnectionDelegate del = (ClientConnectionDelegate)jbc.getDelegate();
@@ -463,13 +480,13 @@
          
          int initialServerID = state.getServerID();
          
-         assertEquals(0, initialServerID);
+         assertEquals(1, initialServerID);
                            
          Session sess = conn.createSession(false, Session.AUTO_ACKNOWLEDGE);
          
-         MessageProducer prod = sess.createProducer(queue0);
+         MessageProducer prod = sess.createProducer(queue1);
          
-         MessageConsumer cons = sess.createConsumer(queue0);
+         MessageConsumer cons = sess.createConsumer(queue1);
          
          final int NUM_MESSAGES = 100;
          
@@ -480,17 +497,17 @@
             prod.send(tm);
          }
          
-         //So now, messages should be in queue0 on server 0
-         //So we now kill server 0
+         //So now, messages should be in queue1 on server 1
+         //So we now kill server 1
          //Which should cause transparent failover of connection conn onto server 1
          
-         log.info("************ KILLING (CRASHING) SERVER 0");
+         log.info("************ KILLING (CRASHING) SERVER 1");
          
-         ServerManagement.getServer(0).destroy();
+         ServerManagement.getServer(1).destroy();
          
          log.info("killed server, now waiting");
          
-         Thread.sleep(25000);
+         Thread.sleep(5000);
          
          log.info("done wait");
          
@@ -500,16 +517,12 @@
          
          log.info("final server id= " + finalServerID);
          
-         //server id should now be 1
+         //server id should now be 2
          
-         assertEquals(1, finalServerID);
+         assertEquals(2, finalServerID);
          
-         log.info("here 2");
-         
          conn.start();
          
-         log.info("here 3");
-         
          for (int i = 0; i < NUM_MESSAGES; i++)
          {
             TextMessage tm = (TextMessage)cons.receive(1000);
@@ -520,7 +533,7 @@
             
             assertEquals("message:" + i, tm.getText());
          }
-         log.info("here 4");
+         log.info("done");
       }
       finally
       {         
@@ -539,42 +552,42 @@
       
    }
    
-   public void testEvenSimplerFailover() throws Exception
-   {
-      JBossConnectionFactory factory =  (JBossConnectionFactory )ic0.lookup("/ConnectionFactory");
-                  
-      Connection conn = null;
-      
-      try
-      {
-         conn = factory.createConnection();
-                           
-         log.info("************ KILLING (CRASHING) SERVER 0");
-         
-         ServerManagement.getServer(0).destroy();
-         
-         log.info("killed server, now waiting");
-         
-         Thread.sleep(25000);
-         
-         log.info("done wait");                
-      }
-      finally
-      {         
-         if (conn != null)
-         {
-            try
-            {
-               conn.close();
-            }
-            catch (Exception e)
-            {
-               e.printStackTrace();
-            }
-         }
-      }
-      
-   }
+//   public void testEvenSimplerFailover() throws Exception
+//   {
+//      JBossConnectionFactory factory =  (JBossConnectionFactory )ic0.lookup("/ConnectionFactory");
+//                  
+//      Connection conn = null;
+//      
+//      try
+//      {
+//         conn = factory.createConnection();
+//                           
+//         log.info("************ KILLING (CRASHING) SERVER 0");
+//         
+//         ServerManagement.getServer(0).destroy();
+//         
+//         log.info("killed server, now waiting");
+//         
+//         Thread.sleep(25000);
+//         
+//         log.info("done wait");                
+//      }
+//      finally
+//      {         
+//         if (conn != null)
+//         {
+//            try
+//            {
+//               conn.close();
+//            }
+//            catch (Exception e)
+//            {
+//               e.printStackTrace();
+//            }
+//         }
+//      }
+//      
+//   }
    
    
 // public void testConnectionFactoryConnect() throws Exception




More information about the jboss-cvs-commits mailing list