[jboss-cvs] JBoss Messaging SVN: r1733 - in branches/Branch_Client_Failover_Experiment: src/main/org/jboss/jms/server/connectionfactory src/main/org/jboss/jms/server/endpoint src/main/org/jboss/messaging/core/plugin/contract src/main/org/jboss/messaging/core/plugin/postoffice/cluster tests/src/org/jboss/test/messaging/core/plugin/postoffice/cluster

jboss-cvs-commits at lists.jboss.org jboss-cvs-commits at lists.jboss.org
Fri Dec 8 20:15:24 EST 2006


Author: clebert.suconic at jboss.com
Date: 2006-12-08 20:15:18 -0500 (Fri, 08 Dec 2006)
New Revision: 1733

Added:
   branches/Branch_Client_Failover_Experiment/tests/src/org/jboss/test/messaging/core/plugin/postoffice/cluster/FailoverMapperTest.java
Modified:
   branches/Branch_Client_Failover_Experiment/src/main/org/jboss/jms/server/connectionfactory/ConnectionFactoryJNDIMapper.java
   branches/Branch_Client_Failover_Experiment/src/main/org/jboss/jms/server/endpoint/ServerConnectionFactoryEndpoint.java
   branches/Branch_Client_Failover_Experiment/src/main/org/jboss/messaging/core/plugin/contract/FailoverMapper.java
   branches/Branch_Client_Failover_Experiment/src/main/org/jboss/messaging/core/plugin/contract/Replicator.java
   branches/Branch_Client_Failover_Experiment/src/main/org/jboss/messaging/core/plugin/postoffice/cluster/DefaultClusteredPostOffice.java
   branches/Branch_Client_Failover_Experiment/src/main/org/jboss/messaging/core/plugin/postoffice/cluster/DefaultFailoverMapper.java
Log:
Changing how the failoverMapping is being calculated

Modified: branches/Branch_Client_Failover_Experiment/src/main/org/jboss/jms/server/connectionfactory/ConnectionFactoryJNDIMapper.java
===================================================================
--- branches/Branch_Client_Failover_Experiment/src/main/org/jboss/jms/server/connectionfactory/ConnectionFactoryJNDIMapper.java	2006-12-08 21:56:34 UTC (rev 1732)
+++ branches/Branch_Client_Failover_Experiment/src/main/org/jboss/jms/server/connectionfactory/ConnectionFactoryJNDIMapper.java	2006-12-09 01:15:18 UTC (rev 1733)
@@ -275,7 +275,7 @@
    
    public synchronized void onReplicationChange(Serializable key, Map updatedReplicantMap, boolean added)
    {
-      log.info("Got replication call " + key + " node=" + serverPeer.getServerPeerID() + " replicants=" + updatedReplicantMap);
+      log.info("Got replication call " + key + " node=" + serverPeer.getServerPeerID() + " replicants=" + updatedReplicantMap + " added=");
       try
       {
          // The list of connection factories across the cluster has changed, so we need to update
@@ -285,7 +285,7 @@
          
          String sKey = (String)key;
          
-         if (added && key instanceof String && sKey.startsWith(CF_PREFIX))
+         if (key instanceof String && sKey.startsWith(CF_PREFIX))
          {
             //We only need to rebind if the cf is being added
             
@@ -293,21 +293,17 @@
             
             ClusteredClientConnectionFactoryDelegate clusteredDelegate = createClusteredDelegate(updatedReplicantMap);
             
-            //It could be null if the cf is being undeployed
-            if (clusteredDelegate != null)
-            {               
-               // Now rebind ...
-               
-               ServerConnectionFactoryEndpoint endpoint =
-                  (ServerConnectionFactoryEndpoint)endpoints.get(uniqueName);
-               
-               if (endpoint == null)
-               {
-                  throw new IllegalStateException("Cannot find endpoint " + uniqueName );
-               }
-   
-               rebindConnectionFactory(initialContext, endpoint.getJNDIBindings(), clusteredDelegate);
+            // Now rebind ...
+
+            ServerConnectionFactoryEndpoint endpoint =
+               (ServerConnectionFactoryEndpoint)endpoints.get(uniqueName);
+
+            if (endpoint == null)
+            {
+               throw new IllegalStateException("Cannot find endpoint " + uniqueName );
             }
+
+            rebindConnectionFactory(initialContext, endpoint.getJNDIBindings(), clusteredDelegate);
          }
       }
       catch (NamingException e)
@@ -345,6 +341,9 @@
       //TODO: make it trace after the code is stable
       log.info("Updating FailoverDelegates " + localDelegates.size() + " on serverPeer:" + serverPeer.getServerPeerID());
 
+      // Calculates the failoverMap based on the current list of localDelegates
+      Map failoverMap = replicator.getFailoverMapper().generateMapping(localDelegates.keySet());
+
       int s = localDelegates.size();
 
       int[] nodesIDs = new int[s];
@@ -360,13 +359,13 @@
       {
          Map.Entry entry = (Map.Entry)i.next();
 
-         int nodeID = ((Integer)entry.getKey()).intValue();
-         int failoverNodeID = replicator.getFailoverNodeID(nodeID);
+         Integer nodeID = (Integer)entry.getKey();
+         Integer failoverNodeID = (Integer)failoverMap.get(nodeID); 
 
          log.trace("setFailoverDelegates inside forIterator::nodeID=" + nodeID + " failoverNodeID=" + failoverNodeID);
 
-         nodesIDs[idx] = nodeID;
-         failoverNodeIDs[idx] = failoverNodeID;
+         nodesIDs[idx] = nodeID.intValue();
+         failoverNodeIDs[idx] = failoverNodeID.intValue();
          delegates[idx] = (ClientConnectionFactoryDelegate)entry.getValue();
 
          if (delegates[idx].getServerId() == this.serverPeer.getServerPeerID())
@@ -374,15 +373,13 @@
             // sanity check
             if (mainDelegate != null)
             {
-               throw new IllegalStateException("There are two server with serverID, verify your clustering configuration");
+               throw new IllegalStateException("There are two servers with serverID=" + this.serverPeer.getServerPeerID() + ", verify your clustering configuration");
             }
             mainDelegate = delegates[idx];
          }
 
          idx++;
       }
-      // sanity check
-      
       // Generate the failover indexes. This could probably be optimised if need be.
       
       //MainDelegate would be null in the case the cf is being undeployed locally

Modified: branches/Branch_Client_Failover_Experiment/src/main/org/jboss/jms/server/endpoint/ServerConnectionFactoryEndpoint.java
===================================================================
--- branches/Branch_Client_Failover_Experiment/src/main/org/jboss/jms/server/endpoint/ServerConnectionFactoryEndpoint.java	2006-12-08 21:56:34 UTC (rev 1732)
+++ branches/Branch_Client_Failover_Experiment/src/main/org/jboss/jms/server/endpoint/ServerConnectionFactoryEndpoint.java	2006-12-09 01:15:18 UTC (rev 1733)
@@ -142,8 +142,11 @@
    {
       try
       {
-         ServerPeer peer = (ServerPeer )serverPeer.getInstance();
-         return peer.getReplicator().getFailoverNodeID(node);
+         //ServerPeer peer = (ServerPeer )serverPeer.getInstance();
+         //return peer.getReplicator().getFailoverNodeID(node);
+
+         // not implemented yet
+         return node;
       }
       catch (Throwable t)
       {

Modified: branches/Branch_Client_Failover_Experiment/src/main/org/jboss/messaging/core/plugin/contract/FailoverMapper.java
===================================================================
--- branches/Branch_Client_Failover_Experiment/src/main/org/jboss/messaging/core/plugin/contract/FailoverMapper.java	2006-12-08 21:56:34 UTC (rev 1732)
+++ branches/Branch_Client_Failover_Experiment/src/main/org/jboss/messaging/core/plugin/contract/FailoverMapper.java	2006-12-09 01:15:18 UTC (rev 1733)
@@ -21,7 +21,8 @@
  */
 package org.jboss.messaging.core.plugin.contract;
 
-import java.util.List;
+import java.util.Collection;
+import java.util.Map;
 
 /**
  * A FailoverMapper
@@ -36,6 +37,12 @@
  */
 public interface FailoverMapper
 {
-   List generateMapping(List nodes);
+   /** This receives a List<Integer> of nodes and returns a Map<Integer,Integer> of nodes to be used on failover logic.
+    *
+    * An implementation of this method should always sort received nodes first before calculating as the parameter list
+    * might be in a different order, and it should always return the same result for the same set of nodes whatever is
+    * the order of parameter nodes
+    * */
+   Map generateMapping(Collection nodes);
       
 }

Modified: branches/Branch_Client_Failover_Experiment/src/main/org/jboss/messaging/core/plugin/contract/Replicator.java
===================================================================
--- branches/Branch_Client_Failover_Experiment/src/main/org/jboss/messaging/core/plugin/contract/Replicator.java	2006-12-08 21:56:34 UTC (rev 1732)
+++ branches/Branch_Client_Failover_Experiment/src/main/org/jboss/messaging/core/plugin/contract/Replicator.java	2006-12-09 01:15:18 UTC (rev 1733)
@@ -78,7 +78,11 @@
     * @return the failover node ID. If there is no failover node (one-node cluster), the method
     *         returns the original nodeID. 
     */
-   int getFailoverNodeID(int nodeID);
+   //int getFailoverNodeID(int nodeID);
 
+   /** TODO - this method doesn't belong here... We should have POJOTized containers updating dependencies
+    *         between ConnectionFactoryJNDIMapper and DefaultClusteredPostOffice */
+   FailoverMapper getFailoverMapper();
 
+
 }

Modified: branches/Branch_Client_Failover_Experiment/src/main/org/jboss/messaging/core/plugin/postoffice/cluster/DefaultClusteredPostOffice.java
===================================================================
--- branches/Branch_Client_Failover_Experiment/src/main/org/jboss/messaging/core/plugin/postoffice/cluster/DefaultClusteredPostOffice.java	2006-12-08 21:56:34 UTC (rev 1732)
+++ branches/Branch_Client_Failover_Experiment/src/main/org/jboss/messaging/core/plugin/postoffice/cluster/DefaultClusteredPostOffice.java	2006-12-09 01:15:18 UTC (rev 1733)
@@ -626,6 +626,11 @@
       }
    }
 
+   public FailoverMapper getFailoverMapper()
+   {
+      return failoverMapper;
+   }
+
    // Replicator implementation --------------------------------------------------------------------------
    
    public Map get(Serializable key) throws Exception
@@ -1858,24 +1863,24 @@
       {
          lock.writeLock().release();
       }
-         
+
       synchronized (replicatedData)
-      {         
+      {
          // We need to remove any replicant data for the node. This includes the node-address info.
          for(Iterator i = replicatedData.entrySet().iterator(); i.hasNext(); )
          {
             Map.Entry entry = (Map.Entry)i.next();
-            
+
             String key = (String)entry.getKey();
             Map replicants = (Map)entry.getValue();
-            
+
             replicants.remove(nodeID);
-            
+
             if (replicants.isEmpty())
             {
                i.remove();
-            }     
-            
+            }
+
             // Need to trigger listeners
             notifyListeners(key, replicants, false);
          }
@@ -2479,34 +2484,8 @@
       
       private void generateFailoverMap(Map nodeAddressMap)
       {
-         List nodes = new ArrayList(nodeAddressMap.keySet());
-         
-         log.info("generating failover map");
-         
-         log.info("I have " + nodes.size() + " nodes");
-                                
-         List failoverNodes = failoverMapper.generateMapping(nodes);
-         
-         log.info("I generated " + failoverNodes.size() +" failover nodes");
-         
-         // Now put this in the map of node -> failover node
-         
-         synchronized (failoverMap)
-         {            
-            failoverMap.clear();
-   
-            Iterator iter = nodes.iterator();
-            Iterator iter2 = failoverNodes.iterator();
-   
-            while (iter.hasNext())
-            {
-               Integer node = (Integer)iter.next();
-   
-               Integer failoverNode = (Integer)iter2.next();
-   
-               failoverMap.put(node, failoverNode);
-            }
-         }
+
+         failoverMap = failoverMapper.generateMapping(nodeAddressMap.keySet());
       }      
    }
 }
\ No newline at end of file

Modified: branches/Branch_Client_Failover_Experiment/src/main/org/jboss/messaging/core/plugin/postoffice/cluster/DefaultFailoverMapper.java
===================================================================
--- branches/Branch_Client_Failover_Experiment/src/main/org/jboss/messaging/core/plugin/postoffice/cluster/DefaultFailoverMapper.java	2006-12-08 21:56:34 UTC (rev 1732)
+++ branches/Branch_Client_Failover_Experiment/src/main/org/jboss/messaging/core/plugin/postoffice/cluster/DefaultFailoverMapper.java	2006-12-09 01:15:18 UTC (rev 1733)
@@ -21,9 +21,10 @@
  */
 package org.jboss.messaging.core.plugin.postoffice.cluster;
 
-import java.util.ArrayList;
-import java.util.List;
-
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.LinkedHashMap;
+import java.util.Map;
 import org.jboss.logging.Logger;
 import org.jboss.messaging.core.plugin.contract.FailoverMapper;
 
@@ -42,21 +43,23 @@
 public class DefaultFailoverMapper implements FailoverMapper
 {
    private static final Logger log = Logger.getLogger(DefaultFailoverMapper.class);
-   
-   public List generateMapping(List nodes)
+
+   /** This receives a List<Integer> of nodes and returns a Map<Integer,Integer> of nodes to be used on failover logic */
+   public Map generateMapping(Collection nodes)
    {
-      int s = nodes.size();
+
+      Object[] arrayNodes = (Object[])nodes.toArray(new Integer[nodes.size()]);
+      Arrays.sort(arrayNodes);
+
+      int s = arrayNodes.length;
       
       log.info("Genertaing failover mapping, node size="+ s);
-            
-      List failoverNodes = new ArrayList(s);
+
+
+
+
+      Map failoverNodes = new LinkedHashMap(s);
       
-      if (!(nodes instanceof ArrayList))
-      {
-         //So we can ensure fast index based access
-         nodes = new ArrayList(nodes);
-      }
-            
       for (int i = 0; i < s; i++)
       {
          int j = i + 1;
@@ -66,13 +69,9 @@
             j = 0;
          }
          
-         Object failoverNode = nodes.get(j);
-         
-         failoverNodes.add(failoverNode);
+         failoverNodes.put(arrayNodes[i], arrayNodes[j]);
       }
       
-      log.info("Returning " + failoverNodes.size() + " nodes");
-      
       return failoverNodes;
    }
 

Added: branches/Branch_Client_Failover_Experiment/tests/src/org/jboss/test/messaging/core/plugin/postoffice/cluster/FailoverMapperTest.java
===================================================================
--- branches/Branch_Client_Failover_Experiment/tests/src/org/jboss/test/messaging/core/plugin/postoffice/cluster/FailoverMapperTest.java	2006-12-08 21:56:34 UTC (rev 1732)
+++ branches/Branch_Client_Failover_Experiment/tests/src/org/jboss/test/messaging/core/plugin/postoffice/cluster/FailoverMapperTest.java	2006-12-09 01:15:18 UTC (rev 1733)
@@ -0,0 +1,57 @@
+/*
+   * JBoss, Home of Professional Open Source
+   * Copyright 2005, JBoss Inc., and individual contributors as indicated
+   * by the @authors tag. See the copyright.txt in the distribution for a
+   * full listing of individual contributors.
+   *
+   * This is free software; you can redistribute it and/or modify it
+   * under the terms of the GNU Lesser General Public License as
+   * published by the Free Software Foundation; either version 2.1 of
+   * the License, or (at your option) any later version.
+   *
+   * This software is distributed in the hope that it will be useful,
+   * but WITHOUT ANY WARRANTY; without even the implied warranty of
+   * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+   * Lesser General Public License for more details.
+   *
+   * You should have received a copy of the GNU Lesser General Public
+   * License along with this software; if not, write to the Free
+   * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
+   * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
+   */
+
+package org.jboss.test.messaging.core.plugin.postoffice.cluster;
+
+import java.util.ArrayList;
+import java.util.Map;
+import junit.framework.TestCase;
+import org.jboss.messaging.core.plugin.postoffice.cluster.DefaultFailoverMapper;
+
+/**
+ * @author <a href="mailto:clebert.suconic at jboss.org">Clebert Suconic</a>
+ * @version <tt>$Revision:$</tt>
+ *          <p/>
+ *          $Id:$
+ */
+public class FailoverMapperTest extends TestCase
+{
+
+   public void testMapper()
+   {
+      ArrayList list = new ArrayList();
+
+      list.add(new Integer(50));
+      list.add(new Integer(25));
+      list.add(new Integer(15));
+
+      DefaultFailoverMapper mapper = new DefaultFailoverMapper();
+      Map map = mapper.generateMapping(list);
+
+      assertEquals(new Integer(15),map.get(new Integer(50)));
+      assertEquals(new Integer(50),map.get(new Integer(25)));
+      assertEquals(new Integer(25),map.get(new Integer(15)));
+      
+   }
+
+
+}




More information about the jboss-cvs-commits mailing list