[jboss-cvs] JBossAS SVN: r57644 - projects/cluster/trunk/ha-server/src/main/java/org/jboss/ha/server/impl
jboss-cvs-commits at lists.jboss.org
jboss-cvs-commits at lists.jboss.org
Thu Oct 12 23:34:37 EDT 2006
Author: bstansberry at jboss.com
Date: 2006-10-12 23:34:36 -0400 (Thu, 12 Oct 2006)
New Revision: 57644
Added:
projects/cluster/trunk/ha-server/src/main/java/org/jboss/ha/server/impl/AbstractDistributedReplicantManager.java
Log:
Add abstract impl of DRM
Added: projects/cluster/trunk/ha-server/src/main/java/org/jboss/ha/server/impl/AbstractDistributedReplicantManager.java
===================================================================
--- projects/cluster/trunk/ha-server/src/main/java/org/jboss/ha/server/impl/AbstractDistributedReplicantManager.java 2006-10-13 03:33:34 UTC (rev 57643)
+++ projects/cluster/trunk/ha-server/src/main/java/org/jboss/ha/server/impl/AbstractDistributedReplicantManager.java 2006-10-13 03:34:36 UTC (rev 57644)
@@ -0,0 +1,1085 @@
+/*
+ * JBoss, Home of Professional Open Source.
+ * Copyright 2006, Red Hat Middleware LLC, and individual contributors
+ * as indicated by the @author tags. See the copyright.txt file in the
+ * distribution for a full listing of individual contributors.
+ *
+ * This is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU Lesser General Public License as
+ * published by the Free Software Foundation; either version 2.1 of
+ * the License, or (at your option) any later version.
+ *
+ * This software is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this software; if not, write to the Free
+ * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
+ * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
+ */
+package org.jboss.ha.server.impl;
+
+import EDU.oswego.cs.dl.util.concurrent.ConcurrentReaderHashMap;
+import EDU.oswego.cs.dl.util.concurrent.Latch;
+import java.io.Serializable;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.Vector;
+import org.jboss.ha.server.spi.DistributedReplicantManager;
+import org.jboss.logging.Logger;
+
+/**
+ * Base implementation of DistributedReplicantManager.
+ *
+ * @author <a href="brian.stansberry at jboss.com">Brian Stansberry</a>
+ * @version $Revision: 1.1 $
+ */
+public abstract class AbstractDistributedReplicantManager
+ implements DistributedReplicantManager, AsynchEventHandler.AsynchEventProcessor
+{
+ // Constants -----------------------------------------------------
+
+ protected final static String SERVICE_NAME = "DistributedReplicantManager";
+
+ // Attributes ----------------------------------------------------
+ protected static int threadID;
+
+ protected Map localReplicants = new ConcurrentReaderHashMap();
+ protected Map replicants = new ConcurrentReaderHashMap();
+ protected Map keyListeners = new ConcurrentReaderHashMap();
+ protected Map intraviewIdCache = new HashMap();
+
+ /** The handler used to send replicant change notifications asynchronously */
+ protected AsynchEventHandler asynchHandler;
+
+ protected Logger log = Logger.getLogger(getClass().getName());
+
+ protected String[] nodeNames;
+
+ protected Latch nodeNameKnown = new Latch();
+
+ // Static --------------------------------------------------------
+
+ // Constructors --------------------------------------------------
+
+ /**
+ * Default constructor.
+ */
+ public AbstractDistributedReplicantManager()
+ {
+ }
+
+ // Abstract Methods
+
+ /** Get the name for this node that should be used in DRM entries */
+ protected abstract String getNodeName();
+
+ /**
+ * Extracts the list of node names from the given list of the current cluster members.
+ * These names will be the ones used in DRM entries.
+ *
+ * @param view list of cluster members. Type of the list elements is implementation dependent.
+ *
+ * @return the String names of the cluster members, in the same order as <code>view</code>.
+ */
+ protected abstract String[] extractNodeNames(List view);
+
+ /**
+ * Make an RPC to the other cluster members adding the given elements. The RPC
+ * should result in {@link #addFromCluster(String, String, Serializable)}
+ * being called on the remote nodes.
+ *
+ * @param key the replicant's name
+ * @param nodeName the name the node adding the replicant
+ * @param replicant this node's replicant for the key
+ * @param async <code>true</code> if the RPC call should be asynchronous
+ */
+ protected abstract void remoteAdd(String key, String nodeName, Serializable replicant, boolean async);
+
+ /**
+ * Make an RPC to the other cluster members removing the given key. The RPC
+ * should result in {@link #removeFromCluster(String, String)}
+ * being called on the remote nodes.
+ *
+ * @param key the replicant's name
+ * @param nodeName the name the node adding the replicant
+ * @param async <code>true</code> if the RPC call should be asynchronous
+ */
+ protected abstract void remoteRemove(String key, String nodeName, boolean async);
+
+ /**
+ * Make an RPC to the other cluster members getting the local replicant
+ * data from each. The RPC should result in {@link #lookupLocalReplicants()}
+ * being called on the remote nodes.
+ *
+ * @return <code>List</code> of RPC responses, each element is the response
+ * for a given node. A response may be <code>null</code>, a
+ * <code>Throwable</code> or the normal return value from
+ * {@link #lookupLocalReplicants()}.
+ */
+ protected abstract List remoteLookupReplicants();
+
+ // Service lifecycle --------------------------------------------------------
+
+ public void create() throws Exception
+ {
+
+ }
+
+ public void start() throws Exception
+ {
+ // Create the asynch listener handler thread
+ asynchHandler = new AsynchEventHandler(this, "AsynchKeyChangeHandler");
+ asynchHandler.start();
+
+ if (getNodeName() != null)
+ {
+ nodeNameKnown.release (); // partition name is now known!
+ }
+ else
+ {
+ throw new IllegalStateException("Cannot determine node name");
+ }
+ }
+
+ public void stop() throws Exception
+ {
+ // BES 200604 -- implication of NR's JBLCUSTER-38 change. Moving to
+ // destroy allows restart of HAPartition while local registrations
+ // survive -- stopping partition does not stop all registered services
+ // e.g. ejbs; if we maintain their registrations we can pass them to
+ // the cluster when we restart. However, we are leaving all the remote
+ // replicants we have registered around, so they will still be included
+ // as targets if anyone contacts our EJB while partition is stopped.
+ // Probably OK; if they aren't valid the client will find this out.
+
+// NR 200505 : [JBCLUSTER-38] move to destroy
+// if (localReplicants != null)
+// {
+// synchronized(localReplicants)
+// {
+// while (! localReplicants.isEmpty ())
+// {
+// this.remove ((String)localReplicants.keySet().iterator().next ());
+// }
+// }
+// }
+
+ // Stop the asynch handler thread
+ try
+ {
+ asynchHandler.stop();
+ }
+ catch( Exception e)
+ {
+ log.warn("Failed to stop asynchHandler", e);
+ }
+ }
+
+ // NR 200505 : [JBCLUSTER-38] unbind at destroy
+ public void destroy() throws Exception
+ {
+ // now partition can't be resuscitated, so remove local replicants
+ if (localReplicants != null)
+ {
+ synchronized(localReplicants)
+ {
+ String[] keys = new String[localReplicants.size()];
+ localReplicants.keySet().toArray(keys);
+ for(int n = 0; n < keys.length; n ++)
+ {
+ this.removeLocal(keys[n]); // channel is disconnected, so
+ // don't try to notify cluster
+ }
+ }
+ }
+ }
+
+ // DistributedReplicantManager implementation ----------------------------------------------
+
+ public void add(String key, Serializable replicant) throws Exception
+ {
+ if(log.isTraceEnabled())
+ log.trace("add, key="+key+", value="+replicant);
+ nodeNameKnown.acquire (); // we don't propagate until our name is known
+
+ remoteAdd(key, getNodeName(), replicant, false);
+ synchronized(localReplicants)
+ {
+ localReplicants.put(key, replicant);
+ notifyKeyListeners(key, lookupReplicants(key));
+ }
+ }
+
+ public void remove(String key) throws Exception
+ {
+ nodeNameKnown.acquire (); // we don't propagate until our name is known
+
+ // optimisation: we don't make a costly network call
+ // if there is nothing to remove
+ if (localReplicants.containsKey(key))
+ {
+ remoteRemove(key, getNodeName(),true);
+ removeLocal(key);
+ }
+ }
+
+ public Collection getAllServices ()
+ {
+ HashSet services = new HashSet();
+ services.addAll (localReplicants.keySet ());
+ services.addAll (replicants.keySet ());
+ return services;
+ }
+
+ public Serializable lookupLocalReplicant(String key)
+ {
+ return (Serializable)localReplicants.get(key);
+ }
+
+ public List lookupReplicants(String key)
+ {
+ Serializable local = lookupLocalReplicant(key);
+ HashMap replicant = (HashMap)replicants.get(key);
+ if (replicant == null && local == null)
+ return null;
+
+ ArrayList rtn = new ArrayList();
+
+ if (replicant == null)
+ {
+ if (local != null)
+ rtn.add(local);
+ }
+ else
+ {
+ // JBAS-2677. Put the replicants in view order.
+ Object replVal;
+ String ourName = getNodeName();
+ for (int i = 0; i < nodeNames.length; i++)
+ {
+ if (local != null && ourName.equals(nodeNames[i]))
+ {
+ rtn.add(local);
+ continue;
+ }
+
+ replVal = replicant.get(nodeNames[i]);
+ if (replVal != null)
+ rtn.add(replVal);
+ }
+ }
+
+ return rtn;
+ }
+
+ public List lookupReplicantsNodeNames(String key)
+ {
+ boolean locallyReplicated = localReplicants.containsKey (key);
+ HashMap replicant = (HashMap)replicants.get(key);
+ if (replicant == null && !locallyReplicated)
+ return null;
+
+ String ourName = getNodeName();
+ ArrayList rtn = new ArrayList();
+
+ if (replicant == null)
+ {
+ if (locallyReplicated)
+ rtn.add(ourName);
+ }
+ else
+ {
+ // JBAS-2677. Put the replicants in view order.
+ Set keys = replicant.keySet();
+ for (int i = 0; i < nodeNames.length; i++)
+ {
+ if (locallyReplicated && ourName.equals(nodeNames[i]))
+ {
+ rtn.add(ourName);
+ continue;
+ }
+
+ if (keys.contains(nodeNames[i]))
+ rtn.add(nodeNames[i]);
+ }
+ }
+
+ return rtn;
+ }
+
+ public void registerListener(String key, DistributedReplicantManager.ReplicantListener subscriber)
+ {
+ synchronized(keyListeners)
+ {
+ ArrayList listeners = (ArrayList)keyListeners.get(key);
+ if (listeners == null)
+ {
+ listeners = new ArrayList();
+ keyListeners.put(key, listeners);
+ }
+ listeners.add(subscriber);
+ }
+ }
+
+ public void unregisterListener(String key, DistributedReplicantManager.ReplicantListener subscriber)
+ {
+ synchronized(keyListeners)
+ {
+ ArrayList listeners = (ArrayList)keyListeners.get (key);
+ if (listeners == null) return;
+
+ listeners.remove(subscriber);
+ if (listeners.size() == 0)
+ keyListeners.remove(key);
+
+ }
+ }
+
+ public int getReplicantsViewId(String key)
+ {
+ Integer result = (Integer)this.intraviewIdCache.get (key);
+
+ if (result == null)
+ return 0;
+ else
+ return result.intValue ();
+ }
+
+ public boolean isMasterReplica (String key)
+ {
+ boolean trace = log.isTraceEnabled();
+ if(trace)
+ log.trace("isMasterReplica, key="+key);
+ // if I am not a replicat, I cannot be the master...
+ //
+ if (!localReplicants.containsKey (key))
+ {
+ if( trace )
+ log.trace("no localReplicants, key="+key+", isMasterReplica=false");
+ return false;
+ }
+
+ HashMap repForKey = (HashMap)replicants.get(key);
+ if (repForKey==null)
+ {
+ if( trace )
+ log.trace("no replicants, key="+key+", isMasterReplica=true");
+ return true;
+ }
+ Vector replicaNodes = new Vector (repForKey.keySet());
+ String ourName = getNodeName();
+ boolean isMasterReplica = false;
+ for (int i=0; i< nodeNames.length; i++)
+ {
+ if( trace )
+ log.trace("Testing member: " + nodeNames[i]);
+ if (replicaNodes.contains (nodeNames[i]))
+ {
+ if( trace )
+ log.trace("Member found in replicantNodes, isMasterReplica=false");
+ break;
+ }
+ else if (nodeNames[i].equals(ourName))
+ {
+ if( trace )
+ log.trace("Member == nodeName, isMasterReplica=true");
+ isMasterReplica = true;
+ break;
+ }
+ }
+ return isMasterReplica;
+ }
+
+ // HAPartition.HAPartitionStateTransfer implementation ----------------------------------------------
+
+ public Serializable getCurrentState ()
+ {
+ java.util.Collection services = this.getAllServices ();
+ HashMap result = new HashMap ();
+
+ java.util.Iterator catsIter = services.iterator ();
+ while (catsIter.hasNext ())
+ {
+ String category = (String)catsIter.next ();
+ HashMap content = (HashMap)this.replicants.get (category);
+ if (content == null)
+ content = new HashMap ();
+ else
+ content = (HashMap)content.clone ();
+
+ Serializable local = lookupLocalReplicant(category);
+ if (local != null)
+ content.put (getNodeName(), local);
+
+ result.put (category, content);
+ }
+
+ // we add the intraviewid cache to the global result
+ //
+ Object[] globalResult = new Object[] {result, intraviewIdCache};
+ return globalResult;
+ }
+
+ public void setCurrentState(Serializable newState)
+ {
+ Object[] globalState = (Object[])newState;
+
+ HashMap map = (HashMap)globalState[0];
+ this.replicants.putAll(map);
+ this.intraviewIdCache = (HashMap)globalState[1];
+
+ if(log.isTraceEnabled())
+ {
+ log.trace(getNodeName() + ": received new state, will republish local replicants");
+ }
+ MembersPublisher publisher = new MembersPublisher();
+ publisher.start();
+ }
+
+ // HAPartition.HAMembershipListener implementation ----------------------------------------------
+
+ public void membershipChangedDuringMerge(Vector deadMembers, Vector newMembers, Vector allMembers, Vector originatingGroups)
+ {
+ membershipChanged(deadMembers, newMembers, allMembers);
+
+ log.debug("Merging partitions...");
+ log.debug("Originating groups: " + originatingGroups);
+ if (newMembers.size() > 0)
+ {
+ new MergeMembers().start();
+ }
+ }
+
+ public void membershipChanged(Vector deadMembers, Vector newMembers, Vector allMembers)
+ {
+ // Here we only care about deadMembers. Purge all replicant lists of deadMembers
+ // and then notify all listening nodes.
+ if (log.isDebugEnabled())
+ {
+ log.debug("I am (" + getNodeName() + ") received membershipChanged event:");
+ log.debug("Dead members: " + deadMembers.size() + " (" + deadMembers + ")");
+ log.debug("New Members : " + newMembers.size() + " (" + newMembers + ")");
+ log.debug("All Members : " + allMembers.size() + " (" + allMembers + ")");
+ }
+
+ this.nodeNames = extractNodeNames(allMembers);
+
+ purgeDeadMembers(deadMembers);
+ }
+
+ // AsynchEventHandler.AsynchEventProcessor implementation -----------------
+
+ public void processEvent(Object event)
+ {
+ KeyChangeEvent kce = (KeyChangeEvent) event;
+ notifyKeyListeners(kce.key, kce.replicants);
+ }
+
+ protected static class KeyChangeEvent
+ {
+ public String key;
+ public List replicants;
+
+ public KeyChangeEvent(String key, List replicants)
+ {
+ this.key = key;
+ this.replicants = replicants;
+ }
+ }
+
+ // DistributedReplicantManager cluster callbacks ----------------------------------------------
+
+ /**
+ * Cluster callback called when a new replicant is added on another node
+ * @param key Replicant key
+ * @param nodeName Node that add the current replicant
+ * @param replicant Serialized representation of the replicant
+ */
+ public void addFromCluster(String key, String nodeName, Serializable replicant)
+ {
+ if(log.isTraceEnabled())
+ log.trace("_add(" + key + ", " + nodeName);
+
+ try
+ {
+ addReplicant(key, nodeName, replicant);
+ // Notify listeners asynchronously
+ enqueueKeyChangeEvent(key, lookupReplicants(key));
+ }
+ catch (Exception ex)
+ {
+ log.error("_add failed", ex);
+ }
+ }
+
+ /**
+ * Cluster callback called when a replicant is removed by another node
+ * @param key Name of the replicant key
+ * @param nodeName Node that wants to remove its replicant for the give key
+ */
+ public void removeFromCluster(String key, String nodeName)
+ {
+ try
+ {
+ if (removeReplicant (key, nodeName)) {
+ // Notify listeners asynchronously
+ enqueueKeyChangeEvent(key, lookupReplicants(key));
+ }
+ }
+ catch (Exception ex)
+ {
+ log.error("_remove failed", ex);
+ }
+ }
+
+ /**
+ * Cluster callback called when a node wants to know our complete list of local replicants
+ * @throws Exception Thrown if a cluster communication exception occurs
+ * @return A java array of size 2 containing the name of our node in this cluster and the serialized representation of our state
+ */
+ public Object[] lookupLocalReplicants() throws Exception
+ {
+ nodeNameKnown.acquire (); // we don't answer until our name is known
+
+ Object[] rtn = {getNodeName(), localReplicants};
+ if(log.isTraceEnabled())
+ log.trace ("lookupLocalReplicants called ("+ rtn[0] + "). Return: " + localReplicants.size ());
+ return rtn;
+ }
+
+ // Misc Public -----------------------------------------------------
+
+ public String listContent () throws Exception
+ {
+ // we merge all replicants services: local only or not
+ //
+ Collection services = this.getAllServices ();
+
+ StringBuffer result = new StringBuffer ();
+ result.append ("<pre>");
+
+ for(Iterator catsIter = services.iterator(); catsIter.hasNext();)
+ {
+ String category = (String)catsIter.next ();
+ HashMap content = (HashMap)this.replicants.get(category);
+ if (content == null)
+ content = new HashMap ();
+
+ result.append ("-----------------------------------------------\n");
+ result.append ("Service : ").append (category).append ("\n\n");
+
+ Serializable local = lookupLocalReplicant(category);
+ if (local == null)
+ result.append ("\t- Service is *not* available locally\n");
+ else
+ result.append ("\t- Service *is* also available locally\n");
+
+ for (Iterator keysIter = content.keySet().iterator(); keysIter.hasNext();)
+ {
+ String location = (String)keysIter.next ();
+ result.append ("\t- ").append(location).append ("\n");
+ }
+
+ result.append ("\n");
+
+ }
+
+ result.append ("</pre>");
+
+ return result.toString ();
+ }
+
+ public String listXmlContent() throws Exception
+ {
+ // we merge all replicants services: local only or not
+ //
+ java.util.Collection services = this.getAllServices ();
+ StringBuffer result = new StringBuffer ();
+
+ result.append ("<ReplicantManager>\n");
+
+ for(Iterator catsIter = services.iterator(); catsIter.hasNext();)
+ {
+ String category = (String)catsIter.next ();
+ HashMap content = (HashMap)this.replicants.get (category);
+ if (content == null)
+ content = new HashMap ();
+
+ result.append ("\t<Service>\n");
+ result.append ("\t\t<ServiceName>").append (category).append ("</ServiceName>\n");
+
+
+ Serializable local = lookupLocalReplicant(category);
+ if (local != null)
+ {
+ result.append ("\t\t<Location>\n");
+ result.append ("\t\t\t<Name local=\"True\">").append (getNodeName()).append ("</Name>\n");
+ result.append ("\t\t</Location>\n");
+ }
+
+ for (Iterator keysIter = content.keySet().iterator(); keysIter.hasNext();)
+ {
+ String location = (String)keysIter.next ();
+ result.append ("\t\t<Location>\n");
+ result.append ("\t\t\t<Name local=\"False\">").append (location).append ("</Name>\n");
+ result.append ("\t\t</Location>\n");
+ }
+
+ result.append ("\t</Service>\n");
+
+ }
+
+ result.append ("</ReplicantManager>\n");
+
+ return result.toString ();
+ }
+
+
+ // Protected -----------------------------------------------------
+
+ protected int calculateReplicantsHash (List members)
+ {
+ int result = 0;
+ Object obj = null;
+
+ for (int i=0; i<members.size (); i++)
+ {
+ obj = members.get(i);
+ if (obj != null)
+ result+= obj.hashCode (); // no explicit overflow with int addition
+ }
+
+ return result;
+ }
+
+ protected int updateReplicantsHashId (String key)
+ {
+ // we first get a list of all nodes names that replicate this key
+ //
+ List nodes = this.lookupReplicantsNodeNames (key);
+ int result = 0;
+
+ if ( (nodes == null) || (nodes.size () == 0) )
+ {
+ // no nore replicants for this key: we uncache our view id
+ //
+ this.intraviewIdCache.remove (key);
+ }
+ else
+ {
+ result = this.calculateReplicantsHash (nodes);
+ this.intraviewIdCache.put (key, new Integer (result));
+ }
+
+ return result;
+
+ }
+
+ protected void enqueueKeyChangeEvent(String key, List replicants)
+ throws InterruptedException
+ {
+ asynchHandler.queueEvent(new KeyChangeEvent(key, replicants));
+ }
+
+ protected boolean removeReplicant(String key, String nodeName) throws Exception
+ {
+ synchronized(replicants)
+ {
+ HashMap replicant = (HashMap)replicants.get(key);
+ if (replicant == null) return false;
+ Object removed = replicant.remove(nodeName);
+ if (removed != null)
+ {
+ Collection values = replicant.values();
+ if (values.size() == 0)
+ {
+ replicants.remove(key);
+ }
+ return true;
+ }
+ }
+ return false;
+ }
+
+ protected void removeLocal(String key)
+ {
+ synchronized(localReplicants)
+ {
+ localReplicants.remove(key);
+ List result = lookupReplicants(key);
+ if (result == null)
+ result = new ArrayList (); // don't pass null but an empty list
+ notifyKeyListeners(key, result);
+ }
+ }
+
+ /**
+ * Add a replicant to the replicants map.
+ * @param key replicant key name
+ * @param nodeName name of the node that adds this replicant
+ * @param replicant Serialized representation of the replica
+ */
+ protected void addReplicant(String key, String nodeName, Serializable replicant)
+ {
+ addReplicant(replicants, key, nodeName, replicant);
+ }
+
+ /**
+ * Logic for adding replicant to any map.
+ * @param map structure in which adding the new replicant
+ * @param key name of the replicant key
+ * @param nodeName name of the node adding the replicant
+ * @param replicant serialized representation of the replicant that is added
+ */
+ protected void addReplicant(Map map, String key, String nodeName, Serializable replicant)
+ {
+ synchronized(map)
+ {
+ HashMap rep = (HashMap)map.get(key);
+ if (rep == null)
+ {
+ if(log.isTraceEnabled())
+ log.trace("adding new HashMap");
+ rep = new HashMap();
+ map.put(key, rep);
+ }
+ rep.put(nodeName, replicant);
+ }
+ }
+
+ protected Vector getKeysReplicatedByNode (String nodeName)
+ {
+ Vector result = new Vector ();
+ synchronized (replicants)
+ {
+ for (Iterator iter = replicants.entrySet().iterator(); iter.hasNext (); )
+ {
+ Map.Entry entry = (Map.Entry) iter.next();
+ HashMap values = (HashMap) entry.getValue();
+ if ( (values != null) && values.containsKey (nodeName) )
+ {
+ result.add (entry.getKey());
+ }
+ }
+ }
+ return result;
+ }
+
+ /**
+ * Indicates if the a replicant already exists for a given key/node pair
+ * @param key replicant key name
+ * @param nodeName name of the node
+ * @return a boolean indicating if a replicant for the given node exists for the given key
+ */
+ protected boolean replicantEntryAlreadyExists (String key, String nodeName)
+ {
+ return replicantEntryAlreadyExists (replicants, key, nodeName);
+ }
+
+ /**
+ * Indicates if the a replicant already exists for a given key/node pair in the give data structure
+ */
+ protected boolean replicantEntryAlreadyExists (Map map, String key, String nodeName)
+ {
+ HashMap rep = (HashMap)map.get(key);
+ if (rep == null)
+ return false;
+ else
+ return rep.containsKey (nodeName);
+ }
+
+ /**
+ * Notifies, through a callback, the listeners for a given replicant that the set of replicants has changed
+ * @param key The replicant key name
+ * @param newReplicants The new list of replicants
+ *
+ */
+ protected void notifyKeyListeners(String key, List newReplicants)
+ {
+ boolean trace = log.isTraceEnabled();
+
+ if( trace )
+ log.trace("notifyKeyListeners");
+
+ // we first update the intra-view id for this particular key
+ //
+ int newId = updateReplicantsHashId (key);
+
+ ArrayList listeners = (ArrayList)keyListeners.get(key);
+ if (listeners == null)
+ {
+ if( trace )
+ log.trace("listeners is null");
+ return;
+ }
+
+ // ArrayList's iterator is not thread safe
+ DistributedReplicantManager.ReplicantListener[] toNotify = null;
+ synchronized(listeners)
+ {
+ toNotify = new DistributedReplicantManager.ReplicantListener[listeners.size()];
+ toNotify = (DistributedReplicantManager.ReplicantListener[]) listeners.toArray(toNotify);
+ }
+
+ if( trace )
+ log.trace("notifying " + toNotify.length + " listeners for key change: " + key);
+ for (int i = 0; i < toNotify.length; i++)
+ {
+ if (toNotify[i] != null)
+ toNotify[i].replicantsChanged(key, newReplicants, newId);
+ }
+ }
+
+ protected void republishLocalReplicants()
+ {
+ try
+ {
+ boolean trace = log.isTraceEnabled();
+
+ if( trace )
+ log.trace("Start Re-Publish local replicants in DRM");
+
+ HashMap localReplicants;
+ synchronized (this.localReplicants)
+ {
+ localReplicants = new HashMap(this.localReplicants);
+ }
+
+ Iterator entries = localReplicants.entrySet().iterator();
+ while( entries.hasNext() )
+ {
+ Map.Entry entry = (Map.Entry) entries.next();
+ String key = (String) entry.getKey();
+ Serializable replicant = (Serializable) entry.getValue();
+ if (replicant != null)
+ {
+ if( trace )
+ log.trace("publishing, key=" + key + ", value=" + replicant);
+
+ remoteAdd(key, getNodeName(), replicant, true);
+
+ notifyKeyListeners(key, lookupReplicants(key));
+ }
+ }
+ if( trace )
+ log.trace("End Re-Publish local replicants");
+ }
+ catch (Exception e)
+ {
+ log.error("Re-Publish failed", e);
+ }
+ }
+
+ ////////////////////
+ // Group membership API
+ ////////////////////
+
+ protected void mergeMembers()
+ {
+ try
+ {
+ log.debug("Start merging members in DRM service...");
+ java.util.HashSet notifies = new java.util.HashSet ();
+ List rsp = remoteLookupReplicants();
+
+ if (rsp.size() == 0)
+ log.debug("No responses from other nodes during the DRM merge process.");
+ else
+ {
+ log.debug("The DRM merge process has received " + rsp.size() + " answers");
+ }
+ for (int i = 0; i < rsp.size(); i++)
+ {
+ Object o = rsp.get(i);
+ if (o == null)
+ {
+ log.warn("As part of the answers received during the DRM merge process, a NULL message was received!");
+ continue;
+ }
+ else if (o instanceof Throwable)
+ {
+ log.warn("As part of the answers received during the DRM merge process, a Throwable was received!", (Throwable) o);
+ continue;
+ }
+
+ Object[] objs = (Object[]) o;
+ String node = (String)objs[0];
+ Map replicants = (Map)objs[1];
+ Iterator keys = replicants.keySet().iterator();
+
+ //FIXME: We don't remove keys in the merge process but only add new keys!
+ while (keys.hasNext())
+ {
+ String key = (String)keys.next();
+ // done to reduce duplicate notifications
+ if (!replicantEntryAlreadyExists (key, node))
+ {
+ addReplicant(key, node, (Serializable)replicants.get(key));
+ notifies.add (key);
+ }
+ }
+
+ Vector currentStatus = getKeysReplicatedByNode (node);
+ if (currentStatus.size () > replicants.size ())
+ {
+ // The merge process needs to remove some (now)
+ // unexisting keys
+ //
+ for (int currentKeysId=0, currentKeysMax=currentStatus.size (); currentKeysId<currentKeysMax; currentKeysId++)
+ {
+ String theKey = (String)currentStatus.elementAt (currentKeysId);
+ if (!replicants.containsKey (theKey))
+ {
+ removeReplicant (theKey, node);
+ notifies.add(theKey);
+ }
+ }
+ }
+ }
+
+ Iterator notifIter = notifies.iterator ();
+ while (notifIter.hasNext ())
+ {
+ String key = (String)notifIter.next ();
+ notifyKeyListeners(key, lookupReplicants(key));
+ }
+ log.debug ("..Finished merging members in DRM service");
+
+ }
+ catch (Exception ex)
+ {
+ log.error("merge failed", ex);
+ }
+ }
+
+ /**
+ * get rid of dead members from replicant list
+ * return true if anything was purged.
+ */
+ protected void purgeDeadMembers(Vector deadMembers)
+ {
+ if (deadMembers.size() <= 0)
+ return;
+
+ log.debug("purgeDeadMembers, "+deadMembers);
+ try
+ {
+ synchronized(replicants)
+ {
+ Iterator keys = replicants.keySet().iterator();
+ while (keys.hasNext())
+ {
+ String key = (String)keys.next();
+ HashMap replicant = (HashMap)replicants.get(key);
+ boolean modified = false;
+ for (int i = 0; i < deadMembers.size(); i++)
+ {
+ String node = deadMembers.elementAt(i).toString();
+ log.debug("trying to remove deadMember " + node + " for key " + key);
+ Object removed = replicant.remove(node);
+ if (removed != null)
+ {
+ log.debug(node + " was removed");
+ modified = true;
+ }
+ else
+ {
+ log.debug(node + " was NOT removed!!!");
+ }
+ }
+ if (modified)
+ {
+ notifyKeyListeners(key, lookupReplicants(key));
+ }
+ }
+ }
+ }
+ catch (Exception ex)
+ {
+ log.error("purgeDeadMembers failed", ex);
+ }
+ }
+
+ /**
+ */
+ protected void cleanupKeyListeners()
+ {
+ // NOT IMPLEMENTED YET
+ }
+
+ protected synchronized static int nextThreadID()
+ {
+ return threadID ++;
+ }
+
+ // Private -------------------------------------------------------
+
+ // Inner classes -------------------------------------------------
+
+ protected class MergeMembers extends Thread
+ {
+ public MergeMembers()
+ {
+ super("DRM Async Merger#"+nextThreadID());
+ }
+
+ /**
+ * Called when the service needs to merge with another partition. This
+ * process is performed asynchronously
+ */
+ public void run()
+ {
+ log.debug("Sleeping for 50ms before mergeMembers");
+ try
+ {
+ // if this thread invokes a cluster method call before
+ // membershipChanged event completes, it could timeout/hang
+ // we need to discuss this with Bela.
+ Thread.sleep(50);
+ }
+ catch (Exception ignored)
+ {
+ }
+ mergeMembers();
+ }
+ }
+
+ protected class MembersPublisher extends Thread
+ {
+ public MembersPublisher()
+ {
+ super("DRM Async Publisher#"+nextThreadID());
+ }
+
+ /**
+ * Called when service needs to re-publish its local replicants to other
+ * cluster members after this node has joined the cluster.
+ */
+ public void run()
+ {
+ log.debug("DRM: Sleeping before re-publishing for 50ms just in case");
+ try
+ {
+ // if this thread invokes a cluster method call before
+ // membershipChanged event completes, it could timeout/hang
+ // we need to discuss this with Bela.
+ Thread.sleep(50);
+ }
+ catch (Exception ignored)
+ {
+ }
+ republishLocalReplicants();
+ }
+ }
+
+
+}
More information about the jboss-cvs-commits
mailing list