[jboss-cvs] JBossAS SVN: r57644 - projects/cluster/trunk/ha-server/src/main/java/org/jboss/ha/server/impl

jboss-cvs-commits at lists.jboss.org jboss-cvs-commits at lists.jboss.org
Thu Oct 12 23:34:37 EDT 2006


Author: bstansberry at jboss.com
Date: 2006-10-12 23:34:36 -0400 (Thu, 12 Oct 2006)
New Revision: 57644

Added:
   projects/cluster/trunk/ha-server/src/main/java/org/jboss/ha/server/impl/AbstractDistributedReplicantManager.java
Log:
Add abstract impl of DRM

Added: projects/cluster/trunk/ha-server/src/main/java/org/jboss/ha/server/impl/AbstractDistributedReplicantManager.java
===================================================================
--- projects/cluster/trunk/ha-server/src/main/java/org/jboss/ha/server/impl/AbstractDistributedReplicantManager.java	2006-10-13 03:33:34 UTC (rev 57643)
+++ projects/cluster/trunk/ha-server/src/main/java/org/jboss/ha/server/impl/AbstractDistributedReplicantManager.java	2006-10-13 03:34:36 UTC (rev 57644)
@@ -0,0 +1,1085 @@
+/*
+ * JBoss, Home of Professional Open Source.
+ * Copyright 2006, Red Hat Middleware LLC, and individual contributors
+ * as indicated by the @author tags. See the copyright.txt file in the
+ * distribution for a full listing of individual contributors.
+ *
+ * This is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU Lesser General Public License as
+ * published by the Free Software Foundation; either version 2.1 of
+ * the License, or (at your option) any later version.
+ *
+ * This software is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this software; if not, write to the Free
+ * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
+ * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
+ */
+package org.jboss.ha.server.impl;
+
+import EDU.oswego.cs.dl.util.concurrent.ConcurrentReaderHashMap;
+import EDU.oswego.cs.dl.util.concurrent.Latch;
+import java.io.Serializable;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.Vector;
+import org.jboss.ha.server.spi.DistributedReplicantManager;
+import org.jboss.logging.Logger;
+
+/**
+ * Base implementation of DistributedReplicantManager.
+ * 
+ * @author  <a href="brian.stansberry at jboss.com">Brian Stansberry</a>
+ * @version  $Revision: 1.1 $
+ */
+public abstract class AbstractDistributedReplicantManager 
+   implements DistributedReplicantManager, AsynchEventHandler.AsynchEventProcessor
+{
+   // Constants -----------------------------------------------------
+   
+   /** Service name constant; not referenced in the visible part of this class. */
+   protected final static String SERVICE_NAME = "DistributedReplicantManager";
+   
+   // Attributes ----------------------------------------------------
+   /** NOTE(review): not referenced in the visible part of this class -- presumably used to number helper threads; confirm. */
+   protected static int threadID;
+   
+   /** This node's own replicants: key (String) -> Serializable replicant. */
+   protected Map localReplicants = new ConcurrentReaderHashMap();
+   /** Replicants reported by the cluster: key (String) -> HashMap of node name -> replicant. */
+   protected Map replicants = new ConcurrentReaderHashMap();
+   /** Registered listeners: key (String) -> ArrayList of ReplicantListener. */
+   protected Map keyListeners = new ConcurrentReaderHashMap();
+   /** Per-key view id: key (String) -> Integer computed from the names of the nodes replicating that key. */
+   protected Map intraviewIdCache = new HashMap();
+   
+   /** The handler used to send replicant change notifications asynchronously */
+   protected AsynchEventHandler asynchHandler;  
+   
+   /** Logger named after the concrete subclass. */
+   protected Logger log = Logger.getLogger(getClass().getName());
+   
+   /** Names of the current cluster members in view order; assigned by membershipChanged() and null until then. */
+   protected String[] nodeNames;
+   
+   /** Released by start() once getNodeName() is non-null; add(), remove() and lookupLocalReplicants() acquire it first. */
+   protected Latch nodeNameKnown = new Latch();
+
+   // Static --------------------------------------------------------
+   
+   // Constructors --------------------------------------------------       
+   
+   /**
+    * Creates a manager with empty replicant and listener maps. All state is
+    * set up by the field initialisers; nothing is started until
+    * {@link #start()} is called.
+    */
+   public AbstractDistributedReplicantManager()
+   {
+      super();
+   }
+   
+   // Abstract Methods
+
+   /**
+    * Get the name for this node that should be used in DRM entries.
+    * 
+    * @return this node's name. {@link #start()} throws an
+    *         <code>IllegalStateException</code> if this is <code>null</code>.
+    */
+   protected abstract String getNodeName();
+   
+   /**
+    * Extracts the list of node names from the given list of the current cluster members.
+    * These names will be the ones used in DRM entries. The result is stored by
+    * {@link #membershipChanged(Vector, Vector, Vector)} and later used to return
+    * replicants in view order.
+    * 
+    * @param view list of cluster members. Type of the list elements is implementation dependent.
+    * 
+    * @return the String names of the cluster members, in the same order as <code>view</code>.
+    */
+   protected abstract String[] extractNodeNames(List view);
+   
+   /**
+    * Make an RPC to the other cluster members adding the given elements. The RPC
+    * should result in {@link #addFromCluster(String, String, Serializable)}
+    * being called on the remote nodes.
+    * 
+    * @param key       the replicant's name
+    * @param nodeName  the name of the node adding the replicant
+    * @param replicant this node's replicant for the key
+    * @param async     <code>true</code> if the RPC call should be asynchronous
+    */
+   protected abstract void remoteAdd(String key, String nodeName, Serializable replicant, boolean async);
+   
+   /**
+    * Make an RPC to the other cluster members removing the given key. The RPC
+    * should result in {@link #removeFromCluster(String, String)}
+    * being called on the remote nodes.
+    * 
+    * @param key       the replicant's name
+    * @param nodeName  the name of the node removing the replicant
+    * @param async     <code>true</code> if the RPC call should be asynchronous
+    */
+   protected abstract void remoteRemove(String key, String nodeName, boolean async);
+   
+   /**
+    * Make an RPC to the other cluster members getting the local replicant
+    * data from each.  The RPC should result in {@link #lookupLocalReplicants()} 
+    * being called on the remote nodes.
+    * 
+    * @return <code>List</code> of RPC responses, each element is the response
+    *         for a given node.  A response may be <code>null</code>, a 
+    *         <code>Throwable</code> or the normal return value from
+    *         {@link #lookupLocalReplicants()} (an <code>Object[]</code> of
+    *         node name and replicant map). {@link #mergeMembers()} logs and
+    *         skips <code>null</code> and <code>Throwable</code> responses.
+    */
+   protected abstract List remoteLookupReplicants();
+   
+   // Service lifecycle --------------------------------------------------------
+   
+   /**
+    * Lifecycle hook invoked before {@link #start()}. This base
+    * implementation does nothing.
+    */
+   public void create() throws Exception
+   {
+      // intentionally empty
+   }
+   
+   public void start() throws Exception
+   {
+      // Create the asynch listener handler thread
+      asynchHandler = new AsynchEventHandler(this, "AsynchKeyChangeHandler");
+      asynchHandler.start();
+
+      if (getNodeName() != null)
+      {   
+         nodeNameKnown.release (); // partition name is now known!
+      }
+      else
+      {
+         throw new IllegalStateException("Cannot determine node name");
+      }
+   }
+   
+   /**
+    * Stops the asynchronous notification handler. A failure to stop it is
+    * logged at WARN level and not propagated.
+    * <p>
+    * Local replicants are deliberately left registered here; they are only
+    * removed in {@link #destroy()}.
+    */
+   public void stop() throws Exception
+   {
+      // BES 200604 -- implication of NR's JBCLUSTER-38 change.  Moving the
+      // removal of local replicants to destroy() allows restart of the
+      // HAPartition while local registrations survive -- stopping the
+      // partition does not stop all registered services (e.g. ejbs); if we
+      // maintain their registrations we can pass them to the cluster when
+      // we restart.  However, we are leaving all the remote replicants we
+      // have registered around, so they will still be included as targets
+      // if anyone contacts our EJB while the partition is stopped.
+      // Probably OK; if they aren't valid the client will find this out.
+      //
+      // NR 200505 : [JBCLUSTER-38] the loop that used to call remove() for
+      // every key in localReplicants at this point now lives in destroy().
+
+      // Stop the asynch handler thread
+      try
+      {
+         asynchHandler.stop();
+      }
+      catch (Exception ex)
+      {
+         log.warn("Failed to stop asynchHandler", ex);
+      }
+   }
+
+   // NR 200505 : [JBCLUSTER-38] unbind at destroy
+   /**
+    * Removes every local replicant and notifies the local listeners of each
+    * key. The cluster is not notified.
+    */
+   public void destroy() throws Exception
+   {  
+      // now partition can't be resuscitated, so remove local replicants
+      if (localReplicants == null)
+      {
+         return;
+      }
+
+      synchronized (localReplicants)
+      {
+         // Snapshot the keys first: removeLocal() modifies the map
+         String[] snapshot = (String[]) localReplicants.keySet().toArray(new String[localReplicants.size()]);
+         for (int i = 0; i < snapshot.length; i++)
+         {
+            // channel is disconnected, so don't try to notify cluster
+            removeLocal(snapshot[i]);
+         }
+      }
+   }
+   
+   // DistributedReplicantManager implementation ----------------------------------------------              
+   
+   /**
+    * Registers this node's replicant for a key: first announces it to the
+    * other cluster members with a synchronous RPC, then stores it locally and
+    * notifies the local listeners of the key.
+    * 
+    * @param key       the replicant's name
+    * @param replicant this node's replicant for the key
+    * @throws Exception if waiting for the node name or the RPC fails
+    */
+   public void add(String key, Serializable replicant) throws Exception
+   {
+      if (log.isTraceEnabled())
+      {
+         log.trace("add, key="+key+", value="+replicant);
+      }
+
+      // we don't propagate until our name is known
+      nodeNameKnown.acquire();
+
+      String ourName = getNodeName();
+      remoteAdd(key, ourName, replicant, false);
+
+      synchronized (localReplicants)
+      {
+         localReplicants.put(key, replicant);
+         List current = lookupReplicants(key);
+         notifyKeyListeners(key, current);
+      }
+   }
+   
+   /**
+    * Removes this node's replicant for a key, announcing the removal to the
+    * other cluster members with an asynchronous RPC. Does nothing if this
+    * node has no replicant for the key.
+    * 
+    * @param key the replicant's name
+    * @throws Exception if waiting for the node name or the RPC fails
+    */
+   public void remove(String key) throws Exception
+   {
+      // we don't propagate until our name is known
+      nodeNameKnown.acquire();
+
+      // optimisation: skip the costly network call when there is
+      // nothing to remove
+      if (!localReplicants.containsKey(key))
+      {
+         return;
+      }
+
+      String ourName = getNodeName();
+      remoteRemove(key, ourName, true);
+      removeLocal(key);
+   }
+      
+   /**
+    * Returns the union of the keys that have a local replicant and the keys
+    * that have replicants reported by the cluster.
+    * 
+    * @return a new <code>HashSet</code> of key names; never <code>null</code>
+    */
+   public Collection getAllServices ()
+   {
+      HashSet allKeys = new HashSet();
+      allKeys.addAll(localReplicants.keySet());
+      allKeys.addAll(replicants.keySet());
+      Collection result = allKeys;
+      return result;
+   }
+   
+   /**
+    * Returns this node's own replicant for the given key.
+    * 
+    * @param key the replicant's name
+    * @return the local replicant, or <code>null</code> if none is registered
+    */
+   public Serializable lookupLocalReplicant(String key)
+   {
+      Object local = localReplicants.get(key);
+      return (Serializable) local;
+   }
+   
+   /**
+    * Returns all replicants known for a key, local one included.
+    * 
+    * @param key the replicant's name
+    * @return <code>null</code> if there is neither a local nor a remote
+    *         replicant for the key; otherwise a list of replicants. When
+    *         remote replicants exist the list follows the order of
+    *         <code>nodeNames</code> (JBAS-2677).
+    */
+   public List lookupReplicants(String key)
+   {
+      Serializable local = lookupLocalReplicant(key);
+      HashMap remote = (HashMap) replicants.get(key);
+
+      if (remote == null)
+      {
+         if (local == null)
+         {
+            return null;
+         }
+         // Only our own replicant exists
+         ArrayList onlyLocal = new ArrayList();
+         onlyLocal.add(local);
+         return onlyLocal;
+      }
+
+      // JBAS-2677. Put the replicants in view order.
+      ArrayList ordered = new ArrayList();
+      String ourName = getNodeName();
+      for (int i = 0; i < nodeNames.length; i++)
+      {
+         String member = nodeNames[i];
+         Object value;
+         if (local != null && ourName.equals(member))
+         {
+            // our own slot in the view is filled from the local map
+            value = local;
+         }
+         else
+         {
+            value = remote.get(member);
+         }
+
+         if (value != null)
+         {
+            ordered.add(value);
+         }
+      }
+      return ordered;
+   }
+   
+   /**
+    * Returns the names of the nodes that hold a replicant for a key, this
+    * node included.
+    * 
+    * @param key the replicant's name
+    * @return <code>null</code> if no node replicates the key; otherwise a
+    *         list of node names. When remote replicants exist the list
+    *         follows the order of <code>nodeNames</code> (JBAS-2677).
+    */
+   public List lookupReplicantsNodeNames(String key)
+   {      
+      boolean hasLocal = localReplicants.containsKey(key);
+      HashMap remote = (HashMap) replicants.get(key);
+      if (remote == null && !hasLocal)
+      {
+         return null;
+      }
+
+      String ourName = getNodeName();
+      ArrayList names = new ArrayList();
+
+      if (remote == null)
+      {
+         // Reaching this point without remote replicants means we have a local one
+         names.add(ourName);
+         return names;
+      }
+
+      // JBAS-2677. Put the replicants in view order.
+      for (int i = 0; i < nodeNames.length; i++)
+      {
+         String member = nodeNames[i];
+         if (hasLocal && ourName.equals(member))
+         {
+            names.add(ourName);
+         }
+         else if (remote.containsKey(member))
+         {
+            names.add(member);
+         }
+      }
+      return names;
+   }
+   
+   /**
+    * Subscribes a listener to changes of the replicant set of a key.
+    * The same listener may be registered more than once.
+    * 
+    * @param key        the replicant's name
+    * @param subscriber the listener to add
+    */
+   public void registerListener(String key, DistributedReplicantManager.ReplicantListener subscriber)
+   {
+      synchronized (keyListeners)
+      {
+         ArrayList forKey = (ArrayList) keyListeners.get(key);
+         if (forKey != null)
+         {
+            forKey.add(subscriber);
+            return;
+         }
+
+         // First listener for this key
+         forKey = new ArrayList();
+         keyListeners.put(key, forKey);
+         forKey.add(subscriber);
+      }
+   }
+   
+   /**
+    * Removes one registration of a listener for a key. When the last
+    * listener of a key is removed, the key's entry is dropped altogether.
+    * 
+    * @param key        the replicant's name
+    * @param subscriber the listener to remove
+    */
+   public void unregisterListener(String key, DistributedReplicantManager.ReplicantListener subscriber)
+   {
+      synchronized (keyListeners)
+      {
+         ArrayList forKey = (ArrayList) keyListeners.get(key);
+         if (forKey != null)
+         {
+            forKey.remove(subscriber);
+            if (forKey.isEmpty())
+            {
+               keyListeners.remove(key);
+            }
+         }
+      }
+   }
+   
+   public int getReplicantsViewId(String key)   
+   {
+      Integer result = (Integer)this.intraviewIdCache.get (key);
+      
+      if (result == null)
+         return 0;
+      else
+         return result.intValue ();      
+   }
+   
+   public boolean isMasterReplica (String key)
+   {
+      boolean trace = log.isTraceEnabled();
+      if(trace)
+         log.trace("isMasterReplica, key="+key);
+      // if I am not a replicat, I cannot be the master...
+      //
+      if (!localReplicants.containsKey (key))
+      {
+         if( trace )
+            log.trace("no localReplicants, key="+key+", isMasterReplica=false");
+         return false;
+      }
+
+      HashMap repForKey = (HashMap)replicants.get(key);
+      if (repForKey==null)
+      {
+         if( trace )
+            log.trace("no replicants, key="+key+", isMasterReplica=true");
+         return true;
+      }
+      Vector replicaNodes = new Vector (repForKey.keySet());
+      String ourName = getNodeName();
+      boolean isMasterReplica = false;
+      for (int i=0; i< nodeNames.length; i++)
+      {
+         if( trace )
+            log.trace("Testing member: " + nodeNames[i]);
+         if (replicaNodes.contains (nodeNames[i]))
+         {
+            if( trace )
+               log.trace("Member found in replicantNodes, isMasterReplica=false");
+            break;
+         }
+         else if (nodeNames[i].equals(ourName))
+         {
+            if( trace )
+               log.trace("Member == nodeName, isMasterReplica=true");
+            isMasterReplica = true;
+            break;
+         }
+      }
+      return isMasterReplica;
+   }
+   
+   // HAPartition.HAPartitionStateTransfer implementation ----------------------------------------------
+   
+   public Serializable getCurrentState ()
+   {
+      java.util.Collection services = this.getAllServices ();
+      HashMap result = new HashMap ();
+      
+      java.util.Iterator catsIter = services.iterator ();                       
+      while (catsIter.hasNext ())
+      {
+         String category = (String)catsIter.next ();
+         HashMap content = (HashMap)this.replicants.get (category);
+         if (content == null)
+            content = new HashMap ();
+         else
+            content = (HashMap)content.clone ();
+         
+         Serializable local = lookupLocalReplicant(category);
+         if (local != null)
+            content.put (getNodeName(), local);
+         
+         result.put (category, content);
+      }
+      
+      // we add the intraviewid cache to the global result
+      //
+      Object[] globalResult = new Object[] {result, intraviewIdCache};
+      return globalResult;
+   }
+
+   /**
+    * Installs state produced by {@link #getCurrentState()} on another node:
+    * merges the replicant map into <code>replicants</code>, replaces the
+    * intra-view id cache, and starts a <code>MembersPublisher</code>.
+    * 
+    * @param newState an <code>Object[]</code> of size 2 as returned by
+    *                 {@link #getCurrentState()}
+    */
+   public void setCurrentState(Serializable newState)
+   {
+      Object[] parts = (Object[]) newState;
+
+      HashMap received = (HashMap) parts[0];
+      replicants.putAll(received);
+      intraviewIdCache = (HashMap) parts[1];
+
+      if (log.isTraceEnabled())
+      {
+         log.trace(getNodeName() + ": received new state, will republish local replicants");
+      }
+
+      new MembersPublisher().start();
+   }
+   
+   // HAPartition.HAMembershipListener implementation ----------------------------------------------
+
+   public void membershipChangedDuringMerge(Vector deadMembers, Vector newMembers, Vector allMembers, Vector originatingGroups)
+   {
+      membershipChanged(deadMembers, newMembers, allMembers);
+      
+      log.debug("Merging partitions...");
+      log.debug("Originating groups: " + originatingGroups);
+      if (newMembers.size() > 0) 
+      {
+         new MergeMembers().start();
+      }
+   }
+   
+   public void membershipChanged(Vector deadMembers, Vector newMembers, Vector allMembers)
+   {
+      // Here we only care about deadMembers.  Purge all replicant lists of deadMembers
+      // and then notify all listening nodes.
+      if (log.isDebugEnabled())
+      {
+         log.debug("I am (" + getNodeName() + ") received membershipChanged event:");
+         log.debug("Dead members: " + deadMembers.size() + " (" + deadMembers + ")");
+         log.debug("New Members : " + newMembers.size()  + " (" + newMembers + ")");
+         log.debug("All Members : " + allMembers.size()  + " (" + allMembers + ")");
+      }      
+      
+      this.nodeNames = extractNodeNames(allMembers);
+     
+      purgeDeadMembers(deadMembers);
+   }
+   
+   // AsynchEventHandler.AsynchEventProcessor implementation -----------------
+   
+   /**
+    * Handles an event queued through
+    * {@link #enqueueKeyChangeEvent(String, List)} by notifying the listeners
+    * of the event's key.
+    * 
+    * @param event a {@link KeyChangeEvent}
+    */
+   public void processEvent(Object event)
+   {
+      KeyChangeEvent change = (KeyChangeEvent) event;
+      String key = change.key;
+      List current = change.replicants;
+      notifyKeyListeners(key, current);
+   }
+   
+   /**
+    * Event queued on the asynchronous handler when the replicants of a key
+    * change: carries the key and the replicant list to report.
+    */
+   protected static class KeyChangeEvent
+   {
+      /** The replicant key that changed. */
+      public String key;
+      /** The replicants to report to the listeners of the key. */
+      public List replicants;
+
+      /**
+       * @param key        the replicant key that changed
+       * @param replicants the replicants to report for the key
+       */
+      public KeyChangeEvent(String key, List replicants)
+      {
+         this.replicants = replicants;
+         this.key = key;
+      }
+   }
+
+   // DistributedReplicantManager cluster callbacks ----------------------------------------------              
+   
+   /**
+    * Cluster callback called when a new replicant is added on another node.
+    * Stores the replicant and queues an asynchronous notification for the
+    * listeners of the key. Failures are logged and not propagated; if the
+    * calling thread is interrupted while queueing the notification, its
+    * interrupt status is restored.
+    * 
+    * @param key Replicant key
+    * @param nodeName Node that adds the replicant
+    * @param replicant Serialized representation of the replicant
+    */   
+   public void addFromCluster(String key, String nodeName, Serializable replicant)
+   {
+      if(log.isTraceEnabled())
+         log.trace("_add(" + key + ", " + nodeName + ")");
+      
+      try
+      {
+         addReplicant(key, nodeName, replicant);
+         // Notify listeners asynchronously
+         enqueueKeyChangeEvent(key, lookupReplicants(key));
+      }
+      catch (InterruptedException ie)
+      {
+         // Do not swallow the interrupt: let the caller's thread see it
+         Thread.currentThread().interrupt();
+         log.error("_add failed", ie);
+      }
+      catch (Exception ex)
+      {
+         log.error("_add failed", ex);
+      }
+   }
+   
+   /**
+    * Cluster callback called when a replicant is removed by another node.
+    * If something was actually removed, queues an asynchronous notification
+    * for the listeners of the key. Failures are logged and not propagated;
+    * if the calling thread is interrupted while queueing the notification,
+    * its interrupt status is restored.
+    * 
+    * @param key Name of the replicant key
+    * @param nodeName Node that wants to remove its replicant for the given key
+    */   
+   public void removeFromCluster(String key, String nodeName)
+   {
+      try
+      {
+         if (removeReplicant (key, nodeName)) {
+            // Notify listeners asynchronously
+            enqueueKeyChangeEvent(key, lookupReplicants(key));
+         }
+      }
+      catch (InterruptedException ie)
+      {
+         // Do not swallow the interrupt: let the caller's thread see it
+         Thread.currentThread().interrupt();
+         log.error("_remove failed", ie);
+      }
+      catch (Exception ex)
+      {
+         log.error("_remove failed", ex);
+      }
+   }
+   
+   /**
+    * Cluster callback called when a node wants to know our complete list of
+    * local replicants. Waits until the node name is known before answering.
+    * 
+    * @return an array of size 2: element 0 is the name of our node,
+    *         element 1 is the map of our local replicants
+    * @throws Exception if waiting for the node name fails
+    */   
+   public Object[] lookupLocalReplicants() throws Exception
+   {
+      // we don't answer until our name is known
+      nodeNameKnown.acquire();
+
+      Object[] answer = new Object[] {getNodeName(), localReplicants};
+      if (log.isTraceEnabled())
+      {
+         log.trace ("lookupLocalReplicants called ("+ answer[0] + "). Return: " + localReplicants.size ());
+      }
+      return answer;
+   }   
+
+   // Misc Public -----------------------------------------------------
+   
+   /**
+    * Produces a listing of every known service, wrapped in a
+    * <code>&lt;pre&gt;</code> element: whether it is available locally and
+    * the names of the remote nodes that replicate it.
+    * 
+    * @return the listing as text
+    */
+   public String listContent () throws Exception
+   {
+      // we merge all replicants services: local only or not
+      Iterator serviceIter = getAllServices().iterator();
+
+      StringBuffer out = new StringBuffer();
+      out.append("<pre>");
+
+      while (serviceIter.hasNext())
+      {
+         String service = (String) serviceIter.next();
+         HashMap remote = (HashMap) replicants.get(service);
+         if (remote == null)
+         {
+            remote = new HashMap();
+         }
+
+         out.append("-----------------------------------------------\n");
+         out.append("Service : ");
+         out.append(service);
+         out.append("\n\n");
+
+         boolean availableLocally = lookupLocalReplicant(service) != null;
+         if (availableLocally)
+         {
+            out.append("\t- Service *is* also available locally\n");
+         }
+         else
+         {
+            out.append("\t- Service is *not* available locally\n");
+         }
+
+         Iterator nodeIter = remote.keySet().iterator();
+         while (nodeIter.hasNext())
+         {
+            String location = (String) nodeIter.next();
+            out.append("\t- ");
+            out.append(location);
+            out.append("\n");
+         }
+
+         out.append("\n");
+      }
+
+      out.append("</pre>");
+      return out.toString();
+   }
+   
+   /**
+    * Produces an XML listing of every known service and the nodes that
+    * replicate it; this node's own entry is flagged
+    * <code>local="True"</code>.
+    * <p>
+    * NOTE(review): service and node names are written as-is, without XML
+    * escaping.
+    * 
+    * @return the listing as an XML string rooted at
+    *         <code>&lt;ReplicantManager&gt;</code>
+    */
+   public String listXmlContent() throws Exception
+   {
+      // we merge all replicants services: local only or not
+      Iterator serviceIter = getAllServices().iterator();
+
+      StringBuffer xml = new StringBuffer();
+      xml.append("<ReplicantManager>\n");
+
+      while (serviceIter.hasNext())
+      {
+         String service = (String) serviceIter.next();
+         HashMap remote = (HashMap) replicants.get(service);
+         if (remote == null)
+         {
+            remote = new HashMap();
+         }
+
+         xml.append("\t<Service>\n");
+         xml.append("\t\t<ServiceName>");
+         xml.append(service);
+         xml.append("</ServiceName>\n");
+
+         // Our own replicant, if any, is listed first
+         if (lookupLocalReplicant(service) != null)
+         {
+            xml.append("\t\t<Location>\n");
+            xml.append("\t\t\t<Name local=\"True\">");
+            xml.append(getNodeName());
+            xml.append("</Name>\n");
+            xml.append("\t\t</Location>\n");
+         }
+
+         Iterator nodeIter = remote.keySet().iterator();
+         while (nodeIter.hasNext())
+         {
+            String location = (String) nodeIter.next();
+            xml.append("\t\t<Location>\n");
+            xml.append("\t\t\t<Name local=\"False\">");
+            xml.append(location);
+            xml.append("</Name>\n");
+            xml.append("\t\t</Location>\n");
+         }
+
+         xml.append("\t</Service>\n");
+      }
+
+      xml.append("</ReplicantManager>\n");
+      return xml.toString();
+   }
+
+   
+   // Protected -----------------------------------------------------
+   
+   /**
+    * Sums the hash codes of the non-null elements of a list. The sum simply
+    * wraps around on int overflow.
+    * 
+    * @param members the elements to hash
+    * @return the sum of the elements' hash codes, <code>0</code> for an
+    *         empty list
+    */
+   protected int calculateReplicantsHash (List members)
+   {
+      int sum = 0;
+      for (Iterator it = members.iterator(); it.hasNext();)
+      {
+         Object member = it.next();
+         if (member == null)
+         {
+            continue;
+         }
+         sum += member.hashCode(); // no explicit overflow with int addition
+      }
+      return sum;
+   }
+   
+   /**
+    * Recomputes the view id of a key from the names of the nodes that
+    * currently replicate it and updates the cache: the id is stored when at
+    * least one node replicates the key, and the cached entry is removed
+    * otherwise.
+    * 
+    * @param key the replicant's name
+    * @return the new id, or <code>0</code> if no node replicates the key
+    */
+   protected int updateReplicantsHashId (String key)
+   {
+      // we first get a list of all node names that replicate this key
+      List holders = lookupReplicantsNodeNames(key);
+
+      if (holders == null || holders.size() == 0)
+      {
+         // no more replicants for this key: we uncache our view id
+         intraviewIdCache.remove(key);
+         return 0;
+      }
+
+      int id = calculateReplicantsHash(holders);
+      intraviewIdCache.put(key, new Integer(id));
+      return id;
+   }
+   
+   /**
+    * Queues a {@link KeyChangeEvent} on the asynchronous handler.
+    * 
+    * @param key        the replicant key that changed
+    * @param replicants the replicants to report for the key
+    * @throws InterruptedException if thrown by the handler while queueing
+    */
+   protected void enqueueKeyChangeEvent(String key, List replicants)
+      throws InterruptedException
+   {
+      KeyChangeEvent change = new KeyChangeEvent(key, replicants);
+      asynchHandler.queueEvent(change);
+   }
+   
+   /**
+    * Removes a node's replicant for a key from the <code>replicants</code>
+    * map, dropping the key's entry once no node is left for it.
+    * 
+    * @param key      the replicant's name
+    * @param nodeName the node whose replicant is removed
+    * @return <code>true</code> if a non-null replicant was removed
+    */
+   protected boolean removeReplicant(String key, String nodeName) throws Exception
+   {
+      synchronized (replicants)
+      {
+         HashMap perNode = (HashMap) replicants.get(key);
+         if (perNode == null)
+         {
+            return false;
+         }
+
+         if (perNode.remove(nodeName) == null)
+         {
+            return false;
+         }
+
+         // Last replicant for the key is gone: forget the key
+         if (perNode.isEmpty())
+         {
+            replicants.remove(key);
+         }
+         return true;
+      }
+   }
+   
+   /**
+    * Removes this node's replicant for a key without contacting the cluster
+    * and notifies the local listeners with the remaining replicants.
+    * 
+    * @param key the replicant's name
+    */
+   protected void removeLocal(String key)
+   {
+      synchronized (localReplicants)
+      {
+         localReplicants.remove(key);
+
+         List remaining = lookupReplicants(key);
+         // don't pass null but an empty list
+         notifyKeyListeners(key, (remaining != null) ? remaining : new ArrayList());
+      }
+   }
+   
+   /**
+    * Stores a node's replicant for a key in the <code>replicants</code> map.
+    * 
+    * @param key       replicant key name
+    * @param nodeName  name of the node that adds this replicant
+    * @param replicant serialized representation of the replica
+    */
+   protected void addReplicant(String key, String nodeName, Serializable replicant)
+   {
+      Map target = replicants;
+      addReplicant(target, key, nodeName, replicant);
+   }
+   
+   /**
+    * Stores a node's replicant for a key in the given map, creating the
+    * per-key <code>HashMap</code> on first use. The update is done while
+    * holding the map's monitor.
+    * 
+    * @param map       structure (key -> HashMap of node name -> replicant) to add to
+    * @param key       name of the replicant key
+    * @param nodeName  name of the node adding the replicant
+    * @param replicant serialized representation of the replicant that is added
+    */
+   protected void addReplicant(Map map, String key, String nodeName, Serializable replicant)
+   {
+      synchronized (map)
+      {
+         HashMap perNode = (HashMap) map.get(key);
+         if (perNode == null)
+         {
+            if (log.isTraceEnabled())
+            {
+               log.trace("adding new HashMap");
+            }
+            perNode = new HashMap();
+            map.put(key, perNode);
+         }
+         perNode.put(nodeName, replicant);
+      }
+   }
+   
+   /**
+    * Collects the keys for which the given node has an entry in the
+    * <code>replicants</code> map.
+    * 
+    * @param nodeName name of the node
+    * @return a new <code>Vector</code> of keys; empty if there are none
+    */
+   protected Vector getKeysReplicatedByNode (String nodeName)
+   {
+      Vector keys = new Vector();
+      synchronized (replicants)
+      {
+         Iterator entries = replicants.entrySet().iterator();
+         while (entries.hasNext())
+         {
+            Map.Entry entry = (Map.Entry) entries.next();
+            HashMap perNode = (HashMap) entry.getValue();
+            if (perNode == null)
+            {
+               continue;
+            }
+            if (perNode.containsKey(nodeName))
+            {
+               keys.add(entry.getKey());
+            }
+         }
+      }
+      return keys;
+   }
+   
+   /**
+    * Indicates whether the <code>replicants</code> map already holds an
+    * entry for a given key/node pair.
+    * 
+    * @param key      replicant key name
+    * @param nodeName name of the node
+    * @return <code>true</code> if an entry for the given node exists for the given key
+    */   
+   protected boolean replicantEntryAlreadyExists (String key, String nodeName)
+   {
+      Map target = replicants;
+      return replicantEntryAlreadyExists(target, key, nodeName);
+   }
+   
+   /**
+    * Indicates whether the given data structure already holds an entry for
+    * a given key/node pair.
+    * 
+    * @param map      structure (key -> HashMap of node name -> replicant) to look in
+    * @param key      replicant key name
+    * @param nodeName name of the node
+    * @return <code>true</code> if an entry for the given node exists for the given key
+    */   
+   protected boolean replicantEntryAlreadyExists (Map map, String key, String nodeName)
+   {
+      HashMap perNode = (HashMap) map.get(key);
+      return perNode != null && perNode.containsKey(nodeName);
+   }
+   
+   /**
+    * Notifies, through a callback, the listeners for a given replicant that
+    * the set of replicants has changed. The view id of the key is refreshed
+    * first, whether or not any listener is registered.
+    * <p>
+    * The listeners are copied to an array while holding the
+    * <code>keyListeners</code> monitor -- the same monitor that
+    * {@link #registerListener} and {@link #unregisterListener} hold when
+    * they modify the list -- and are then called back outside of it.
+    * 
+    * @param key The replicant key name
+    * @param newReplicants The new list of replicants
+    */   
+   protected void notifyKeyListeners(String key, List newReplicants)
+   {
+      boolean trace = log.isTraceEnabled();
+      
+      if( trace )
+         log.trace("notifyKeyListeners");
+
+      // we first update the intra-view id for this particular key
+      //
+      int newId = updateReplicantsHashId (key);
+      
+      // ArrayList is not thread safe: snapshot it under the lock that
+      // guards every modification of the per-key listener lists.
+      DistributedReplicantManager.ReplicantListener[] toNotify = null;
+      synchronized(keyListeners) 
+      {
+         ArrayList listeners = (ArrayList)keyListeners.get(key);
+         if (listeners != null)
+         {
+            toNotify = new DistributedReplicantManager.ReplicantListener[listeners.size()];
+            toNotify = (DistributedReplicantManager.ReplicantListener[]) listeners.toArray(toNotify);
+         }
+      }
+      
+      if (toNotify == null)
+      {
+         if( trace )
+            log.trace("listeners is null");
+         return;
+      }
+      
+      if( trace )
+         log.trace("notifying " + toNotify.length + " listeners for key change: " + key);
+      // Call back outside the lock so listeners cannot block (un)registration
+      for (int i = 0; i < toNotify.length; i++)
+      {
+         if (toNotify[i] != null)
+            toNotify[i].replicantsChanged(key, newReplicants, newId);
+      }
+   }
+
+   protected void republishLocalReplicants()
+   {
+      try
+      {
+         boolean trace = log.isTraceEnabled();
+         
+         if( trace )
+            log.trace("Start Re-Publish local replicants in DRM");
+
+         HashMap localReplicants;
+         synchronized (this.localReplicants)
+         {
+            localReplicants = new HashMap(this.localReplicants);
+         }
+
+         Iterator entries = localReplicants.entrySet().iterator();
+         while( entries.hasNext() )
+         {
+            Map.Entry entry = (Map.Entry) entries.next();
+            String key = (String) entry.getKey();
+            Serializable replicant = (Serializable) entry.getValue();
+            if (replicant != null)
+            {
+               if( trace )
+                  log.trace("publishing, key=" + key + ", value=" + replicant);
+
+               remoteAdd(key, getNodeName(), replicant, true);
+
+               notifyKeyListeners(key, lookupReplicants(key));
+            }
+         }
+         if( trace )
+            log.trace("End Re-Publish local replicants");
+      }
+      catch (Exception e)
+      {
+         log.error("Re-Publish failed", e);
+      }
+   }
+
+   ////////////////////
+   // Group membership API
+   ////////////////////
+
+   /**
+    * Reconciles the <code>replicants</code> map with what the other cluster
+    * members report as their local replicants.
+    * <p>
+    * Each usable response from {@link #remoteLookupReplicants()} is an
+    * <code>Object[]</code> of node name and replicant map. Entries the node
+    * reports that are not yet known are added; entries held for the node
+    * that it no longer reports are removed. Listeners of every key touched
+    * are notified once, after all responses have been processed. Any
+    * exception is logged and ends the merge; nothing is propagated.
+    */
+   protected void mergeMembers()
+   {
+      try
+      {
+         log.debug("Start merging members in DRM service...");
+         // keys whose replicant set changed; a set, so each key is notified once
+         java.util.HashSet notifies = new java.util.HashSet ();
+         List rsp = remoteLookupReplicants();
+         
+         if (rsp.size() == 0)
+            log.debug("No responses from other nodes during the DRM merge process.");
+         else 
+         { 
+            log.debug("The DRM merge process has received " + rsp.size() + " answers");
+         }
+         for (int i = 0; i < rsp.size(); i++)
+         {
+            Object o = rsp.get(i);
+            // null and Throwable responses are logged and skipped
+            if (o == null)
+            {
+               log.warn("As part of the answers received during the DRM merge process, a NULL message was received!");
+               continue;
+            }
+            else if (o instanceof Throwable)
+            {
+               log.warn("As part of the answers received during the DRM merge process, a Throwable was received!", (Throwable) o);
+               continue;
+            }
+            
+            // normal response: {node name, map of key -> that node's replicant}
+            Object[] objs = (Object[]) o;
+            String node = (String)objs[0];
+            // NOTE: this local shadows the "replicants" field within the loop body
+            Map replicants = (Map)objs[1];
+            Iterator keys = replicants.keySet().iterator();
+            
+            //FIXME: We don't remove keys in the merge process but only add new keys!
+            // NOTE(review): the FIXME above looks outdated -- stale keys are
+            // removed further down in this loop; confirm and drop it.
+            while (keys.hasNext())
+            {
+               String key = (String)keys.next();
+               // done to reduce duplicate notifications
+               if (!replicantEntryAlreadyExists  (key, node))
+               {
+                  addReplicant(key, node, (Serializable)replicants.get(key));
+                  notifies.add (key);
+               }
+            }
+            
+            // After the additions above, we hold more keys for this node than
+            // it reported only if some of the keys we hold are stale.
+            Vector currentStatus = getKeysReplicatedByNode (node);
+            if (currentStatus.size () > replicants.size ())
+            {
+               // The merge process needs to remove some (now)
+               // unexisting keys
+               //
+               for (int currentKeysId=0, currentKeysMax=currentStatus.size (); currentKeysId<currentKeysMax; currentKeysId++)
+               {
+                  String theKey = (String)currentStatus.elementAt (currentKeysId);
+                  if (!replicants.containsKey (theKey))
+                  {
+                     removeReplicant (theKey, node);
+                     notifies.add(theKey);
+                  }
+               }
+            }
+         }   
+         
+         // notify the listeners of every key that changed, on the calling thread
+         Iterator notifIter = notifies.iterator ();
+         while (notifIter.hasNext ())
+         {
+            String key = (String)notifIter.next ();
+            notifyKeyListeners(key, lookupReplicants(key));
+         }
+         log.debug ("..Finished merging members in DRM service");
+
+      }               
+      catch (Exception ex)
+      {
+         log.error("merge failed", ex);
+      }
+   }
+
+   /**
+    * Removes the replicants registered by the given dead members from every
+    * key, and notifies the listeners of each key that lost at least one
+    * replicant. Nothing is returned; failures are logged rather than
+    * propagated.
+    * <p>
+    * The whole purge, including the listener notifications, runs while
+    * holding the monitor of <code>replicants</code>.
+    *
+    * @param deadMembers members to purge; each element's
+    *                    <code>toString()</code> value is used as the node
+    *                    name. An empty vector makes this a no-op.
+    */
+   protected void purgeDeadMembers(Vector deadMembers)
+   {
+      if (deadMembers.isEmpty())
+      {
+         return;
+      }
+
+      log.debug("purgeDeadMembers, "+deadMembers);
+      try
+      {
+         synchronized(replicants)
+         {
+            for (Iterator keyIter = replicants.keySet().iterator(); keyIter.hasNext();)
+            {
+               String key = (String) keyIter.next();
+               HashMap byNode = (HashMap) replicants.get(key);
+               boolean changed = false;
+
+               for (int m = 0; m < deadMembers.size(); m++)
+               {
+                  String deadNode = deadMembers.elementAt(m).toString();
+                  log.debug("trying to remove deadMember " + deadNode + " for key " + key);
+                  if (byNode.remove(deadNode) == null)
+                  {
+                     // this node had no replicant registered under this key
+                     log.debug(deadNode + " was NOT removed!!!");
+                     continue;
+                  }
+                  log.debug(deadNode + " was removed");
+                  changed = true;
+               }
+
+               // notify once per key, and only if something was actually removed
+               if (changed)
+               {
+                  notifyKeyListeners(key, lookupReplicants(key));
+               }
+            }
+         }
+      }
+      catch (Exception ex)
+      {
+         log.error("purgeDeadMembers failed", ex);
+      }
+   }
+
+   /**
+    * Placeholder that currently does nothing: the body is empty because the
+    * cleanup has not been implemented yet. Judging by the name, it is meant
+    * to clean up registered key listeners.
+    */   
+   protected void cleanupKeyListeners()
+   {
+      // NOT IMPLEMENTED YET
+   }
+
+   protected synchronized static int nextThreadID()
+   {
+      return threadID ++;
+   }
+
+   // Private -------------------------------------------------------
+   
+   // Inner classes -------------------------------------------------
+
+   /**
+    * Thread that runs <code>mergeMembers()</code> of the enclosing manager
+    * asynchronously, after a short delay.
+    */
+   protected class MergeMembers extends Thread
+   {
+      /**
+       * Creates the thread with a name of the form
+       * <code>DRM Async Merger#&lt;id&gt;</code>, where the id comes from
+       * <code>nextThreadID()</code>.
+       */
+      public MergeMembers()
+      {
+         super("DRM Async Merger#"+nextThreadID());
+      }
+
+      /**
+       * Called when the service needs to merge with another partition. This
+       * process is performed asynchronously: the thread waits 50ms and then
+       * invokes <code>mergeMembers()</code>.
+       */      
+      public void run()
+      {
+         log.debug("Sleeping for 50ms before mergeMembers");
+         try
+         {
+            // if this thread invokes a cluster method call before
+            // membershipChanged event completes, it could timeout/hang
+            // we need to discuss this with Bela.
+            Thread.sleep(50); 
+         }
+         catch (InterruptedException interrupted)
+         {
+            // Thread.sleep clears the interrupt status when it throws. Restore
+            // it so the interruption is not silently lost; the merge is still
+            // attempted, as before.
+            Thread.currentThread().interrupt();
+         }
+         mergeMembers();
+      }
+   }
+
+   /**
+    * Thread that runs <code>republishLocalReplicants()</code> of the enclosing
+    * manager asynchronously, after a short delay.
+    */
+   protected class MembersPublisher extends Thread
+   {
+      /**
+       * Creates the thread with a name of the form
+       * <code>DRM Async Publisher#&lt;id&gt;</code>, where the id comes from
+       * <code>nextThreadID()</code>.
+       */
+      public MembersPublisher()
+      {
+         super("DRM Async Publisher#"+nextThreadID());
+      }
+
+      /**
+       * Called when service needs to re-publish its local replicants to other
+       * cluster members after this node has joined the cluster. The thread
+       * waits 50ms and then invokes <code>republishLocalReplicants()</code>.
+       */
+      public void run()
+      {
+         log.debug("DRM: Sleeping before re-publishing for 50ms just in case");
+         try
+         {
+            // if this thread invokes a cluster method call before
+            // membershipChanged event completes, it could timeout/hang
+            // we need to discuss this with Bela.
+            Thread.sleep(50);
+         }
+         catch (InterruptedException interrupted)
+         {
+            // Thread.sleep clears the interrupt status when it throws. Restore
+            // it so the interruption is not silently lost; the re-publish is
+            // still attempted, as before.
+            Thread.currentThread().interrupt();
+         }
+         republishLocalReplicants();
+      }
+   }
+
+   
+}




More information about the jboss-cvs-commits mailing list