[jboss-cvs] JBoss Messaging SVN: r1714 - in branches/Branch_Client_Failover_Experiment: src/main/org/jboss/jms/client/container src/main/org/jboss/jms/client/delegate src/main/org/jboss/jms/server src/main/org/jboss/jms/server/connectionfactory src/main/org/jboss/jms/server/endpoint src/main/org/jboss/jms/server/endpoint/advised src/main/org/jboss/messaging/core/plugin/contract src/main/org/jboss/messaging/core/plugin/postoffice/cluster tests/src/org/jboss/test/messaging/jms/clustering

jboss-cvs-commits at lists.jboss.org jboss-cvs-commits at lists.jboss.org
Wed Dec 6 22:17:54 EST 2006


Author: clebert.suconic at jboss.com
Date: 2006-12-06 22:17:47 -0500 (Wed, 06 Dec 2006)
New Revision: 1714

Modified:
   branches/Branch_Client_Failover_Experiment/src/main/org/jboss/jms/client/container/HAAspect.java
   branches/Branch_Client_Failover_Experiment/src/main/org/jboss/jms/client/delegate/ClientConnectionFactoryDelegate.java
   branches/Branch_Client_Failover_Experiment/src/main/org/jboss/jms/client/delegate/ClusteredClientConnectionFactoryDelegate.java
   branches/Branch_Client_Failover_Experiment/src/main/org/jboss/jms/server/ServerPeer.java
   branches/Branch_Client_Failover_Experiment/src/main/org/jboss/jms/server/connectionfactory/ConnectionFactoryJNDIMapper.java
   branches/Branch_Client_Failover_Experiment/src/main/org/jboss/jms/server/endpoint/ConnectionFactoryEndpoint.java
   branches/Branch_Client_Failover_Experiment/src/main/org/jboss/jms/server/endpoint/ServerConnectionFactoryEndpoint.java
   branches/Branch_Client_Failover_Experiment/src/main/org/jboss/jms/server/endpoint/advised/ConnectionFactoryAdvised.java
   branches/Branch_Client_Failover_Experiment/src/main/org/jboss/messaging/core/plugin/contract/Replicator.java
   branches/Branch_Client_Failover_Experiment/src/main/org/jboss/messaging/core/plugin/postoffice/cluster/DefaultClusteredPostOffice.java
   branches/Branch_Client_Failover_Experiment/tests/src/org/jboss/test/messaging/jms/clustering/HATest.java
Log:
Fixing clustering CF & adding server redirect logic into HAAspect

Modified: branches/Branch_Client_Failover_Experiment/src/main/org/jboss/jms/client/container/HAAspect.java
===================================================================
--- branches/Branch_Client_Failover_Experiment/src/main/org/jboss/jms/client/container/HAAspect.java	2006-12-06 22:37:53 UTC (rev 1713)
+++ branches/Branch_Client_Failover_Experiment/src/main/org/jboss/jms/client/container/HAAspect.java	2006-12-07 03:17:47 UTC (rev 1714)
@@ -25,9 +25,7 @@
 import java.util.ArrayList;
 import java.util.Iterator;
 import java.util.List;
-
 import javax.jms.JMSException;
-
 import org.jboss.aop.joinpoint.Invocation;
 import org.jboss.aop.joinpoint.MethodInvocation;
 import org.jboss.aop.metadata.SimpleMetaData;
@@ -37,6 +35,7 @@
 import org.jboss.jms.client.delegate.ClientConsumerDelegate;
 import org.jboss.jms.client.delegate.ClientProducerDelegate;
 import org.jboss.jms.client.delegate.ClientSessionDelegate;
+import org.jboss.jms.client.delegate.ClusteredClientConnectionFactoryDelegate;
 import org.jboss.jms.client.delegate.DelegateSupport;
 import org.jboss.jms.client.remoting.CallbackManager;
 import org.jboss.jms.client.remoting.MessageCallbackHandler;
@@ -47,7 +46,6 @@
 import org.jboss.jms.client.state.ProducerState;
 import org.jboss.jms.client.state.SessionState;
 import org.jboss.jms.destination.JBossDestination;
-import org.jboss.jms.server.remoting.MetaDataConstants;
 import org.jboss.logging.Logger;
 import org.jboss.remoting.Client;
 import org.jboss.remoting.ConnectionListener;
@@ -103,6 +101,8 @@
       else
       {
          //Non clustered
+
+         log.info("Assumed non clustered");
          
          return invocation.invokeNext();
       }
@@ -123,15 +123,31 @@
    private synchronized ClientConnectionFactoryDelegate[] getServers(Invocation invocation)
    {
       if (delegates == null)
-      {         
+      {
+         log.info("Looking for delegates");
          SimpleMetaData metaData = invocation.getMetaData();
-         
-         delegates = (ClientConnectionFactoryDelegate[])metaData.getMetaData(MetaDataConstants.JMS, MetaDataConstants.CF_DELEGATES);
-         
+
+         //delegates = (ClientConnectionFactoryDelegate[])metaData.getMetaData(MetaDataConstants.JMS, MetaDataConstants.CF_DELEGATES);
+
+         MethodInvocation methodInvoke = (MethodInvocation)invocation;
+         //delegates = (ClientConnectionFactoryDelegate[])metaData.getMetaData(MetaDataConstants.JMS, MetaDataConstants.CF_DELEGATES);
+
+
+         // this is a hack, but I couldn't get this working through metaData
+         // TODO: FIX THIS! metaData should contain CF_DELEGATES
+         Object target = methodInvoke.getTargetObject();
+         if (target instanceof ClusteredClientConnectionFactoryDelegate)
+         {
+            delegates = ((ClusteredClientConnectionFactoryDelegate)target).getDelegates();
+         }
+
+
          if (delegates != null)
-         {          
-            failoverIndexes = (int[])metaData.getMetaData(MetaDataConstants.JMS, MetaDataConstants.CF_FAILOVER_INDEXES);
-            
+         {
+            //TODO: Fix this! metadata should contain CF_FAILOVER_INDEXES
+            //failoverIndexes = (int[])metaData.getMetaData(MetaDataConstants.JMS, MetaDataConstants.CF_FAILOVER_INDEXES);
+            failoverIndexes = (int[])((ClusteredClientConnectionFactoryDelegate)target).getFailoverIndexes();
+
             if (failoverIndexes == null)
             {
                throw new IllegalStateException("Cannot find failoverIndexes!");
@@ -145,6 +161,7 @@
    private ClientConnectionDelegate createConnection(ClientConnectionFactoryDelegate cf, String username, String password)
       throws Exception
    {
+      log.info("createConnection");
       ClientConnectionDelegate connDelegate = (ClientConnectionDelegate)cf.createConnectionDelegate(username, password);
       
       //Add a connection listener
@@ -161,11 +178,11 @@
    private void handleFailure(ClientConnectionDelegate failedConnection) throws Exception
    {
       ClientConnectionFactoryDelegate newCF = getAlternateDelegate(failedConnection);
-      
+
       //TODO implement client side valve to prevent invocations occurring whilst failover is occurring
       
       ConnectionState state = (ConnectionState)((DelegateSupport)failedConnection).getState();
-      
+
       ClientConnectionDelegate newConnection = createConnection(newCF, state.getUser(), state.getPassword());
       
       try
@@ -181,7 +198,7 @@
       }
    }
    
-   private ClientConnectionFactoryDelegate getAlternateDelegate(ClientConnectionDelegate currentDelegate)
+   private ClientConnectionFactoryDelegate getAlternateDelegate(ClientConnectionDelegate currentDelegate) throws JMSException
    {
       //We need to choose which delegate to fail over to
       
@@ -189,13 +206,6 @@
       //The fail over delegate for delegates[i] is given by
       //delegates[failoverIndexes[i]]
       
-      //The technique (TODO) is to try that server. There is a possibility the expected failover server
-      //is not actually the real failover server, e.g. since the client side array and server side state
-      //are not totally in synch.
-      //In this case we need to implement hopping, so the incorrect server redirects us to the correct server.
-      //This may require several hops to get the right server (since another failure(s) can occur when the hop is
-      //happening!)
-      
       ConnectionState currentState = (ConnectionState)((DelegateSupport)currentDelegate).getState();
       
       String currentLocator =
@@ -229,7 +239,33 @@
          throw new IllegalStateException("Cannot failover connection since no servers to fail over onto");
       }      
 
-      return delegates[failoverIndexes[local]];
+      ClientConnectionFactoryDelegate delegateFound = delegates[failoverIndexes[local]];
+
+
+      // Redirect connection routine.
+      // Verify the failover map on the server and, if it is out of sync, find the correct delegate
+      
+      int failoverNode = delegateFound.getFailoverNode(currentState.getServerID());
+      if (failoverNode!=delegateFound.getServerId())
+      {
+         delegateFound = null;
+         for (int i = 0; i < delegates.length; i++)
+         {
+            if (delegates[i].getServerId() == failoverNode)
+            {
+               delegateFound = delegates[i];
+            }
+         }
+
+         if (delegateFound==null)
+         {
+            throw new IllegalStateException("Cannot find failover node on current map for nodeId=" + failoverNode);
+         }
+
+      }
+
+      return delegateFound;
+
    }
    
    private void failover(ClientConnectionDelegate failedConnection, ClientConnectionDelegate newConnection) throws Exception
@@ -244,6 +280,9 @@
 
       failedState.copy(newState);
 
+      // this is necessary so the connection will start "talking" to the new server instead
+      failedState.setRemotingConnection(newState.getRemotingConnection());
+
       if (failedState.getClientID() != null)
       {
          newConnection.setClientID(failedState.getClientID());

Modified: branches/Branch_Client_Failover_Experiment/src/main/org/jboss/jms/client/delegate/ClientConnectionFactoryDelegate.java
===================================================================
--- branches/Branch_Client_Failover_Experiment/src/main/org/jboss/jms/client/delegate/ClientConnectionFactoryDelegate.java	2006-12-06 22:37:53 UTC (rev 1713)
+++ branches/Branch_Client_Failover_Experiment/src/main/org/jboss/jms/client/delegate/ClientConnectionFactoryDelegate.java	2006-12-07 03:17:47 UTC (rev 1714)
@@ -23,9 +23,7 @@
 
 import java.util.HashMap;
 import java.util.Map;
-
 import javax.jms.JMSException;
-
 import org.jboss.aop.Advised;
 import org.jboss.aop.Dispatcher;
 import org.jboss.aop.joinpoint.Invocation;
@@ -70,6 +68,9 @@
    //This data is needed in order to create a connection
    protected String serverLocatorURI;
    protected Version serverVersion;
+
+   // This property is used by the redirect-on-failover logic (to verify whether a new delegate can be used during a failover)
+   protected int serverId;
    protected boolean clientPing;
    
    private transient boolean trace;
@@ -101,12 +102,13 @@
 
    // Constructors --------------------------------------------------
 
-   public ClientConnectionFactoryDelegate(int objectID, String serverLocatorURI,
+   public ClientConnectionFactoryDelegate(int objectID, int serverId, String serverLocatorURI,
                                           Version serverVersion,
                                           boolean clientPing)
    {
       super(objectID);
 
+      this.serverId = serverId;
       this.serverLocatorURI = serverLocatorURI;
       this.serverVersion = serverVersion;
       this.clientPing = clientPing;
@@ -147,6 +149,16 @@
       throw new IllegalStateException("This invocation should not be handled here!");
    }
 
+   public int getFailoverNode(int node) throws JMSException
+   {
+      throw new IllegalStateException("This invocation should not be handled here!");
+   }
+
+   public boolean stillAround(int node) throws JMSException
+   {
+      throw new IllegalStateException("This invocation should not be handled here!");
+   }
+
    // Public --------------------------------------------------------
 
    public synchronized Object invoke(Invocation invocation) throws Throwable
@@ -157,7 +169,7 @@
       if (trace) { log.trace("invoking " + methodName + " on server"); }
 
       SimpleMetaData md = mi.getMetaData();
-      
+
       md.addMetaData(Dispatcher.DISPATCHER,
                      Dispatcher.OID,
                      new Integer(id),
@@ -174,19 +186,19 @@
        * difficulties in knowing when to close it.
        */
 
-      Client client; 
-      
+      Client client;
+
       JMSRemotingConnection remotingConnection = null;
 
       if ("createConnectionDelegate".equals(methodName))
       {
          // Create a new connection
-         
+
          remotingConnection = new JMSRemotingConnection(serverLocatorURI, clientPing);
          remotingConnection.start();
-         
+
          client = remotingConnection.getInvokingClient();
-         
+
          md.addMetaData(MetaDataConstants.JMS,
                         MetaDataConstants.REMOTING_SESSION_ID,
                         client.getSessionId(),
@@ -200,42 +212,42 @@
       else
       {
          //getClientAOPConfig or getIDBlock
-         
+
          // Create a client - make sure pinging is off
-         
+
          Map configuration = new HashMap();
 
          configuration.put(Client.ENABLE_LEASE, String.valueOf(false));
-         
-         client = new Client(new InvokerLocator(serverLocatorURI), configuration);     
-         
+
+         client = new Client(new InvokerLocator(serverLocatorURI), configuration);
+
          client.setSubsystem(ServerPeer.REMOTING_JMS_SUBSYSTEM);
-         
-         client.connect();         
-         
+
+         client.connect();
+
          client.setMarshaller(new JMSWireFormat());
-         client.setUnMarshaller(new JMSWireFormat());         
+         client.setUnMarshaller(new JMSWireFormat());
       }
 
       //What version should we use for invocations on this connection factory?
       Version version = getVersionToUse(serverVersion);
       byte v = version.getProviderIncrementingVersion();
-      
+
       MessagingMarshallable request = new MessagingMarshallable(v, mi);
-      
+
       MessagingMarshallable response;
 
       try
       {
          response = (MessagingMarshallable)client.invoke(request, null);
-         
+
          if (trace) { log.trace("got server response for " + methodName); }
       }
       catch (Throwable t)
       {
          //If we were invoking createConnectionDelegate and failure occurs then we need to clear
          //up the JMSRemotingConnection
-         
+
          if (remotingConnection != null)
          {
             try
@@ -246,25 +258,25 @@
             {
             }
          }
-         
+
          throw t;
       }
 
       Object ret = response.getLoad();
-      
+
       if (remotingConnection != null)
       {
          // It was a call to createConnectionDelegate - set the remoting connection to use
          ClientConnectionDelegate connectionDelegate = (ClientConnectionDelegate)ret;
-         
+
          //We set the version for the connection and the remoting connection on the meta-data
          //this is so the StateCreationAspect can pick it up
-         
+
          SimpleMetaData metaData = ((Advised)connectionDelegate)._getInstanceAdvisor().getMetaData();
-         
+
          metaData.addMetaData(MetaDataConstants.JMS, MetaDataConstants.REMOTING_CONNECTION,
                               remotingConnection, PayloadKey.TRANSIENT);
-         
+
          metaData.addMetaData(MetaDataConstants.JMS, MetaDataConstants.CONNECTION_VERSION,
                               version, PayloadKey.TRANSIENT);
 
@@ -284,6 +296,11 @@
       return serverLocatorURI;
    }
 
+   public int getServerId()
+   {
+      return serverId;
+   }
+
    // Protected -----------------------------------------------------
 
    protected Client getClient()

Modified: branches/Branch_Client_Failover_Experiment/src/main/org/jboss/jms/client/delegate/ClusteredClientConnectionFactoryDelegate.java
===================================================================
--- branches/Branch_Client_Failover_Experiment/src/main/org/jboss/jms/client/delegate/ClusteredClientConnectionFactoryDelegate.java	2006-12-06 22:37:53 UTC (rev 1713)
+++ branches/Branch_Client_Failover_Experiment/src/main/org/jboss/jms/client/delegate/ClusteredClientConnectionFactoryDelegate.java	2006-12-07 03:17:47 UTC (rev 1714)
@@ -59,10 +59,10 @@
 
    // Constructors --------------------------------------------------
 
-   public ClusteredClientConnectionFactoryDelegate(int objectID, String serverLocatorURI,
+   public ClusteredClientConnectionFactoryDelegate(int objectID, int serverId, String serverLocatorURI,
                                                    Version serverVersion, boolean clientPing)
    {
-      super(objectID, serverLocatorURI, serverVersion, clientPing);
+      super(objectID, serverId, serverLocatorURI, serverVersion, clientPing);
    }
 
    // DelegateSupport overrides -------------------------------------
@@ -105,6 +105,12 @@
       return delegates;
    }
 
+   /** As metadata is not working, I'm exposing this temporarily */
+   public int[] getFailoverIndexes()
+   {
+      return failoverIndexes;
+   }
+
    public String toString()
    {
       return "ClusteredClientConnectionFactoryDelegate[" + id + "] with delegates.length = " + (delegates==null?"null":Integer.toString(delegates.length));

Modified: branches/Branch_Client_Failover_Experiment/src/main/org/jboss/jms/server/ServerPeer.java
===================================================================
--- branches/Branch_Client_Failover_Experiment/src/main/org/jboss/jms/server/ServerPeer.java	2006-12-06 22:37:53 UTC (rev 1713)
+++ branches/Branch_Client_Failover_Experiment/src/main/org/jboss/jms/server/ServerPeer.java	2006-12-07 03:17:47 UTC (rev 1714)
@@ -21,16 +21,15 @@
   */
 package org.jboss.jms.server;
 
+import EDU.oswego.cs.dl.util.concurrent.ConcurrentReaderHashMap;
 import java.io.ByteArrayOutputStream;
 import java.io.InputStream;
 import java.net.URL;
 import java.util.Map;
 import java.util.Set;
-
 import javax.management.Attribute;
 import javax.management.MBeanServer;
 import javax.management.ObjectName;
-
 import org.jboss.aop.AspectXmlLoader;
 import org.jboss.jms.server.connectionfactory.ConnectionFactoryJNDIMapper;
 import org.jboss.jms.server.connectionmanager.SimpleConnectionManager;
@@ -58,8 +57,6 @@
 import org.jboss.system.ServiceMBeanSupport;
 import org.w3c.dom.Element;
 
-import EDU.oswego.cs.dl.util.concurrent.ConcurrentReaderHashMap;
-
 /**
  * A JMS server peer.
  *
@@ -591,6 +588,16 @@
    {
       return jmsUserManager;
    }
+
+   public Replicator getReplicator() throws Exception
+   {
+      PostOffice postOffice = getQueuePostOfficeInstance();
+      if (!(postOffice instanceof Replicator))
+      {
+         throw new  IllegalAccessException("This operations is only legal on clustering configurations");
+      }
+      return (Replicator)postOffice;
+   }
    
    public PostOffice getQueuePostOfficeInstance() throws Exception
    {

Modified: branches/Branch_Client_Failover_Experiment/src/main/org/jboss/jms/server/connectionfactory/ConnectionFactoryJNDIMapper.java
===================================================================
--- branches/Branch_Client_Failover_Experiment/src/main/org/jboss/jms/server/connectionfactory/ConnectionFactoryJNDIMapper.java	2006-12-06 22:37:53 UTC (rev 1713)
+++ branches/Branch_Client_Failover_Experiment/src/main/org/jboss/jms/server/connectionfactory/ConnectionFactoryJNDIMapper.java	2006-12-07 03:17:47 UTC (rev 1714)
@@ -127,14 +127,15 @@
          {                                             
             //Replicator might still be null since we might be deploying a clustered cf in a non clustered
             //post office (which is ok)
-            delegate = new ClusteredClientConnectionFactoryDelegate(id, locatorURI, version, clientPing);
+            delegate = new ClusteredClientConnectionFactoryDelegate(id, serverPeer.getServerPeerID(), locatorURI, version, clientPing);
          }
       }
       
       if (delegate == null)
       {
          //Local
-         delegate = new ClientConnectionFactoryDelegate(id, locatorURI, version, clientPing);
+         delegate = new ClientConnectionFactoryDelegate(id, serverPeer.getServerPeerID(),
+                                        locatorURI, version, clientPing);
       }
 
       log.trace("Adding delegates factory " + uniqueName + " pointing to " + delegate);

Modified: branches/Branch_Client_Failover_Experiment/src/main/org/jboss/jms/server/endpoint/ConnectionFactoryEndpoint.java
===================================================================
--- branches/Branch_Client_Failover_Experiment/src/main/org/jboss/jms/server/endpoint/ConnectionFactoryEndpoint.java	2006-12-06 22:37:53 UTC (rev 1713)
+++ branches/Branch_Client_Failover_Experiment/src/main/org/jboss/jms/server/endpoint/ConnectionFactoryEndpoint.java	2006-12-07 03:17:47 UTC (rev 1714)
@@ -22,7 +22,6 @@
 package org.jboss.jms.server.endpoint;
 
 import javax.jms.JMSException;
-
 import org.jboss.jms.delegate.ConnectionDelegate;
 import org.jboss.messaging.core.plugin.IdBlock;
 
@@ -44,5 +43,25 @@
    byte[] getClientAOPConfig() throws JMSException;
 
    IdBlock getIdBlock(int size) throws JMSException;
+
+   /** Return the node that should take over a given node */
+   int getFailoverNode(int node) throws JMSException;
+
+   /**
+    * This method will be used by the HAAspect interceptor to ask other servers whether
+    * a given server is still around.
+    *
+    * This will be useful on retries of client-to-server communication. If a communication exception happens
+    * while communicating with a given server, we will check with other peers to confirm that the server really
+    * died before re-establishing a connection.
+    *
+    * This method is not very useful with leasing, but it will be useful for exception listeners.
+    *
+    * Say, if a communication exception happens on an interceptor, the interceptor could ask other servers
+    * about their current membership and decide between a retry and a failover.
+    *
+    * TODO: a better name for this method?
+    * */
+   boolean stillAround(int node) throws JMSException;
 }
 

Modified: branches/Branch_Client_Failover_Experiment/src/main/org/jboss/jms/server/endpoint/ServerConnectionFactoryEndpoint.java
===================================================================
--- branches/Branch_Client_Failover_Experiment/src/main/org/jboss/jms/server/endpoint/ServerConnectionFactoryEndpoint.java	2006-12-06 22:37:53 UTC (rev 1713)
+++ branches/Branch_Client_Failover_Experiment/src/main/org/jboss/jms/server/endpoint/ServerConnectionFactoryEndpoint.java	2006-12-07 03:17:47 UTC (rev 1714)
@@ -22,7 +22,6 @@
 package org.jboss.jms.server.endpoint;
 
 import javax.jms.JMSException;
-
 import org.jboss.jms.client.delegate.ClientConnectionDelegate;
 import org.jboss.jms.delegate.ConnectionDelegate;
 import org.jboss.jms.server.ServerPeer;
@@ -137,7 +136,36 @@
          throw ExceptionUtil.handleJMSInvocation(t, this + " createConnectionDelegate");
       }
    }
-   
+
+
+   public int getFailoverNode(int node) throws JMSException
+   {
+      try
+      {
+         ServerPeer peer = (ServerPeer )serverPeer.getInstance();
+         return peer.getReplicator().getFailoverNodeID(node);
+      }
+      catch (Throwable t)
+      {
+         throw ExceptionUtil.handleJMSInvocation(t, this + " getFailoverNode");
+      }
+   }
+
+
+   // TODO: Better name
+   public boolean stillAround(int node) throws JMSException
+   {
+      try
+      {
+         ServerPeer peer = (ServerPeer )serverPeer.getInstance();
+         return peer.getReplicator().stillAround(node);
+      }
+      catch (Throwable t)
+      {
+         throw ExceptionUtil.handleJMSInvocation(t, this + " stillAround");
+      }
+   }
+
    public byte[] getClientAOPConfig() throws JMSException
    {
       try

Modified: branches/Branch_Client_Failover_Experiment/src/main/org/jboss/jms/server/endpoint/advised/ConnectionFactoryAdvised.java
===================================================================
--- branches/Branch_Client_Failover_Experiment/src/main/org/jboss/jms/server/endpoint/advised/ConnectionFactoryAdvised.java	2006-12-06 22:37:53 UTC (rev 1713)
+++ branches/Branch_Client_Failover_Experiment/src/main/org/jboss/jms/server/endpoint/advised/ConnectionFactoryAdvised.java	2006-12-07 03:17:47 UTC (rev 1714)
@@ -22,7 +22,6 @@
 package org.jboss.jms.server.endpoint.advised;
 
 import javax.jms.JMSException;
-
 import org.jboss.jms.delegate.ConnectionDelegate;
 import org.jboss.jms.server.endpoint.ConnectionFactoryEndpoint;
 import org.jboss.messaging.core.plugin.IdBlock;
@@ -74,6 +73,17 @@
       return endpoint.getIdBlock(size);
    }
 
+   public int getFailoverNode(int node) throws JMSException
+   {
+      return endpoint.getFailoverNode(node);
+   }
+
+   // TODO: find a better name... see the javadoc on ConnectionFactoryEndpoint
+   public boolean stillAround(int node) throws JMSException
+   {
+      return endpoint.stillAround(node);
+   }
+
    // AdvisedSupport override ---------------------------------------
 
    public Object getEndpoint()

Modified: branches/Branch_Client_Failover_Experiment/src/main/org/jboss/messaging/core/plugin/contract/Replicator.java
===================================================================
--- branches/Branch_Client_Failover_Experiment/src/main/org/jboss/messaging/core/plugin/contract/Replicator.java	2006-12-06 22:37:53 UTC (rev 1713)
+++ branches/Branch_Client_Failover_Experiment/src/main/org/jboss/messaging/core/plugin/contract/Replicator.java	2006-12-07 03:17:47 UTC (rev 1714)
@@ -79,4 +79,18 @@
     *         returns the original nodeID. 
     */
    int getFailoverNodeID(int nodeID);
+
+
+   /**
+    *
+    * This method will be used by the HAAspect interceptor to ask other servers whether
+    * a given server is still around.
+    *
+    * This will be useful on retries of client-to-server communication. If a communication exception happens
+    * while communicating with a given server, we will check with other peers to confirm that the server really
+    * died before re-establishing a connection.
+    *
+    * TODO: a better name for this method?
+    * */
+   public boolean stillAround(int nodeId);
 }

Modified: branches/Branch_Client_Failover_Experiment/src/main/org/jboss/messaging/core/plugin/postoffice/cluster/DefaultClusteredPostOffice.java
===================================================================
--- branches/Branch_Client_Failover_Experiment/src/main/org/jboss/messaging/core/plugin/postoffice/cluster/DefaultClusteredPostOffice.java	2006-12-06 22:37:53 UTC (rev 1713)
+++ branches/Branch_Client_Failover_Experiment/src/main/org/jboss/messaging/core/plugin/postoffice/cluster/DefaultClusteredPostOffice.java	2006-12-07 03:17:47 UTC (rev 1714)
@@ -608,6 +608,15 @@
       }
    }
 
+   // TODO: find a better name
+   public boolean stillAround(int nodeId)
+   {
+      synchronized (failoverMap)
+      {
+         return failoverMap.get(new Integer(nodeId))!=null;
+      }
+   }
+
    // Replicator implementation --------------------------------------------------------------------------
    
    public Map get(Serializable key) throws Exception

Modified: branches/Branch_Client_Failover_Experiment/tests/src/org/jboss/test/messaging/jms/clustering/HATest.java
===================================================================
--- branches/Branch_Client_Failover_Experiment/tests/src/org/jboss/test/messaging/jms/clustering/HATest.java	2006-12-06 22:37:53 UTC (rev 1713)
+++ branches/Branch_Client_Failover_Experiment/tests/src/org/jboss/test/messaging/jms/clustering/HATest.java	2006-12-07 03:17:47 UTC (rev 1714)
@@ -22,10 +22,22 @@
 
 package org.jboss.test.messaging.jms.clustering;
 
+import javax.jms.Destination;
+import javax.jms.Message;
 import javax.jms.MessageConsumer;
+import javax.jms.MessageProducer;
+import javax.jms.Session;
 import javax.jms.TextMessage;
+import javax.jms.Topic;
+import org.jboss.jms.client.JBossConnection;
 import org.jboss.jms.client.JBossConnectionFactory;
+import org.jboss.jms.client.JBossMessageConsumer;
+import org.jboss.jms.client.JBossSession;
+import org.jboss.jms.client.delegate.ClientConnectionDelegate;
+import org.jboss.jms.client.delegate.ClientSessionDelegate;
 import org.jboss.jms.client.delegate.ClusteredClientConnectionFactoryDelegate;
+import org.jboss.jms.client.state.ConsumerState;
+import org.jboss.jms.client.state.SessionState;
 import org.jboss.jms.message.MessageProxy;
 import org.jboss.test.messaging.tools.ServerManagement;
 
@@ -53,146 +65,145 @@
 
    // Public --------------------------------------------------------
 
-   public void testConnectionFactoryConnect() throws Exception
-   {
-      try
-      {
-         JBossConnectionFactory factory =  (JBossConnectionFactory )ic1.lookup("/ConnectionFactory");
-         ClusteredClientConnectionFactoryDelegate delegate =
-                      (ClusteredClientConnectionFactoryDelegate)factory.getDelegate();
-         log.info ("number of delegates = " + delegate.getDelegates().length);
-         log.info ("number of servers = " + ServerManagement.getServer(0).getNumberOfNodesOnCluster());
-
-         assertEquals(3, ServerManagement.getServer(0).getNumberOfNodesOnCluster());
-         assertEquals(3, delegate.getDelegates().length);
-
-
-         ServerManagement.log(ServerManagement.INFO,"Stopping server 2 as part of testConnectionFactoryConnect");
-         ServerManagement.stop(2,true);
-
-         ServerManagement.log(ServerManagement.INFO,"##### Looking up ConnectionFactory at testConnectionFactoryConnect");
-
-         factory =  (JBossConnectionFactory )ic1.lookup("/ConnectionFactory");
-         delegate = (ClusteredClientConnectionFactoryDelegate)factory.getDelegate();
-
-         assertEquals(2, ServerManagement.getServer(0).getNumberOfNodesOnCluster());
-         assertEquals(2, delegate.getDelegates().length);
-      }
-      finally
-      {
-         ServerManagement.start("all", 2);
-      }
-
-
-   }
-
-
-//   public void testTopicSubscriber() throws Exception
+//   public void testConnectionFactoryConnect() throws Exception
 //   {
 //      try
 //      {
-//         log.info("++testTopicSubscriber");
+//         JBossConnectionFactory factory =  (JBossConnectionFactory )ic1.lookup("/ConnectionFactory");
+//         ClusteredClientConnectionFactoryDelegate delegate =
+//                      (ClusteredClientConnectionFactoryDelegate)factory.getDelegate();
+//         log.info ("number of delegates = " + delegate.getDelegates().length);
+//         log.info ("number of servers = " + ServerManagement.getServer(0).getNumberOfNodesOnCluster());
 //
-//         log.info(">>Lookup Queue");
-//         Destination destination = (Destination) ic1.lookup("topic/testDistributedTopic");
+//         delegate.init();
 //
-//         log.info("Creating connection server1");
-//         JBossConnection conn = (JBossConnection) cf1.createConnection();
-//         conn.setClientID("testClient");
-//         conn.start();
+//         for (int i = 0; i < 3; i++)
+//         {
+//            int failNode = delegate.getDelegates()[0].getFailoverNode(i);
+//            log.info("Failover node for server" + i + " = " + failNode);
+//            log.info("InvokerLocator for server " + i + " = " + delegate.getDelegates()[i].getServerLocatorURI());
 //
-//         JBossSession session = (JBossSession) conn.createSession(true, Session.AUTO_ACKNOWLEDGE);
-//         ClientSessionDelegate clientSessionDelegate = (ClientSessionDelegate) session.getDelegate();
-//         SessionState sessionState = (SessionState) clientSessionDelegate.getState();
-//
-//         MessageConsumer consumerHA = session.createDurableSubscriber((Topic) destination, "T1");
-//         JBossMessageConsumer jbossConsumerHA = (JBossMessageConsumer) consumerHA;
-//
-//         org.jboss.jms.client.delegate.ClientConsumerDelegate clientDelegate = (org.jboss.jms.client.delegate.ClientConsumerDelegate) jbossConsumerHA.getDelegate();
-//         ConsumerState consumerState = (ConsumerState) clientDelegate.getState();
-//
-//         log.info("subscriptionName=" + consumerState.getSubscriptionName());
-//
-//
-//         log.info(">>Creating Producer");
-//         MessageProducer producer = session.createProducer(destination);
-//         log.info(">>creating Message");
-//         Message message = session.createTextMessage("Hello Before");
-//         log.info(">>sending Message");
-//         producer.send(message);
-//         session.commit();
-//
-//         receiveMessage("consumerHA", consumerHA, true, false);
-//
-//         session.commit();
-//         //if (true) return;
-//
-//         Object txID = sessionState.getCurrentTxId();
-//
-//         producer.send(session.createTextMessage("Hello again before failover"));
-//
-//         ClientConnectionDelegate delegate = (ClientConnectionDelegate) conn.getDelegate();
-//
-//         JMSRemotingConnection originalRemoting = delegate.getRemotingConnection();
-//
-//         log.info(">>Creating alternate connection");
-//         JBossConnection conn2 = (JBossConnection) cf2.createConnection();
-//         log.info("NewConnectionCreated=" + conn2);
-//
-//         log.info(">>Failling over");
-//         assertSame(originalRemoting, delegate.getRemotingConnection());
-//         conn.getDelegate().failOver(conn2.getDelegate());
-//
-//
-//
-//         try
-//         {
-//            originalRemoting.stop();
-//         } catch (Throwable throwable)
-//         {
-//            throwable.printStackTrace();
+//            assertEquals("Server1 should have the same failoverMapping",
+//                         failNode, delegate.getDelegates()[1].getFailoverNode(i));
+//            assertEquals("Server2 should have the same failoverMapping",
+//                         failNode, delegate.getDelegates()[2].getFailoverNode(i));
 //         }
 //
-//         ServerManagement.stop(0, false);
+//         assertEquals(3, ServerManagement.getServer(0).getNumberOfNodesOnCluster());
+//         assertEquals(3, delegate.getDelegates().length);
 //
-//         assertNotSame(originalRemoting, delegate.getRemotingConnection());
 //
-//         //System.out.println("Kill server1"); Thread.sleep(10000);
+//         ServerManagement.log(ServerManagement.INFO,"Stopping server 2 as part of testConnectionFactoryConnect");
+//         ServerManagement.stop(2,true);
 //
-//         message = session.createTextMessage("Hello After");
-//         log.info(">>Sending new message");
-//         producer.send(message);
+//         ServerManagement.log(ServerManagement.INFO,"##### Looking up ConnectionFactory at testConnectionFactoryConnect");
 //
-//         assertEquals(txID, sessionState.getCurrentTxId());
-//         System.out.println("TransactionID on client = " + txID);
-//         log.info(">>Final commit");
+//         factory =  (JBossConnectionFactory )ic1.lookup("/ConnectionFactory");
+//         delegate = (ClusteredClientConnectionFactoryDelegate)factory.getDelegate();
 //
-//         /* JBossConnection connSecondServer = (JBossConnection)this.factoryServer2.createConnection();
-//          connSecondServer.start();
-//          JBossSession sessionSecondServer = (JBossSession)connSecondServer.createSession(false,Session.AUTO_ACKNOWLEDGE);
-//          MessageConsumer consumerSecondServer = sessionSecondServer.createConsumer(destination); */
-//
-//         session.commit();
-//
-//         /* receiveMessage("consumerSecondServer",consumerSecondServer,true,false);
-//        receiveMessage("consumerSecondServer",consumerSecondServer,true,false);
-//        receiveMessage("consumerSecondServer",consumerSecondServer,true,true); */
-//
-//         log.info("Calling alternate receiver");
-//         receiveMessage("consumerHA", consumerHA, true, false);
-//         receiveMessage("consumerHA", consumerHA, true, false);
-//         receiveMessage("consumerHA", consumerHA, true, true);
-//
-//
-//         session.commit();
-//
+//         assertEquals(2, ServerManagement.getServer(0).getNumberOfNodesOnCluster());
+//         assertEquals(2, delegate.getDelegates().length);
 //      }
 //      finally
 //      {
-//         // restart the server as it was probably stopped (tearDown will need that)
-//         ServerManagement.start("all", 0, true);
+//         ServerManagement.start("all", 2);
 //      }
+//
+//
 //   }
+
+
+   public void testTopicSubscriber() throws Exception
+   {
+      try
+      {
+         log.info("++testTopicSubscriber");
+
+
+         JBossConnectionFactory jbcf1 = (JBossConnectionFactory)cf1;
+         assertTrue(jbcf1.getDelegate() instanceof ClusteredClientConnectionFactoryDelegate);
+
+         log.info(">>Lookup Queue");
+         Destination destination = (Destination) ic1.lookup("topic/testDistributedTopic");
+
+         log.info("Creating connection server1");
+         JBossConnection conn = (JBossConnection) cf1.createConnection();
+         conn.setClientID("testClient");
+         conn.start();
+
+         JBossSession session = (JBossSession) conn.createSession(true, Session.AUTO_ACKNOWLEDGE);
+         ClientSessionDelegate clientSessionDelegate = (ClientSessionDelegate) session.getDelegate();
+         SessionState sessionState = (SessionState) clientSessionDelegate.getState();
+
+         MessageConsumer consumerHA = session.createDurableSubscriber((Topic) destination, "T1");
+         JBossMessageConsumer jbossConsumerHA = (JBossMessageConsumer) consumerHA;
+
+         org.jboss.jms.client.delegate.ClientConsumerDelegate clientDelegate = (org.jboss.jms.client.delegate.ClientConsumerDelegate) jbossConsumerHA.getDelegate();
+         ConsumerState consumerState = (ConsumerState) clientDelegate.getState();
+
+         log.info("subscriptionName=" + consumerState.getSubscriptionName());
+
+
+         log.info(">>Creating Producer");
+         MessageProducer producer = session.createProducer(destination);
+         log.info(">>creating Message");
+         Message message = session.createTextMessage("Hello Before");
+         log.info(">>sending Message");
+         producer.send(message);
+         session.commit();
+
+         receiveMessage("consumerHA", consumerHA, true, false);
+
+         session.commit();
+         //if (true) return;
+
+         Object txID = sessionState.getCurrentTxId();
+
+         producer.send(session.createTextMessage("Hello again before failover"));
+
+         ClientConnectionDelegate delegate = (ClientConnectionDelegate) conn.getDelegate();
+
+         ServerManagement.stop(0, false);
+
+         Thread.sleep(25000);
+
+         //System.out.println("Kill server1"); Thread.sleep(10000);
+
+         message = session.createTextMessage("Hello After");
+         log.info(">>Sending new message");
+         producer.send(message);
+
+         assertEquals(txID, sessionState.getCurrentTxId());
+         System.out.println("TransactionID on client = " + txID);
+         log.info(">>Final commit");
+
+         /* JBossConnection connSecondServer = (JBossConnection)this.factoryServer2.createConnection();
+          connSecondServer.start();
+          JBossSession sessionSecondServer = (JBossSession)connSecondServer.createSession(false,Session.AUTO_ACKNOWLEDGE);
+          MessageConsumer consumerSecondServer = sessionSecondServer.createConsumer(destination); */
+
+         session.commit();
+
+         /* receiveMessage("consumerSecondServer",consumerSecondServer,true,false);
+        receiveMessage("consumerSecondServer",consumerSecondServer,true,false);
+        receiveMessage("consumerSecondServer",consumerSecondServer,true,true); */
+
+         log.info("Calling alternate receiver");
+         receiveMessage("consumerHA", consumerHA, true, false);
+         receiveMessage("consumerHA", consumerHA, true, false);
+         receiveMessage("consumerHA", consumerHA, true, true);
+
+
+         session.commit();
+
+      }
+      finally
+      {
+         // restart the server as it was probably stopped (tearDown will need that)
+         ServerManagement.start("all", 0);
+      }
+   }
+
 //
 //   public void testQueueHA() throws Exception
 //   {




More information about the jboss-cvs-commits mailing list