[jboss-cvs] JBoss Messaging SVN: r1734 - in branches/Branch_Client_Failover_Experiment: src/main/org/jboss/jms/client/container src/main/org/jboss/jms/client/delegate src/main/org/jboss/jms/server/connectionfactory src/main/org/jboss/jms/server/remoting src/main/org/jboss/messaging/core/plugin/contract src/main/org/jboss/messaging/core/plugin/postoffice/cluster tests/src/org/jboss/test/messaging/jms/clustering

jboss-cvs-commits at lists.jboss.org jboss-cvs-commits at lists.jboss.org
Sat Dec 9 09:40:44 EST 2006


Author: timfox
Date: 2006-12-09 09:40:30 -0500 (Sat, 09 Dec 2006)
New Revision: 1734

Modified:
   branches/Branch_Client_Failover_Experiment/src/main/org/jboss/jms/client/container/HAAspect.java
   branches/Branch_Client_Failover_Experiment/src/main/org/jboss/jms/client/delegate/ClusteredClientConnectionFactoryDelegate.java
   branches/Branch_Client_Failover_Experiment/src/main/org/jboss/jms/server/connectionfactory/ConnectionFactoryJNDIMapper.java
   branches/Branch_Client_Failover_Experiment/src/main/org/jboss/jms/server/remoting/MetaDataConstants.java
   branches/Branch_Client_Failover_Experiment/src/main/org/jboss/messaging/core/plugin/contract/FailoverMapper.java
   branches/Branch_Client_Failover_Experiment/src/main/org/jboss/messaging/core/plugin/contract/ReplicationListener.java
   branches/Branch_Client_Failover_Experiment/src/main/org/jboss/messaging/core/plugin/contract/Replicator.java
   branches/Branch_Client_Failover_Experiment/src/main/org/jboss/messaging/core/plugin/postoffice/cluster/DefaultClusteredPostOffice.java
   branches/Branch_Client_Failover_Experiment/src/main/org/jboss/messaging/core/plugin/postoffice/cluster/DefaultFailoverMapper.java
   branches/Branch_Client_Failover_Experiment/tests/src/org/jboss/test/messaging/jms/clustering/HATest.java
Log:
Fixed HA issue with replicating changes



Modified: branches/Branch_Client_Failover_Experiment/src/main/org/jboss/jms/client/container/HAAspect.java
===================================================================
--- branches/Branch_Client_Failover_Experiment/src/main/org/jboss/jms/client/container/HAAspect.java	2006-12-09 01:15:18 UTC (rev 1733)
+++ branches/Branch_Client_Failover_Experiment/src/main/org/jboss/jms/client/container/HAAspect.java	2006-12-09 14:40:30 UTC (rev 1734)
@@ -25,6 +25,7 @@
 import java.util.ArrayList;
 import java.util.Iterator;
 import java.util.List;
+import java.util.Map;
 
 import javax.jms.JMSException;
 
@@ -75,9 +76,9 @@
    //Cache this here
    private ClientConnectionFactoryDelegate[] delegates;
    
-   private int[] failoverIndexes;
+   private Map failoverMap;
    
-   private int current;
+   private int currentRobinIndex;
    
    public Object handleCreateConnectionDelegate(Invocation invocation) throws Throwable
    {
@@ -115,11 +116,11 @@
    //TODO this is currently hardcoded as round-robin, this should be made pluggable
    private synchronized ClientConnectionFactoryDelegate getDelegateRoundRobin()
    {
-      ClientConnectionFactoryDelegate currentDelegate = delegates[current++];
+      ClientConnectionFactoryDelegate currentDelegate = delegates[currentRobinIndex++];
       
-      if (current >= delegates.length)
+      if (currentRobinIndex >= delegates.length)
       {
-         current = 0;
+         currentRobinIndex = 0;
       }
       return currentDelegate;
    }
@@ -150,11 +151,11 @@
          {
             //TODO: Fix this! metadata should contain CF_FAILOVER_INDEXES
             //failoverIndexes = (int[])metaData.getMetaData(MetaDataConstants.JMS, MetaDataConstants.CF_FAILOVER_INDEXES);
-            failoverIndexes = (int[])((ClusteredClientConnectionFactoryDelegate)target).getFailoverIndexes();
+            failoverMap = ((ClusteredClientConnectionFactoryDelegate)target).getFailoverMap();
 
-            if (failoverIndexes == null)
+            if (failoverMap == null)
             {
-               throw new IllegalStateException("Cannot find failoverIndexes!");
+               throw new IllegalStateException("Cannot find failoverMap!");
             }
          }
       }
@@ -181,7 +182,7 @@
    //The connection has failed
    private void handleFailure(ClientConnectionDelegate failedConnection) throws Exception
    {
-      ClientConnectionFactoryDelegate newCF = getAlternateDelegate(failedConnection);
+      ClientConnectionFactoryDelegate newCF = getFailoverDelegate(failedConnection);
 
       //TODO implement client side valve to prevent invocations occurring whilst failover is occurring
       
@@ -202,74 +203,69 @@
       }
    }
    
-   private ClientConnectionFactoryDelegate getAlternateDelegate(ClientConnectionDelegate currentDelegate) throws JMSException
+   private ClientConnectionFactoryDelegate getFailoverDelegate(ClientConnectionDelegate currentDelegate) throws JMSException
    {
       //We need to choose which delegate to fail over to
       
-      //This is determined by the failoverIndexes array
-      //The fail over delegate for delegates[i] is given by
-      //delegates[failoverIndexes[i]]
-      
       ConnectionState currentState = (ConnectionState)((DelegateSupport)currentDelegate).getState();
       
-      String currentLocator =
-         currentState.getRemotingConnection().getInvokingClient().getInvoker().getLocator().getLocatorURI();
+      int currentServerID = currentState.getServerID();
       
-      int local = -1;
+      //Lookup in the failover map to see which server to fail over onto
       
+      Integer failoverServerID = (Integer)failoverMap.get(new Integer(currentServerID));
+      
+      if (failoverServerID == null)
+      {
+         throw new IllegalStateException("Cannot find failover node for node " + currentServerID);
+      }
+      
+      //Now find the actual delegate
+      
+      ClientConnectionFactoryDelegate del = null;
+      
       for (int i = 0; i < delegates.length; i++)
       {
-         ConnectionState state = (ConnectionState)((DelegateSupport)delegates[i]).getState();
-         
-         String locator =
-            state.getRemotingConnection().getInvokingClient().getInvoker().getLocator().getLocatorURI();
-         
-         if (currentLocator.equals(locator))
+         if (delegates[i].getServerId() == failoverServerID.intValue())
          {
-            local = i;
+            del = delegates[i];
             
             break;
          }
       }
       
-      //Sanity
-      if (local == -1)
+      if (del == null)
       {
-         throw new IllegalStateException("Cannot find local delegate!");
+         throw new IllegalStateException("Cannot find failover delegate for node " + failoverServerID.intValue());
       }
-      
-      if (delegates.length == 1)
-      {
-         throw new IllegalStateException("Cannot failover connection since no servers to fail over onto");
-      }      
-
-      ClientConnectionFactoryDelegate delegateFound = delegates[failoverIndexes[local]];
-
-
+           
       // Redirect connection routine.
       // Verify the failureMap on the server and if out of sync find the correct delegate
       
-      int failoverNode = delegateFound.getFailoverNode(currentState.getServerID());
-      if (failoverNode!=delegateFound.getServerId())
-      {
-         delegateFound = null;
-         for (int i = 0; i < delegates.length; i++)
-         {
-            if (delegates[i].getServerId() == failoverNode)
-            {
-               delegateFound = delegates[i];
-            }
-         }
+      
+      //THIS IS WRONG - cannot use getFailoverNode method since failover node might change
+      //between getting the result and actually failing over
+      
+//      int failoverNode = delegateFound.getFailoverNode(currentState.getServerID());
+//      if (failoverNode!=delegateFound.getServerId())
+//      {
+//         delegateFound = null;
+//         for (int i = 0; i < delegates.length; i++)
+//         {
+//            if (delegates[i].getServerId() == failoverNode)
+//            {
+//               delegateFound = delegates[i];
+//            }
+//         }
+//
+//         if (delegateFound==null)
+//         {
+//            throw new IllegalStateException("Cannot find failover node on current map for nodeId=" + failoverNode);
+//         }
+//
+//      }
 
-         if (delegateFound==null)
-         {
-            throw new IllegalStateException("Cannot find failover node on current map for nodeId=" + failoverNode);
-         }
-
-      }
-
-      return delegateFound;
-
+      return del;
    }
    
    private void failover(ClientConnectionDelegate failedConnection, ClientConnectionDelegate newConnection) throws Exception

Modified: branches/Branch_Client_Failover_Experiment/src/main/org/jboss/jms/client/delegate/ClusteredClientConnectionFactoryDelegate.java
===================================================================
--- branches/Branch_Client_Failover_Experiment/src/main/org/jboss/jms/client/delegate/ClusteredClientConnectionFactoryDelegate.java	2006-12-09 01:15:18 UTC (rev 1733)
+++ branches/Branch_Client_Failover_Experiment/src/main/org/jboss/jms/client/delegate/ClusteredClientConnectionFactoryDelegate.java	2006-12-09 14:40:30 UTC (rev 1734)
@@ -21,9 +21,9 @@
  */
 package org.jboss.jms.client.delegate;
 
-import org.jboss.aop.util.PayloadKey;
+import java.util.Map;
+
 import org.jboss.jms.server.Version;
-import org.jboss.jms.server.remoting.MetaDataConstants;
 
 /**
  * A ClusteredClientConnectionFactoryDelegate
@@ -49,24 +49,21 @@
 
    // Attributes ----------------------------------------------------
 
-   /**
-    * If delegates[i] is the current delegate, then the failover delegate is given by
-    * delegates[failoverIndexes[i]]
-    */
    private ClientConnectionFactoryDelegate[] delegates;
 
-   private int[] failoverIndexes;
+   //Map <node Id, failover node id>
+   private Map failoverMap;
 
    // Constructors --------------------------------------------------
 
    public ClusteredClientConnectionFactoryDelegate(int objectID, int serverId, String serverLocatorURI,
                                                    Version serverVersion, boolean clientPing,
                                                    ClientConnectionFactoryDelegate[] delegates,
-                                                   int[] failoverIndexes)
+                                                   Map failoverMap)
    {
       super(objectID, serverId, serverLocatorURI, serverVersion, clientPing);
       this.delegates = delegates;
-      this.failoverIndexes = failoverIndexes;
+      this.failoverMap = failoverMap;
    }
 
    // Some of the properties of ClientConnectionFactoryDelegate are not exposed..
@@ -74,10 +71,10 @@
    // So, I created this Constructor so I could have access into protected members inside an extension class
    public ClusteredClientConnectionFactoryDelegate(ClientConnectionFactoryDelegate mainDelegate,
                                                    ClientConnectionFactoryDelegate[] delegates,
-                                                   int[] failoverIndexes)
+                                                   Map failoverMap)
    {
       this(mainDelegate.getID(), mainDelegate.serverId, mainDelegate.serverLocatorURI,
-         mainDelegate.serverVersion, mainDelegate.clientPing, delegates, failoverIndexes);
+         mainDelegate.serverVersion, mainDelegate.clientPing, delegates, failoverMap);
    }
 
    // DelegateSupport overrides -------------------------------------
@@ -93,15 +90,17 @@
             delegates[i].init();
          }
       }
+      
+      //This doesn't seem to be used so I'm commenting it out
 
-      // We add this to the meta data so the failOver aspect can get access to it
-      getMetaData().addMetaData(MetaDataConstants.JMS,
-                                MetaDataConstants.CF_DELEGATES,
-                                delegates, PayloadKey.TRANSIENT);
-
-      getMetaData().addMetaData(MetaDataConstants.JMS,
-                                MetaDataConstants.CF_FAILOVER_INDEXES,
-                                failoverIndexes, PayloadKey.TRANSIENT);
+//      // We add this to the meta data so the failOver aspect can get access to it
+//      getMetaData().addMetaData(MetaDataConstants.JMS,
+//                                MetaDataConstants.CF_DELEGATES,
+//                                delegates, PayloadKey.TRANSIENT);
+//
+//      getMetaData().addMetaData(MetaDataConstants.JMS,
+//                                MetaDataConstants.FAILOVER_MAP,
+//                                failoverMap, PayloadKey.TRANSIENT);
    }
 
    // Public --------------------------------------------------------
@@ -113,11 +112,21 @@
       return delegates;
    }
 
-   /** As metadata is not working, I'm exposing this temporarily */
-   public int[] getFailoverIndexes()
+   /** TODO As metadata is not working, I'm exposing this temporarily */
+   public Map getFailoverMap()
    {
-      return failoverIndexes;
+      return failoverMap;
    }
+   
+   public void setFailoverMap(Map failoverMap)
+   {
+      this.failoverMap = failoverMap;
+   }
+   
+   public void setDelegates(ClientConnectionFactoryDelegate[] dels)
+   {
+      this.delegates = dels;
+   }
 
    public String toString()
    {

Modified: branches/Branch_Client_Failover_Experiment/src/main/org/jboss/jms/server/connectionfactory/ConnectionFactoryJNDIMapper.java
===================================================================
--- branches/Branch_Client_Failover_Experiment/src/main/org/jboss/jms/server/connectionfactory/ConnectionFactoryJNDIMapper.java	2006-12-09 01:15:18 UTC (rev 1733)
+++ branches/Branch_Client_Failover_Experiment/src/main/org/jboss/jms/server/connectionfactory/ConnectionFactoryJNDIMapper.java	2006-12-09 14:40:30 UTC (rev 1734)
@@ -22,13 +22,16 @@
 package org.jboss.jms.server.connectionfactory;
 
 import java.io.Serializable;
+import java.util.ArrayList;
 import java.util.HashMap;
 import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
+
 import javax.naming.Context;
 import javax.naming.InitialContext;
 import javax.naming.NamingException;
+
 import org.jboss.jms.client.JBossConnectionFactory;
 import org.jboss.jms.client.delegate.ClientConnectionFactoryDelegate;
 import org.jboss.jms.client.delegate.ClusteredClientConnectionFactoryDelegate;
@@ -40,8 +43,10 @@
 import org.jboss.jms.server.remoting.JMSDispatcher;
 import org.jboss.jms.util.JNDIUtil;
 import org.jboss.logging.Logger;
+import org.jboss.messaging.core.plugin.contract.FailoverMapper;
 import org.jboss.messaging.core.plugin.contract.ReplicationListener;
 import org.jboss.messaging.core.plugin.contract.Replicator;
+import org.jboss.messaging.core.plugin.postoffice.cluster.DefaultClusteredPostOffice;
 
 /**
  * @author <a href="mailto:ovidiu at jboss.org">Ovidiu Feodorov</a>
@@ -75,6 +80,13 @@
    
    protected Replicator replicator;
    
+   /*
+   We cache the map of node->failover node in here.
+   This is then updated when node joins or leaves the cluster via the replicationListener
+   When new cfs are deployed we use the cached map
+   */
+   protected Map failoverMap;
+   
    // Constructors --------------------------------------------------
    
    public ConnectionFactoryJNDIMapper(ServerPeer serverPeer) throws Exception
@@ -107,6 +119,7 @@
       }
       
       int id = serverPeer.getNextObjectID();
+      
       Version version = serverPeer.getVersion();
 
       ServerConnectionFactoryEndpoint endpoint =
@@ -128,55 +141,46 @@
          log.info("ConnectionFactoryJNDIMapper is non clustered");
       }
       
-      /* if (clustered)
+      boolean creatingClustered = clustered && replicator != null;
+      
+      ClientConnectionFactoryDelegate localDelegate =
+         new ClientConnectionFactoryDelegate(id, serverPeer.getServerPeerID(),
+                                             locatorURI, version, clientPing);
+      
+      /*
+       * When registering a new clustered connection factory i should first create it with the available delegates
+       * then send the replication message.
+       * We then listen for connection factories added to global state using the replication listener
+       * and then update their connection factory list.
+       * This will happen locally too, so we will get the replication message locally - to avoid updating it again
+       * we can ignore any "add" replication updates that originate from the current node.
+       */
+      
+      if (creatingClustered)
       {
-         setupReplicator();
+         //Replicate the change - we will ignore this locally
          
-         if (replicator != null)
-         {                                             
-            //Replicator might still be null since we might be deploying a clustered cf in a non clustered
-            //post office (which is ok)
-            delegate = new ClusteredClientConnectionFactoryDelegate(id, serverPeer.getServerPeerID(), locatorURI, version, clientPing);
-         }
+         replicator.put(CF_PREFIX + uniqueName, localDelegate);
+         
+         //Create a clustered delegate
+         
+         Map localDelegates = replicator.get(CF_PREFIX + uniqueName);
+         
+         delegate = createClusteredDelegate(localDelegates);
+         
       }
+      else
+      {
+         delegate = localDelegate;
+      }
       
-      if (delegate == null)
-      {
-         //Local
-         delegate = new ClientConnectionFactoryDelegate(id, serverPeer.getServerPeerID(),
-                                        locatorURI, version, clientPing);
-      } */
-
-      delegate = new ClientConnectionFactoryDelegate(id, serverPeer.getServerPeerID(),
-                                     locatorURI, version, clientPing);
-
-
-      //delegate = new ClusteredClientConnectionFactoryDelegate(id, serverPeer.getServerPeerID(), locatorURI, version, clientPing);
-
-
       log.trace("Adding delegates factory " + uniqueName + " pointing to " + delegate);
+      
       delegates.put(uniqueName, delegate);
 
-
-      // We are clustered, we need to propagate the local delegate across the cluster.
-
-      // ... and then update it (and the clustered delegate on every node, while we're at it) by
-      // replicating the local delegate across the cluster. This way, the clustred delegates on each
-      // node will contain an updated list of local delegates, and all ConnectionFactoryJNDIMapper
-      // will rebind updated ConnectionFactories in JNDI.
-      // This will update the local node too
-
-      log.info("CF_PREFIX=" + CF_PREFIX + " uniqueName=" + uniqueName + " delegate = " + delegate + " replicator = " + replicator);
-      if (replicator != null)
-      {
-         replicator.put(CF_PREFIX + uniqueName, delegate);
-      }
-      else
-      {
-         //Now bind it in JNDI
-         rebindConnectionFactory(initialContext, jndiBindings, delegate);
-      }
-
+      //Now bind it in JNDI
+      rebindConnectionFactory(initialContext, jndiBindings, delegate);
+      
       //Registering with the dispatcher should always be the last thing otherwise a client could use
       //a partially initialised object
       JMSDispatcher.instance.registerTarget(new Integer(id), new ConnectionFactoryAdvised(endpoint));
@@ -243,19 +247,6 @@
    {
       initialContext = new InitialContext();
       
-      /*
-
-      ConnectionFActoryJNDIMapper is started in a call of ServerPeer, while replicator is started later
-      when the postoffices are started. So, I'm keeping the registration of replicator lazy on the first connection
-      Allthough this is not a proper design, as we might loose messages when the PostOffice is connected
-
-      replicator = serverPeer.getDataReplicator();
-
-      if (replicator != null)
-      {
-         replicator.registerListener(this);
-      } */
-
       log.debug("started");
    }
    
@@ -273,40 +264,99 @@
    
    // ReplicationListener interface ----------------------------------
    
-   public synchronized void onReplicationChange(Serializable key, Map updatedReplicantMap, boolean added)
+   public synchronized void onReplicationChange(Serializable key, Map updatedReplicantMap,
+                                                boolean added, int originatingNodeId)
    {
       log.info("Got replication call " + key + " node=" + serverPeer.getServerPeerID() + " replicants=" + updatedReplicantMap + " added=");
       try
-      {
-         // The list of connection factories across the cluster has changed, so we need to update
-         // the clustered connection factories in JNDI with new ones with the up to date list.
-         // Replication can be used for other stuff (not just connection factories) so we check
-         // the prefix.
+      {         
+         if (!(key instanceof String))
+         {
+            return;
+         }
          
          String sKey = (String)key;
-         
-         if (key instanceof String && sKey.startsWith(CF_PREFIX))
+
+         if (sKey.equals(DefaultClusteredPostOffice.ADDRESS_INFO_KEY))
          {
-            //We only need to rebind if the cf is being added
+           /* 
+            We respond to changes in the node-address mapping
+            This will be replicated whan a node joins / leaves the group
+            When this happens we need to recalculate the failoverMap
+            and rebind all connection factories with the new mapping
+            We cannot just reference a single map since the objects are bound in JNDI
+            in serialized form
+            */
+            log.info("responding to node - adress info change. Recalculating mapping and rebinding cfs");
             
+            recalculateFailoverMap(updatedReplicantMap);
+            
+            //rebind
+            Iterator iter = endpoints.entrySet().iterator();
+            
+            while (iter.hasNext())
+            {
+               Map.Entry entry = (Map.Entry)iter.next();
+               
+               String uniqueName = (String)entry.getKey();
+               
+               ServerConnectionFactoryEndpoint endpoint =
+                  (ServerConnectionFactoryEndpoint)entry.getValue();
+               
+               ClusteredClientConnectionFactoryDelegate del = (ClusteredClientConnectionFactoryDelegate)delegates.get(uniqueName);
+               
+               if (del == null)
+               {
+                  throw new IllegalStateException("Cannot find cf with name " + uniqueName);
+               }
+               
+               del.setFailoverMap(failoverMap);
+               
+               rebindConnectionFactory(initialContext, endpoint.getJNDIBindings(), del);
+            }
+            
+         }
+         else if (sKey.startsWith(CF_PREFIX) && originatingNodeId != serverPeer.getServerPeerID())
+         {
+            /*
+            A connection factory has been deployed / undeployed - we need to update the local delegate arrays inside the clustered
+            connection factories with the same name
+            We don't recalculate the failover map since the number of nodes in the group hasn't changed
+            We also ignore any local changes since the cf will already be bound locally with the new
+            local delegate in the array
+            */
+            
             String uniqueName = sKey.substring(CF_PREFIX.length());
             
-            ClusteredClientConnectionFactoryDelegate clusteredDelegate = createClusteredDelegate(updatedReplicantMap);
+            log.info("Connection factory with unique name " + uniqueName + " has been added / removed");
             
-            // Now rebind ...
+            ClusteredClientConnectionFactoryDelegate del = (ClusteredClientConnectionFactoryDelegate)delegates.get(uniqueName);
+            
+            if (del == null)
+            {
+               throw new IllegalStateException("Cannot find cf with name " + uniqueName);
+            }
+            
+            ClientConnectionFactoryDelegate[] delArr = 
+               (ClientConnectionFactoryDelegate[])updatedReplicantMap.values().toArray(new ClientConnectionFactoryDelegate[updatedReplicantMap.size()]);
 
+            log.info("Updating delsArr with size " + delArr.length);
+            
+            del.setDelegates(delArr);
+            
             ServerConnectionFactoryEndpoint endpoint =
                (ServerConnectionFactoryEndpoint)endpoints.get(uniqueName);
-
+            
             if (endpoint == null)
             {
-               throw new IllegalStateException("Cannot find endpoint " + uniqueName );
+               throw new IllegalStateException("Cannot find endpoint with name " + uniqueName);
             }
-
-            rebindConnectionFactory(initialContext, endpoint.getJNDIBindings(), clusteredDelegate);
-         }
+            
+            rebindConnectionFactory(initialContext, endpoint.getJNDIBindings(), del);
+            
+         }            
       }
-      catch (NamingException e)
+      catch (Exception e)
       {
          log.error("Failed to rebind connection factory", e);
       }
@@ -332,96 +382,63 @@
       this.serverPeer.getQueuePostOfficeInstance();
    }
 
-
+   private void recalculateFailoverMap(Map nodeAddressMap) throws Exception
+   {     
+      List nodes = new ArrayList(nodeAddressMap.keySet());
+      
+      FailoverMapper mapper = replicator.getFailoverMapper();
+      
+      failoverMap = mapper.generateMapping(nodes);
+   }
+   
    /**
     * @param localDelegates - Map<Integer(nodeId) - ClientConnectionFactoryDelegate>
     */
-   private ClusteredClientConnectionFactoryDelegate createClusteredDelegate(Map localDelegates)
+   private ClusteredClientConnectionFactoryDelegate createClusteredDelegate(Map localDelegates) throws Exception
    {
       //TODO: make it trace after the code is stable
       log.info("Updating FailoverDelegates " + localDelegates.size() + " on serverPeer:" + serverPeer.getServerPeerID());
 
-      // Calculates the failoverMap based on the current list of localDelegates
-      Map failoverMap = replicator.getFailoverMapper().generateMapping(localDelegates.keySet());
+      ClientConnectionFactoryDelegate[] delArr = 
+         (ClientConnectionFactoryDelegate[])localDelegates.values().toArray(new ClientConnectionFactoryDelegate[localDelegates.size()]);
 
-      int s = localDelegates.size();
-
-      int[] nodesIDs = new int[s];
-      int[] failoverNodeIDs = new int[s];
-      ClientConnectionFactoryDelegate[] delegates = new ClientConnectionFactoryDelegate[s];
-      // Ths created ClusteredClientFactoryDelegate will copy the properties from this delegate which is
-      // the delegate on this current serverPeer
-      ClientConnectionFactoryDelegate mainDelegate = null;
-
-      int idx = 0;
-
-      for(Iterator i = localDelegates.entrySet().iterator(); i.hasNext();)
+      //If the map is not cached - generate it now
+      
+      if (failoverMap == null)
       {
-         Map.Entry entry = (Map.Entry)i.next();
-
-         Integer nodeID = (Integer)entry.getKey();
-         Integer failoverNodeID = (Integer)failoverMap.get(nodeID); 
-
-         log.trace("setFailoverDelegates inside forIterator::nodeID=" + nodeID + " failoverNodeID=" + failoverNodeID);
-
-         nodesIDs[idx] = nodeID.intValue();
-         failoverNodeIDs[idx] = failoverNodeID.intValue();
-         delegates[idx] = (ClientConnectionFactoryDelegate)entry.getValue();
-
-         if (delegates[idx].getServerId() == this.serverPeer.getServerPeerID())
+         Map nodeAddressMap = replicator.get(DefaultClusteredPostOffice.ADDRESS_INFO_KEY);
+         
+         if (nodeAddressMap == null)
          {
-            // sanity check
-            if (mainDelegate != null)
-            {
-               throw new IllegalStateException("There are two servers with serverID=" + this.serverPeer.getServerPeerID() + ", verify your clustering configuration");
-            }
-            mainDelegate = delegates[idx];
+            throw new IllegalStateException("Cannot find address node mapping!");
          }
-
-         idx++;
+         
+         recalculateFailoverMap(nodeAddressMap);
       }
-      // Generate the failover indexes. This could probably be optimised if need be.
       
-      //MainDelegate would be null in the case the cf is being undeployed locally
+      //Now we find the "mainDelegate"
+      //TODO - why do we need a "mainDelegate" ????
       
-      ClusteredClientConnectionFactoryDelegate clusteredDelegate = null;
+      ClientConnectionFactoryDelegate mainDelegate = null;
       
-      if (mainDelegate != null)
-      {         
-         int[] failoverIndexes = new int[s];
-         
-         for (int i = 0; i < s; i++)
-         {
-            int failoverNode = failoverNodeIDs[i];
-            int failoverIndex = -1;
+      for(Iterator i = localDelegates.values().iterator(); i.hasNext();)
+      {
+         ClientConnectionFactoryDelegate del = (ClientConnectionFactoryDelegate)i.next();
             
-            for (int j = 0; j < s; j++)
-            {
-               if (nodesIDs[j] == failoverNode)
-               {
-                  failoverIndex = j;
-                  break;
-               }
-            }
+          if (del.getServerId() == this.serverPeer.getServerPeerID())
+          {
+             // sanity check
+             if (mainDelegate != null)
+             {
+                throw new IllegalStateException("There are two servers with serverID=" + this.serverPeer.getServerPeerID() + ", verify your clustering configuration");
+             }
+             mainDelegate = del;
+          }
+      }
             
-            if (failoverIndex == -1)
-            {
-               //throw new IllegalStateException("Failover node " + failoverNode + " is not in list of nodes at node " + serverPeer.getServerPeerID() + "!");
-               
-               //It is possible that the failover node is not found  - this might happen
-               //if this is executed when a cf is undeployed but the failover node still contains
-               //the old node.
-               
-               //In this case the node leave event will shortly
-            }
-            
-            failoverIndexes[i] = failoverIndex;
-         }
-   
-         clusteredDelegate =
-            new ClusteredClientConnectionFactoryDelegate(mainDelegate, delegates, failoverIndexes);
-      }
-
+      ClusteredClientConnectionFactoryDelegate clusteredDelegate =
+         new ClusteredClientConnectionFactoryDelegate(mainDelegate, delArr, failoverMap);
+      
       return clusteredDelegate;
    }
    

Modified: branches/Branch_Client_Failover_Experiment/src/main/org/jboss/jms/server/remoting/MetaDataConstants.java
===================================================================
--- branches/Branch_Client_Failover_Experiment/src/main/org/jboss/jms/server/remoting/MetaDataConstants.java	2006-12-09 01:15:18 UTC (rev 1733)
+++ branches/Branch_Client_Failover_Experiment/src/main/org/jboss/jms/server/remoting/MetaDataConstants.java	2006-12-09 14:40:30 UTC (rev 1734)
@@ -53,7 +53,7 @@
    
    public static final String REMOTING_CONNECTION = "REMOTING_CONNECTION";
    
-   public static final String CF_FAILOVER_INDEXES = "CF_FAIL_IND";
+   public static final String FAILOVER_MAP = "CF_FAIL_IND";
    
    public static final String CONNECTION_VERSION = "CONNECTION_VERSION";
 }

Modified: branches/Branch_Client_Failover_Experiment/src/main/org/jboss/messaging/core/plugin/contract/FailoverMapper.java
===================================================================
--- branches/Branch_Client_Failover_Experiment/src/main/org/jboss/messaging/core/plugin/contract/FailoverMapper.java	2006-12-09 01:15:18 UTC (rev 1733)
+++ branches/Branch_Client_Failover_Experiment/src/main/org/jboss/messaging/core/plugin/contract/FailoverMapper.java	2006-12-09 14:40:30 UTC (rev 1734)
@@ -21,7 +21,7 @@
  */
 package org.jboss.messaging.core.plugin.contract;
 
-import java.util.Collection;
+import java.util.List;
 import java.util.Map;
 
 /**
@@ -37,12 +37,5 @@
  */
 public interface FailoverMapper
 {
-   /** This receives a List<Integer> of nodes and returns a Map<Integer,Integer> of nodes to be used on failover logic.
-    *
-    * An implementation of this method should aways sort received nodes first before calculating as the parameter list
-    * might be in a different order, and it should aways return the same result for the same set of nodes whatever is
-    * the order of parameter nodes
-    * */
-   Map generateMapping(Collection nodes);
-      
+   Map generateMapping(List nodes);
 }

Modified: branches/Branch_Client_Failover_Experiment/src/main/org/jboss/messaging/core/plugin/contract/ReplicationListener.java
===================================================================
--- branches/Branch_Client_Failover_Experiment/src/main/org/jboss/messaging/core/plugin/contract/ReplicationListener.java	2006-12-09 01:15:18 UTC (rev 1733)
+++ branches/Branch_Client_Failover_Experiment/src/main/org/jboss/messaging/core/plugin/contract/ReplicationListener.java	2006-12-09 14:40:30 UTC (rev 1734)
@@ -39,5 +39,5 @@
     * @param updatedReplicantMap - the updated replicant map. It contains ALL current replicants for
     *        the given key.
     */
-   void onReplicationChange(Serializable key, Map updatedReplicantMap, boolean added);
+   void onReplicationChange(Serializable key, Map updatedReplicantMap, boolean added, int originatingNodeId);
 }

Modified: branches/Branch_Client_Failover_Experiment/src/main/org/jboss/messaging/core/plugin/contract/Replicator.java
===================================================================
--- branches/Branch_Client_Failover_Experiment/src/main/org/jboss/messaging/core/plugin/contract/Replicator.java	2006-12-09 01:15:18 UTC (rev 1733)
+++ branches/Branch_Client_Failover_Experiment/src/main/org/jboss/messaging/core/plugin/contract/Replicator.java	2006-12-09 14:40:30 UTC (rev 1734)
@@ -67,22 +67,8 @@
    
    void unregisterListener(ReplicationListener listener);
    
-   /**
-    * TODO - this method doesn't belong here, as no semantics should be associated with data at this
-    *        level
-    *
-    * Gets the current failover node ID for the specified node ID.
-    *
-    * @param nodeID - The node to get the failover node id for.
-    *
-    * @return the failover node ID. If there is no failover node (one-node cluster), the method
-    *         returns the original nodeID. 
-    */
-   //int getFailoverNodeID(int nodeID);
-
-   /** TODO - this method doesn't belong here... We should have POJOTized containers updating dependencies
+   /** TODO - this method doesn't belong here... We should have POJOized containers updating dependencies
     *         between ConnectionFActoryJNDIMapper and DefaultClusteredPostOffice */
    FailoverMapper getFailoverMapper();
 
-
 }

Modified: branches/Branch_Client_Failover_Experiment/src/main/org/jboss/messaging/core/plugin/postoffice/cluster/DefaultClusteredPostOffice.java
===================================================================
--- branches/Branch_Client_Failover_Experiment/src/main/org/jboss/messaging/core/plugin/postoffice/cluster/DefaultClusteredPostOffice.java	2006-12-09 01:15:18 UTC (rev 1733)
+++ branches/Branch_Client_Failover_Experiment/src/main/org/jboss/messaging/core/plugin/postoffice/cluster/DefaultClusteredPostOffice.java	2006-12-09 14:40:30 UTC (rev 1734)
@@ -93,7 +93,7 @@
    private static final Logger log = Logger.getLogger(DefaultClusteredPostOffice.class);
    
    // Key for looking up node id -> address info mapping from replicated data
-   private static final String ADDRESS_INFO_KEY = "address_info";
+   public static final String ADDRESS_INFO_KEY = "address_info";
    
    //Used for failure testing
    private boolean failBeforeCommit;
@@ -722,7 +722,7 @@
          
          m.put(new Integer(originatorNodeID), replicant);
          
-         notifyListeners(key, m, true);
+         notifyListeners(key, m, true, originatorNodeID);
       }
    }
 
@@ -753,7 +753,7 @@
          {
             replicatedData.remove(key);
          } 
-         notifyListeners(key, m, false);
+         notifyListeners(key, m, false, originatorNodeID);
       
          return true; 
       }
@@ -1821,15 +1821,15 @@
    /*
     * Removes all non durable binding data, and any local replicant data for the specified node.
     */
-   private void removeDataForNode(Integer nodeID) throws Exception
+   private void removeDataForNode(Integer nodeToRemove) throws Exception
    {
-      log.info("Node " + nodeID + " requested to leave cluster");
+      log.info("Node " + nodeToRemove + " requested to leave cluster");
       
       lock.writeLock().acquire();
 
       try
       {          
-         Map nameMap = (Map)nameMaps.get(nodeID);
+         Map nameMap = (Map)nameMaps.get(nodeToRemove);
 
          if (nameMap != null)
          {
@@ -1855,7 +1855,7 @@
             {
                Binding binding = (Binding)iter.next();
                
-               removeBinding(nodeID.intValue(), binding.getQueue().getName());
+               removeBinding(nodeToRemove.intValue(), binding.getQueue().getName());
             }
          }         
       }
@@ -1874,7 +1874,7 @@
             String key = (String)entry.getKey();
             Map replicants = (Map)entry.getValue();
 
-            replicants.remove(nodeID);
+            replicants.remove(nodeToRemove);
 
             if (replicants.isEmpty())
             {
@@ -1882,7 +1882,7 @@
             }
 
             // Need to trigger listeners
-            notifyListeners(key, replicants, false);
+            notifyListeners(key, replicants, false, nodeToRemove.intValue());
          }
       }
    }
@@ -1891,7 +1891,8 @@
     * @param updatedReplicantMap - the updated replicant map. It contains ALL current replicants for
     *        the given key.
     */
-   private void notifyListeners(Serializable key, Map updatedReplicantMap, boolean added)
+   private void notifyListeners(Serializable key, Map updatedReplicantMap, boolean added,
+                                int originatorNodeId)
    { 
       synchronized (replicationListeners)
       {         
@@ -1899,7 +1900,7 @@
          {
             ReplicationListener listener = (ReplicationListener)i.next();
             
-            listener.onReplicationChange(key, updatedReplicantMap, added);
+            listener.onReplicationChange(key, updatedReplicantMap, added, originatorNodeId);
          }
       }
    } 
@@ -2115,7 +2116,7 @@
          
          Map replicants = (Map)entry.getValue();
          
-         Map m = new HashMap();
+         Map m = new LinkedHashMap();
          
          m.putAll(replicants);
          
@@ -2461,7 +2462,8 @@
    private class NodeAddressMapListener implements ReplicationListener
    {
 
-      public void onReplicationChange(Serializable key, Map updatedReplicantMap, boolean added)
+      public void onReplicationChange(Serializable key, Map updatedReplicantMap, boolean added,
+                                      int originatorNodeId)
       {
          if (key instanceof String && ((String)key).equals(ADDRESS_INFO_KEY))
          {
@@ -2484,8 +2486,7 @@
       
       private void generateFailoverMap(Map nodeAddressMap)
       {
-
-         failoverMap = failoverMapper.generateMapping(nodeAddressMap.keySet());
+         failoverMap = failoverMapper.generateMapping(new ArrayList(nodeAddressMap.keySet()));
       }      
    }
 }
\ No newline at end of file

Modified: branches/Branch_Client_Failover_Experiment/src/main/org/jboss/messaging/core/plugin/postoffice/cluster/DefaultFailoverMapper.java
===================================================================
--- branches/Branch_Client_Failover_Experiment/src/main/org/jboss/messaging/core/plugin/postoffice/cluster/DefaultFailoverMapper.java	2006-12-09 01:15:18 UTC (rev 1733)
+++ branches/Branch_Client_Failover_Experiment/src/main/org/jboss/messaging/core/plugin/postoffice/cluster/DefaultFailoverMapper.java	2006-12-09 14:40:30 UTC (rev 1734)
@@ -21,10 +21,11 @@
  */
 package org.jboss.messaging.core.plugin.postoffice.cluster;
 
-import java.util.Arrays;
-import java.util.Collection;
-import java.util.LinkedHashMap;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
 import java.util.Map;
+
 import org.jboss.logging.Logger;
 import org.jboss.messaging.core.plugin.contract.FailoverMapper;
 
@@ -45,20 +46,20 @@
    private static final Logger log = Logger.getLogger(DefaultFailoverMapper.class);
 
    /** This receives a List<Integer> of nodes and returns a Map<Integer,Integer> of nodes to be used on failover logic */
-   public Map generateMapping(Collection nodes)
+   public Map generateMapping(List nodes)
    {
-
-      Object[] arrayNodes = (Object[])nodes.toArray(new Integer[nodes.size()]);
-      Arrays.sort(arrayNodes);
-
-      int s = arrayNodes.length;
+      if (!(nodes instanceof ArrayList))
+      {
+         //Convert to array list for fast index access
+         nodes = new ArrayList(nodes);
+      }
       
-      log.info("Genertaing failover mapping, node size="+ s);
+      int s = nodes.size();
+      
+      log.info("Generating failover mapping, node size= "+ s);
 
-
-
-
-      Map failoverNodes = new LinkedHashMap(s);
+      //There is no need for the map to be linked
+      Map failoverNodes = new HashMap(s);
       
       for (int i = 0; i < s; i++)
       {
@@ -69,7 +70,7 @@
             j = 0;
          }
          
-         failoverNodes.put(arrayNodes[i], arrayNodes[j]);
+         failoverNodes.put(nodes.get(i), nodes.get(j));
       }
       
       return failoverNodes;

Modified: branches/Branch_Client_Failover_Experiment/tests/src/org/jboss/test/messaging/jms/clustering/HATest.java
===================================================================
--- branches/Branch_Client_Failover_Experiment/tests/src/org/jboss/test/messaging/jms/clustering/HATest.java	2006-12-09 01:15:18 UTC (rev 1733)
+++ branches/Branch_Client_Failover_Experiment/tests/src/org/jboss/test/messaging/jms/clustering/HATest.java	2006-12-09 14:40:30 UTC (rev 1734)
@@ -22,6 +22,8 @@
 
 package org.jboss.test.messaging.jms.clustering;
 
+import java.util.Map;
+
 import javax.jms.Connection;
 import javax.jms.MessageConsumer;
 import javax.jms.TextMessage;
@@ -187,6 +189,23 @@
       
    }
  
+   public void testStartStopProblem() throws Exception
+   {
+      //Three nodes should be running now
+      
+      //Stop one of them
+      
+      ServerManagement.stop(0, true);
+      
+      //Stop another
+      
+      ServerManagement.stop(1, true);
+      
+      
+      
+   }
+   
+   
    /*
     * Test that the failover mapping is created correctly
     */
@@ -207,67 +226,89 @@
       
       ClientConnectionFactoryDelegate cf3 = delegate.getDelegates()[2];
       
+      //The order here depends on the order the servers were started in
+      
+      log.info("cf1 serverid=" + cf1.getServerId());
+      
+      log.info("cf2 serverid=" + cf2.getServerId());
+      
+      log.info("cf3 serverid=" + cf3.getServerId());
+      
+      
       assertEquals(0, cf1.getServerId());
       
       assertEquals(1, cf2.getServerId());
       
       assertEquals(2, cf3.getServerId());
       
-      int[] failoverIndexes = delegate.getFailoverIndexes();
+      Map failoverMap = delegate.getFailoverMap();
       
       assertEquals(3, delegates.length);
       
-      assertEquals(3, failoverIndexes.length);
+      assertEquals(3, failoverMap.size());
       
       // Default failover policy just chooses the node to the right
       
-      log.info("failoverindex[0]:" + failoverIndexes[0]);
-      log.info("failoverindex[1]:" + failoverIndexes[1]);
-      log.info("failoverindex[2]:" + failoverIndexes[2]);
+      assertEquals(cf2.getServerId(), ((Integer)failoverMap.get(new Integer(cf1.getServerId()))).intValue());
       
-      assertEquals(1, failoverIndexes[0]);
+      assertEquals(cf3.getServerId(), ((Integer)failoverMap.get(new Integer(cf2.getServerId()))).intValue());
       
-      assertEquals(2, failoverIndexes[1]);
+      assertEquals(cf1.getServerId(), ((Integer)failoverMap.get(new Integer(cf3.getServerId()))).intValue());
       
-      assertEquals(0, failoverIndexes[2]);
+      assertEquals(2, ((Integer)failoverMap.get(new Integer(1))).intValue());
       
       //Now cleanly stop one of the servers
       
+      
+      log.info("************** STOPPING SERVER 0");
       ServerManagement.stop(0, true);
       
+      log.info("server stopped");
+      
       assertEquals(2, ServerManagement.getServer(1).getNumberOfNodesOnCluster());
       
       //Lookup another connection factory
       
       JBossConnectionFactory factory2 =  (JBossConnectionFactory )ic1.lookup("/ConnectionFactory");
       
+      log.info("Got connection factory");
+      
       ClusteredClientConnectionFactoryDelegate delegate2 =
          (ClusteredClientConnectionFactoryDelegate)factory2.getDelegate();
       
       ClientConnectionFactoryDelegate[] delegates2 = delegate2.getDelegates();
       
-      int[] failoverIndexes2 = delegate2.getFailoverIndexes();
+      Map failoverMap2 = delegate2.getFailoverMap();
       
+      log.info("Got failover map");
+      
       assertEquals(2, delegates2.length);
       
       cf1 = delegate2.getDelegates()[0];
       
       cf2 = delegate2.getDelegates()[1];
       
-      assertEquals(0, cf1.getServerId());
+      //Order here depends on order servers were started in
       
-      assertEquals(1, cf2.getServerId());
+      log.info("cf1 serverid=" + cf1.getServerId());
       
+      log.info("cf2 serverid=" + cf2.getServerId());
       
+      assertEquals(1, cf1.getServerId());
       
-      assertEquals(2, failoverIndexes2.length);
+      assertEquals(2, cf2.getServerId());
       
-      assertEquals(1, failoverIndexes2[0]);
       
-      assertEquals(0, failoverIndexes2[1]);
+      assertEquals(2, failoverMap2.size());
       
+      assertEquals(cf2.getServerId(), ((Integer)failoverMap2.get(new Integer(cf1.getServerId()))).intValue());
+      
+      assertEquals(cf1.getServerId(), ((Integer)failoverMap2.get(new Integer(cf2.getServerId()))).intValue());
+      
       //Cleanly stop another server
       
+      log.info("Server 1 is started: " + ServerManagement.getServer(1).isServerPeerStarted());
+      
       ServerManagement.stop(1, true);
       
       assertEquals(1, ServerManagement.getServer(2).getNumberOfNodesOnCluster());
@@ -281,7 +322,7 @@
       
       ClientConnectionFactoryDelegate[] delegates3 = delegate3.getDelegates();
       
-      int[] failoverIndexes3 = delegate3.getFailoverIndexes();
+      Map failoverMap3 = delegate3.getFailoverMap();
       
       assertEquals(1, delegates3.length);
       
@@ -290,11 +331,10 @@
       assertEquals(0, cf1.getServerId());
       
       
-      assertEquals(1, failoverIndexes3.length);
+      assertEquals(1, failoverMap3.size());
       
-      assertEquals(0, failoverIndexes3[0]);
-      
-      
+      assertEquals(cf1.getServerId(), ((Integer)failoverMap3.get(new Integer(cf1.getServerId()))).intValue());
+            
       //TODO - Add nodes back into the cluster - test framework currently does not support this
       
       




More information about the jboss-cvs-commits mailing list