[jboss-cvs] JBossAS SVN: r74643 - trunk/cluster/src/main/org/jboss/ha/framework/server.
jboss-cvs-commits at lists.jboss.org
jboss-cvs-commits at lists.jboss.org
Mon Jun 16 17:07:51 EDT 2008
Author: pferraro
Date: 2008-06-16 17:07:51 -0400 (Mon, 16 Jun 2008)
New Revision: 74643
Modified:
trunk/cluster/src/main/org/jboss/ha/framework/server/ClusterPartition.java
trunk/cluster/src/main/org/jboss/ha/framework/server/DistributedReplicantManagerImpl.java
Log:
[JBAS-5434] Ensure DistributedReplicantManager can handle concurrent JGroups requests.
Included moderate code cleanup.
Replaced use of classes from Doug Lea's concurrent package with equivalents from java.util.concurrent
Modified: trunk/cluster/src/main/org/jboss/ha/framework/server/ClusterPartition.java
===================================================================
--- trunk/cluster/src/main/org/jboss/ha/framework/server/ClusterPartition.java 2008-06-16 20:53:23 UTC (rev 74642)
+++ trunk/cluster/src/main/org/jboss/ha/framework/server/ClusterPartition.java 2008-06-16 21:07:51 UTC (rev 74643)
@@ -58,6 +58,7 @@
import org.jboss.system.ServiceMBeanSupport;
import org.jboss.system.server.ServerConfigUtil;
import org.jboss.util.threadpool.ThreadPool;
+import org.jgroups.Address;
import org.jgroups.Channel;
import org.jgroups.ChannelFactory;
import org.jgroups.ExtendedMembershipListener;
@@ -76,8 +77,8 @@
import org.jgroups.util.RspList;
/**
- * {@link HAPartition} implementation based on a
- * <a href="http://www.jgroups.com/">JGroups</a> <code>RpcDispatcher</code>
+ * {@link HAPartition} implementation based on a
+ * <a href="http://www.jgroups.com/">JGroups</a> <code>RpcDispatcher</code>
* and a multiplexed <code>JChannel</code>.
*
* @author <a href="mailto:sacha.labourey at cogito-info.ch">Sacha Labourey</a>.
@@ -88,7 +89,7 @@
*/
public class ClusterPartition
extends ServiceMBeanSupport
- implements ExtendedMembershipListener, HAPartition,
+ implements ExtendedMembershipListener, HAPartition,
AsynchEventHandler.AsynchEventProcessor,
ClusterPartitionMBean
{
@@ -111,7 +112,7 @@
private static class StateStreamEnd implements Serializable
{
/** The serialVersionUID */
- private static final long serialVersionUID = -3705345735451504946L;
+ private static final long serialVersionUID = -3705345735451504946L;
}
/**
@@ -153,7 +154,7 @@
{
try
{
- ClusterPartition.this.channel.connect(getPartitionName());
+ ClusterPartition.this.channel.connect(ClusterPartition.this.getPartitionName());
}
catch (Exception e)
{
@@ -194,24 +195,24 @@
/** Do we send any membership change notifications synchronously? */
protected boolean allowSyncListeners = false;
/** The HAMembershipListener and HAMembershipExtendedListeners */
- protected ArrayList synchListeners = new ArrayList();
+ protected ArrayList<HAMembershipListener> synchListeners = new ArrayList<HAMembershipListener>();
/** The asynch HAMembershipListener and HAMembershipExtendedListeners */
- protected ArrayList asynchListeners = new ArrayList();
+ protected ArrayList<HAMembershipListener> asynchListeners = new ArrayList<HAMembershipListener>();
/** The handler used to send membership change notifications asynchronously */
protected AsynchEventHandler asynchHandler;
/** The current cluster partition members */
- protected Vector members = null;
- protected Vector jgmembers = null;
+ protected Vector<ClusterNode> members = null;
+ protected Vector<Address> jgmembers = null;
protected Map<String, WeakReference<ClassLoader>> clmap =
new ConcurrentHashMap<String, WeakReference<ClassLoader>>();
- public Vector history = new Vector();
+ public Vector<String> history = new Vector<String>();
/** The partition members other than this node */
- protected Vector otherMembers = null;
- protected Vector jgotherMembers = null;
+ protected Vector<ClusterNode> otherMembers = null;
+ protected Vector<Address> jgotherMembers = null;
/** the local JG IP Address */
- protected org.jgroups.stack.IpAddress localJGAddress = null;
+ protected Address localJGAddress = null;
/** The cluster transport protocol address string */
protected String nodeName;
/** me as a ClusterNode */
@@ -224,7 +225,7 @@
protected DistributedStateImpl distributedState;
/** The cluster instance log category */
protected Logger log;
- protected Logger clusterLifeCycleLog;
+ protected Logger clusterLifeCycleLog;
/** The current cluster view id */
protected long currentViewId = -1;
/** Whether to bind the partition into JNDI */
@@ -254,15 +255,19 @@
private Channel createChannel()
{
- ChannelFactory factory = getChannelFactory();
+ ChannelFactory factory = this.getChannelFactory();
if (factory == null)
+ {
throw new IllegalStateException("HAPartitionConfig has no JChannelFactory");
- String stack = getChannelStackName();
+ }
+ String stack = this.getChannelStackName();
if (stack == null)
+ {
throw new IllegalStateException("HAPartitionConfig has no multiplexer stack");
+ }
try
{
- return factory.createMultiplexerChannel(stack, getPartitionName());
+ return factory.createMultiplexerChannel(stack, this.getPartitionName());
}
catch (RuntimeException e)
{
@@ -278,7 +283,7 @@
public ClusterPartition()
{
- logHistory ("Partition object created");
+ this.logHistory("Partition object created");
}
// ------------------------------------------------------------ ServiceMBean
@@ -288,16 +293,18 @@
protected void createService() throws Exception
{
if (this.replicantManager == null)
+ {
throw new IllegalStateException("DistributedReplicantManager property must be set before creating ClusterPartition service");
+ }
- setupLoggers(getPartitionName());
+ this.setupLoggers(this.getPartitionName());
this.replicantManager.createService();
if (this.distributedState != null)
{
this.distributedState.createService();
- }
+ }
// Create the asynchronous handler for view changes
this.asynchHandler = new AsynchEventHandler(this, "AsynchViewChangeHandler");
@@ -307,7 +314,7 @@
protected void startService() throws Exception
{
- logHistory ("Starting partition");
+ this.logHistory ("Starting partition");
this.cache = this.cacheManager.getCache(this.cacheConfigName, true);
this.channelFactory = this.cache.getConfiguration().getRuntimeConfig().getMuxChannelFactory();
@@ -315,19 +322,19 @@
if (this.channel == null || !this.channel.isOpen())
{
- this.log.debug("Creating Channel for partition " + getPartitionName() +
- " using stack " + getChannelStackName());
+ this.log.debug("Creating Channel for partition " + this.getPartitionName() +
+ " using stack " + this.getChannelStackName());
- this.channel = createChannel();
+ this.channel = this.createChannel();
this.channel.setOpt(Channel.AUTO_RECONNECT, Boolean.TRUE);
this.channel.setOpt(Channel.AUTO_GETSTATE, Boolean.TRUE);
}
- this.log.info("Initializing partition " + getPartitionName());
- logHistory ("Initializing partition " + getPartitionName());
+ this.log.info("Initializing partition " + this.getPartitionName());
+ this.logHistory ("Initializing partition " + this.getPartitionName());
- this.dispatcher = new RpcHandler(this.channel, null, null, new Object(), getDeadlockDetection());
+ this.dispatcher = new RpcHandler(this.channel, null, null, new Object(), this.getDeadlockDetection());
// Subscribe to events generated by the channel
this.log.debug("setMembershipListener");
@@ -343,7 +350,7 @@
if (this.threadPool == null)
{
- this.channel.connect(getPartitionName());
+ this.channel.connect(this.getPartitionName());
connectLatch.countDown();
}
else
@@ -357,31 +364,33 @@
this.cache.start();
try
- {
+ {
// This will block waiting for any async channel connect above
connectLatch.await();
if (this.connectException != null)
+ {
throw this.connectException;
+ }
this.log.debug("Get current members");
- waitForView();
+ this.waitForView();
- // get current JG group properties
+ // get current JG group properties
this.log.debug("get nodeName");
- this.localJGAddress = (IpAddress)this.channel.getLocalAddress();
- this.me = new ClusterNodeImpl(this.localJGAddress);
+ this.localJGAddress = this.channel.getLocalAddress();
+ this.me = new ClusterNodeImpl((IpAddress) this.localJGAddress);
this.nodeName = this.me.getName();
- verifyNodeIsUnique();
+ this.verifyNodeIsUnique();
- fetchState();
+ this.fetchState();
this.replicantManager.startService();
if (this.distributedState != null)
{
- this.distributedState.setClusteredCache(getClusteredCache());
+ this.distributedState.setClusteredCache(this.getClusteredCache());
this.distributedState.startService();
}
@@ -395,9 +404,9 @@
if (this.bindIntoJndi)
{
Context ctx = new InitialContext();
- bind(HAPartitionLocator.getStandardJndiBinding(getPartitionName()),
+ this.bind(HAPartitionLocator.getStandardJndiBinding(this.getPartitionName()),
this, ClusterPartition.class, ctx);
- this.log.debug("Bound in JNDI under /HAPartition/" + getPartitionName());
+ this.log.debug("Bound in JNDI under /HAPartition/" + this.getPartitionName());
}
}
catch (Throwable t)
@@ -412,8 +421,8 @@
protected void stopService() throws Exception
{
- logHistory ("Stopping partition");
- this.log.info("Stopping partition " + getPartitionName());
+ this.logHistory ("Stopping partition");
+ this.log.info("Stopping partition " + this.getPartitionName());
try
{
@@ -422,7 +431,7 @@
catch( Exception e)
{
this.log.warn("Failed to stop asynchHandler", e);
- }
+ }
if (this.distributedState != null)
{
@@ -445,7 +454,9 @@
try
{
if (this.channel != null && this.channel.isConnected())
+ {
this.channel.disconnect();
+ }
}
catch (Exception e)
{
@@ -454,7 +465,7 @@
if (this.bindIntoJndi)
{
- String boundName = HAPartitionLocator.getStandardJndiBinding(getPartitionName());
+ String boundName = HAPartitionLocator.getStandardJndiBinding(this.getPartitionName());
InitialContext ctx = null;
try
{
@@ -468,51 +479,55 @@
finally
{
if (ctx != null)
+ {
ctx.close();
+ }
}
- NonSerializableFactory.unbind (boundName);
+ NonSerializableFactory.unbind(boundName);
}
HAPartitionLocator.getHAPartitionLocator().deregisterHAPartition(this);
- this.log.info("Partition " + getPartitionName() + " stopped.");
+ this.log.info("Partition " + this.getPartitionName() + " stopped.");
}
protected void destroyService() throws Exception
{
- this.log.debug("Destroying HAPartition: " + getPartitionName());
+ this.log.debug("Destroying HAPartition: " + this.getPartitionName());
if (this.distributedState != null)
{
this.distributedState.destroyService();
- }
+ }
this.replicantManager.destroyService();
try
{
if (this.channel != null && this.channel.isOpen())
+ {
this.channel.close();
+ }
}
catch (Exception e)
{
this.log.error("Closing channel failed", e);
}
- this.log.info("Partition " + getPartitionName() + " destroyed.");
+ this.log.info("Partition " + this.getPartitionName() + " destroyed.");
}
- // ---------------------------------------------------------- State Transfer
+ // ---------------------------------------------------------- State Transfer
protected void fetchState() throws Exception
{
- this.log.info("Fetching serviceState (will wait for " + getStateTransferTimeout() +
+ this.log.info("Fetching serviceState (will wait for " + this.getStateTransferTimeout() +
" milliseconds):");
long start, stop;
this.isStateSet = false;
start = System.currentTimeMillis();
- boolean rc = this.channel.getState(null, getStateTransferTimeout());
+ boolean rc = this.channel.getState(null, this.getStateTransferTimeout());
if (rc)
{
synchronized (this.channelLock)
@@ -520,7 +535,9 @@
while (!this.isStateSet)
{
if (this.setStateException != null)
+ {
throw this.setStateException;
+ }
try
{
@@ -555,7 +572,7 @@
}
}
- if (isCurrentNodeCoordinator())
+ if (this.isCurrentNodeCoordinator())
{
this.log.info("State could not be retrieved (we are the first member in group)");
}
@@ -628,9 +645,11 @@
while (true)
{
- Object obj = mvis.readObject();
+ Object obj = mvis.readObject();
if (obj instanceof StateStreamEnd)
+ {
break;
+ }
String key = (String) obj;
this.log.debug("setState for " + key);
@@ -651,20 +670,20 @@
if (DistributedReplicantManagerImpl.SERVICE_NAME.equals(key))
{
if (e instanceof RuntimeException)
+ {
throw (RuntimeException) e;
- else
- throw new RuntimeException(e);
+ }
+
+ throw new RuntimeException(e);
}
- else
- {
- this.log.error("Caught exception setting serviceState to " + subscriber, e);
- }
+
+ this.log.error("Caught exception setting serviceState to " + subscriber, e);
}
}
else
{
this.log.debug("There is no stateHandler for: " + key);
- }
+ }
}
try
@@ -686,15 +705,19 @@
{
this.log.error("failed setting serviceState", t);
if (t instanceof Exception)
+ {
this.setStateException = (Exception) t;
+ }
else
+ {
this.setStateException = new Exception(t);
+ }
}
private void notifyChannelLock()
{
synchronized (this.channelLock)
- {
+ {
this.channelLock.notifyAll();
}
}
@@ -702,23 +725,27 @@
// org.jgroups.MembershipListener implementation ----------------------------------------------
public void suspect(org.jgroups.Address suspected_mbr)
- {
- logHistory ("Node suspected: " + (suspected_mbr==null?"null":suspected_mbr.toString()));
- if (isCurrentNodeCoordinator ())
+ {
+ this.logHistory ("Node suspected: " + (suspected_mbr==null?"null":suspected_mbr.toString()));
+ if (this.isCurrentNodeCoordinator ())
+ {
this.clusterLifeCycleLog.info ("Suspected member: " + suspected_mbr);
+ }
else
+ {
this.log.info("Suspected member: " + suspected_mbr);
+ }
}
- public void block()
- {
- this.flushBlockGate.close();
- this.log.debug("Block processed at " + this.me);
+ public void block()
+ {
+ this.flushBlockGate.close();
+ this.log.debug("Block processed at " + this.me);
}
public void unblock()
{
- this.flushBlockGate.open();
+ this.flushBlockGate.open();
this.log.debug("Unblock processed at " + this.me);
}
@@ -741,24 +768,24 @@
// Keep a list of other members only for "exclude-self" RPC calls
this.jgotherMembers = (Vector)newView.getMembers().clone();
this.jgotherMembers.remove (this.channel.getLocalAddress());
- this.otherMembers = translateAddresses (this.jgotherMembers); // TRANSLATE!
- Vector translatedNewView = translateAddresses ((Vector)newView.getMembers().clone());
- logHistory ("New view: " + translatedNewView + " with viewId: " + this.currentViewId +
+ this.otherMembers = this.translateAddresses (this.jgotherMembers); // TRANSLATE!
+ Vector<ClusterNode> translatedNewView = this.translateAddresses ((Vector)newView.getMembers().clone());
+ this.logHistory ("New view: " + translatedNewView + " with viewId: " + this.currentViewId +
" (old view: " + this.members + " )");
// Save the previous view and make a copy of the new view
- Vector oldMembers = this.members;
+ Vector<ClusterNode> oldMembers = this.members;
- Vector newjgMembers = (Vector)newView.getMembers().clone();
- Vector newMembers = translateAddresses(newjgMembers); // TRANSLATE
+ Vector<Address> newjgMembers = (Vector)newView.getMembers().clone();
+ Vector<ClusterNode> newMembers = this.translateAddresses(newjgMembers); // TRANSLATE
this.members = newMembers;
this.jgmembers = newjgMembers;
if (oldMembers == null)
{
// Initial viewAccepted
- this.log.debug("ViewAccepted: initial members set for partition " + getPartitionName() + ": " +
+ this.log.debug("ViewAccepted: initial members set for partition " + this.getPartitionName() + ": " +
this.currentViewId + " (" + this.members + ")");
this.log.info("Number of cluster members: " + this.members.size());
@@ -770,29 +797,29 @@
this.log.info ("Other members: " + this.otherMembers.size ());
// Wake up the deployer thread blocking in waitForView
- notifyChannelLock();
+ this.notifyChannelLock();
return;
- }
+ }
- int difference = 0;
- if (oldMembers == null)
- difference = newMembers.size () - 1;
- else
- difference = newMembers.size () - oldMembers.size ();
+ int difference = newMembers.size() - oldMembers.size();
- if (isCurrentNodeCoordinator ())
- this.clusterLifeCycleLog.info ("New cluster view for partition " + getPartitionName() + " (id: " +
+ if (this.isCurrentNodeCoordinator ())
+ {
+ this.clusterLifeCycleLog.info ("New cluster view for partition " + this.getPartitionName() + " (id: " +
this.currentViewId + ", delta: " + difference + ") : " + this.members);
+ }
else
- this.log.info("New cluster view for partition " + getPartitionName() + ": " +
+ {
+ this.log.info("New cluster view for partition " + this.getPartitionName() + ": " +
this.currentViewId + " (" + this.members + " delta: " + difference + ")");
+ }
// Build a ViewChangeEvent for the asynch listeners
ViewChangeEvent event = new ViewChangeEvent();
event.viewId = this.currentViewId;
event.allMembers = translatedNewView;
- event.deadMembers = getDeadMembers(oldMembers, event.allMembers);
- event.newMembers = getNewMembers(oldMembers, event.allMembers);
+ event.deadMembers = this.getDeadMembers(oldMembers, event.allMembers);
+ event.newMembers = this.getNewMembers(oldMembers, event.allMembers);
event.originatingGroups = null;
// if the new view occurs because of a merge, we first inform listeners of the merge
if(newView instanceof MergeView)
@@ -801,16 +828,14 @@
event.originatingGroups = mergeView.getSubgroups();
}
- this.log.debug("membership changed from " +
- (oldMembers == null ? 0 : oldMembers.size()) + " to " +
- event.allMembers.size());
+ this.log.debug("membership changed from " + oldMembers.size() + " to " + event.allMembers.size());
// Put the view change to the asynch queue
this.asynchHandler.queueEvent(event);
// Broadcast the new view to the synchronous view change listeners
if (this.allowSyncListeners)
{
- notifyListeners(this.synchListeners, event.viewId, event.allMembers,
+ this.notifyListeners(this.synchListeners, event.viewId, event.allMembers,
event.deadMembers, event.newMembers, event.originatingGroups);
}
}
@@ -827,21 +852,27 @@
if (this.members == null)
{
if (this.connectException != null)
+ {
throw this.connectException;
+ }
try
{
- this.channelLock.wait(getMethodCallTimeout());
+ this.channelLock.wait(this.getMethodCallTimeout());
}
catch (InterruptedException iex)
{
}
if (this.connectException != null)
+ {
throw this.connectException;
+ }
if (this.members == null)
+ {
throw new IllegalStateException("No view received from Channel");
+ }
}
}
}
@@ -856,7 +887,7 @@
public String getPartitionName()
{
return this.partitionName;
- }
+ }
public void setPartitionName(String newName)
{
@@ -878,12 +909,12 @@
return this.currentViewId;
}
- public Vector getCurrentView()
+ public Vector<String> getCurrentView()
{
- Vector result = new Vector (this.members.size());
- for (int i = 0; i < this.members.size(); i++)
+ Vector<String> result = new Vector<String>(this.members.size());
+ for (ClusterNode member: this.members)
{
- result.add( ((ClusterNode) this.members.elementAt(i)).getName() );
+ result.add(member.getName());
}
return result;
}
@@ -892,9 +923,7 @@
{
synchronized (this.members)
{
- ClusterNode[] nodes = new ClusterNode[this.members.size()];
- nodes = (ClusterNode[]) this.members.toArray(nodes);
- return nodes;
+ return this.members.toArray(new ClusterNode[this.members.size()]);
}
}
@@ -906,7 +935,9 @@
public boolean isCurrentNodeCoordinator ()
{
if(this.members == null || this.members.size() == 0 || this.me == null)
+ {
return false;
+ }
return this.members.elementAt (0).equals (this.me);
}
@@ -923,7 +954,7 @@
public void registerRPCHandler(String objName, Object subscriber, ClassLoader classloader)
{
- registerRPCHandler(objName, subscriber);
+ this.registerRPCHandler(objName, subscriber);
this.clmap.put(objName, new WeakReference<ClassLoader>(classloader));
}
@@ -931,7 +962,7 @@
{
this.rpcHandlers.remove(objName);
this.clmap.remove(objName);
- }
+ }
/**
* This function is an abstraction of RpcDispatcher.
@@ -939,7 +970,7 @@
public ArrayList callMethodOnCluster(String objName, String methodName,
Object[] args, Class[] types, boolean excludeSelf) throws Exception
{
- return callMethodOnCluster(objName, methodName, args, types, excludeSelf, getMethodCallTimeout());
+ return this.callMethodOnCluster(objName, methodName, args, types, excludeSelf, this.getMethodCallTimeout());
}
@@ -953,7 +984,7 @@
if(this.channel.flushSupported())
{
- this.flushBlockGate.await(getStateTransferTimeout());
+ this.flushBlockGate.await(this.getStateTransferTimeout());
}
if (excludeSelf)
{
@@ -974,7 +1005,7 @@
rsp = this.dispatcher.callRemoteMethods(null, m, GroupRequest.GET_ALL, methodTimeout);
}
- return processResponseList(rsp, trace);
+ return this.processResponseList(rsp, trace);
}
/**
@@ -992,7 +1023,7 @@
public ArrayList callMethodOnCoordinatorNode(String objName, String methodName,
Object[] args, Class[] types,boolean excludeSelf) throws Exception
{
- return callMethodOnCoordinatorNode(objName,methodName,args,types,excludeSelf, getMethodCallTimeout());
+ return this.callMethodOnCoordinatorNode(objName,methodName,args,types,excludeSelf, this.getMethodCallTimeout());
}
/**
@@ -1022,17 +1053,17 @@
}
// the first cluster view member is the coordinator
- Vector coordinatorOnly = new Vector();
+ Vector<Address> coordinatorOnly = new Vector<Address>();
// If we are the coordinator, only call ourself if 'excludeSelf' is false
- if (false == isCurrentNodeCoordinator () ||
+ if (false == this.isCurrentNodeCoordinator () ||
false == excludeSelf)
{
- coordinatorOnly.addElement(this.jgmembers.elementAt (0));
+ coordinatorOnly.addElement(this.jgmembers.elementAt(0));
}
RspList rsp = this.dispatcher.callRemoteMethods(coordinatorOnly, m, GroupRequest.GET_ALL, methodTimeout);
- return processResponseList(rsp, trace);
+ return this.processResponseList(rsp, trace);
}
/**
@@ -1051,8 +1082,10 @@
Object[] args, Class[] types, long methodTimeout, ClusterNode targetNode) throws Throwable
{
if (!(targetNode instanceof ClusterNodeImpl))
- throw new IllegalArgumentException("targetNode " + targetNode + " is not an instance of " +
+ {
+ throw new IllegalArgumentException("targetNode " + targetNode + " is not an instance of " +
ClusterNodeImpl.class + " -- only targetNodes provided by this HAPartition should be used");
+ }
boolean trace = this.log.isTraceEnabled();
MethodCall m = new MethodCall(serviceName + "." + methodName, args, types);
@@ -1075,17 +1108,25 @@
{
item = response.getValue();
if (!(item instanceof NoHandlerForRPC))
- rc = item;
+ {
+ rc = item;
+ }
}
else if( trace )
- this.log.trace("Ignoring non-received response: "+response);
+ {
+ this.log.trace("Ignoring non-received response: "+response);
+ }
}
else
{
if (!(item instanceof NoHandlerForRPC))
- rc = item;
- else if( trace )
- this.log.trace("Ignoring NoHandlerForRPC");
+ {
+ rc = item;
+ }
+ else if( trace )
+ {
+ this.log.trace("Ignoring NoHandlerForRPC");
+ }
}
}
return rc;
@@ -1108,8 +1149,10 @@
Object[] args, Class[] types, long methodTimeout, ClusterNode targetNode) throws Throwable
{
if (!(targetNode instanceof ClusterNodeImpl))
- throw new IllegalArgumentException("targetNode " + targetNode + " is not an instance of " +
+ {
+ throw new IllegalArgumentException("targetNode " + targetNode + " is not an instance of " +
ClusterNodeImpl.class + " -- only targetNodes provided by this HAPartition should be used");
+ }
boolean trace = this.log.isTraceEnabled();
MethodCall m = new MethodCall(serviceName + "." + methodName, args, types);
@@ -1138,17 +1181,25 @@
{
item = response.getValue();
if (!(item instanceof NoHandlerForRPC))
+ {
rtn.add(item);
+ }
}
else if( trace )
+ {
this.log.trace("Ignoring non-received response: "+response);
+ }
}
else
{
if (!(item instanceof NoHandlerForRPC))
+ {
rtn.add(item);
+ }
else if( trace )
+ {
this.log.trace("Ignoring NoHandlerForRPC");
+ }
}
}
@@ -1168,7 +1219,7 @@
if(this.channel.flushSupported())
{
- this.flushBlockGate.await(getStateTransferTimeout());
+ this.flushBlockGate.await(this.getStateTransferTimeout());
}
if (excludeSelf)
{
@@ -1177,7 +1228,7 @@
this.log.trace("callAsynchMethodOnCluster(true), objName="+objName
+", methodName="+methodName+", members="+this.jgotherMembers);
}
- this.dispatcher.callRemoteMethods(this.jgotherMembers, m, GroupRequest.GET_NONE, getMethodCallTimeout());
+ this.dispatcher.callRemoteMethods(this.jgotherMembers, m, GroupRequest.GET_NONE, this.getMethodCallTimeout());
}
else
{
@@ -1186,7 +1237,7 @@
this.log.trace("callAsynchMethodOnCluster(false), objName="+objName
+", methodName="+methodName+", members="+this.members);
}
- this.dispatcher.callRemoteMethods(null, m, GroupRequest.GET_NONE, getMethodCallTimeout());
+ this.dispatcher.callRemoteMethods(null, m, GroupRequest.GET_NONE, this.getMethodCallTimeout());
}
}
@@ -1195,7 +1246,7 @@
// State transfer management
// *************************
// *************************
- //
+ //
public void subscribeToStateTransferEvents(String objectName, HAPartitionStateTransfer subscriber)
{
this.stateHandlers.put(objectName, subscriber);
@@ -1211,10 +1262,10 @@
// Group Membership listeners
// *************************
// *************************
- //
+ //
public void registerMembershipListener(HAMembershipListener listener)
{
- boolean isAsynch = (this.allowSyncListeners == false)
+ boolean isAsynch = (this.allowSyncListeners == false)
|| (listener instanceof AsynchHAMembershipListener)
|| (listener instanceof AsynchHAMembershipExtendedListener);
if( isAsynch ) {
@@ -1222,7 +1273,7 @@
this.asynchListeners.add(listener);
}
}
- else {
+ else {
synchronized(this.synchListeners) {
this.synchListeners.add(listener);
}
@@ -1231,7 +1282,7 @@
public void unregisterMembershipListener(HAMembershipListener listener)
{
- boolean isAsynch = (this.allowSyncListeners == false)
+ boolean isAsynch = (this.allowSyncListeners == false)
|| (listener instanceof AsynchHAMembershipListener)
|| (listener instanceof AsynchHAMembershipExtendedListener);
if( isAsynch ) {
@@ -1239,7 +1290,7 @@
this.asynchListeners.remove(listener);
}
}
- else {
+ else {
synchronized(this.synchListeners) {
this.synchListeners.remove(listener);
}
@@ -1252,7 +1303,7 @@
}
public void setAllowSynchronousMembershipNotifications(boolean allowSync)
- {
+ {
this.allowSyncListeners = allowSync;
}
@@ -1261,7 +1312,7 @@
public void processEvent(Object event)
{
ViewChangeEvent vce = (ViewChangeEvent) event;
- notifyListeners(this.asynchListeners, vce.viewId, vce.allMembers,
+ this.notifyListeners(this.asynchListeners, vce.viewId, vce.allMembers,
vce.deadMembers, vce.newMembers, vce.originatingGroups);
}
@@ -1271,13 +1322,15 @@
public void setDistributedStateImpl(DistributedStateImpl distributedState)
{
- this.distributedState = distributedState;
+ this.distributedState = distributedState;
}
public void setDistributedReplicantManagerImpl(DistributedReplicantManagerImpl drm)
{
if (this.replicantManager != null && !(this.replicantManager == drm))
+ {
throw new IllegalStateException("DistributedReplicantManager already set");
+ }
this.replicantManager = drm;
if (this.replicantManager != null)
@@ -1292,7 +1345,7 @@
protected void verifyNodeIsUnique () throws IllegalStateException
{
ClusterNodeImpl matched = null;
- for (ClusterNode member : getClusterNodes())
+ for (ClusterNode member : this.getClusterNodes())
{
if (member.equals(this.me))
{
@@ -1310,7 +1363,7 @@
{
other = (ClusterNodeImpl) member;
}
- throw new IllegalStateException("Found member " + other +
+ throw new IllegalStateException("Found member " + other +
" in current view that duplicates us (" + this.me + "). This" +
" node cannot join partition until duplicate member has" +
" been removed");
@@ -1326,7 +1379,7 @@
* @param classType Class type under which should appear the bound object
* @param ctx Naming context under which we bind the object
* @throws Exception Thrown if a naming exception occurs during binding
- */
+ */
protected void bind(String jndiName, Object who, Class classType, Context ctx) throws Exception
{
// Ah ! This service isn't serializable, so we use a helper class
@@ -1362,12 +1415,18 @@
* @param oldMembers Vector of old members
* @param newMembers Vector of new members
* @return Vector of members that have died between the two views, can be empty.
- */
- protected Vector getDeadMembers(Vector oldMembers, Vector newMembers)
+ */
+ protected Vector<ClusterNode> getDeadMembers(Vector<ClusterNode> oldMembers, Vector<ClusterNode> newMembers)
{
- if(oldMembers == null) oldMembers=new Vector();
- if(newMembers == null) newMembers=new Vector();
- Vector dead=(Vector)oldMembers.clone();
+ if(oldMembers == null)
+ {
+ oldMembers=new Vector<ClusterNode>();
+ }
+ if(newMembers == null)
+ {
+ newMembers=new Vector<ClusterNode>();
+ }
+ Vector<ClusterNode> dead=(Vector)oldMembers.clone();
dead.removeAll(newMembers);
this.log.debug("dead members: " + dead);
return dead;
@@ -1378,19 +1437,25 @@
* @param oldMembers Vector of old members
* @param allMembers Vector of new members
* @return Vector of members that have joined the partition between the two views
- */
- protected Vector getNewMembers(Vector oldMembers, Vector allMembers)
+ */
+ protected Vector<ClusterNode> getNewMembers(Vector<ClusterNode> oldMembers, Vector<ClusterNode> allMembers)
{
- if(oldMembers == null) oldMembers=new Vector();
- if(allMembers == null) allMembers=new Vector();
- Vector newMembers=(Vector)allMembers.clone();
+ if(oldMembers == null)
+ {
+ oldMembers=new Vector<ClusterNode>();
+ }
+ if(allMembers == null)
+ {
+ allMembers=new Vector<ClusterNode>();
+ }
+ Vector<ClusterNode> newMembers=(Vector)allMembers.clone();
newMembers.removeAll(oldMembers);
return newMembers;
}
- protected void notifyListeners(ArrayList theListeners, long viewID,
- Vector allMembers, Vector deadMembers, Vector newMembers,
- Vector originatingGroups)
+ protected void notifyListeners(ArrayList<HAMembershipListener> theListeners, long viewID,
+ Vector<ClusterNode> allMembers, Vector<ClusterNode> deadMembers, Vector<ClusterNode> newMembers,
+ Vector<View> originatingGroups)
{
this.log.debug("Begin notifyListeners, viewID: "+viewID);
synchronized(theListeners)
@@ -1404,7 +1469,7 @@
HAMembershipListener aListener = null;
try
{
- aListener = (HAMembershipListener) theListeners.get(i);
+ aListener = theListeners.get(i);
if(originatingGroups != null && (aListener instanceof HAMembershipExtendedListener))
{
HAMembershipExtendedListener exListener = (HAMembershipExtendedListener) aListener;
@@ -1446,10 +1511,7 @@
{
return this.bindIntoJndi;
}
-
-
-
public ThreadPool getThreadPool()
{
return this.threadPool;
@@ -1460,16 +1522,17 @@
this.threadPool = threadPool;
}
- protected Vector translateAddresses (Vector jgAddresses)
+ protected Vector<ClusterNode> translateAddresses(Vector<Address> addresses)
{
- if (jgAddresses == null)
+ if (addresses == null)
+ {
return null;
+ }
- Vector result = new Vector (jgAddresses.size());
- for (int i = 0; i < jgAddresses.size(); i++)
+ Vector<ClusterNode> result = new Vector<ClusterNode>(addresses.size());
+ for (Address address: addresses)
{
- IpAddress addr = (IpAddress) jgAddresses.elementAt(i);
- result.add(new ClusterNodeImpl(addr));
+ result.add(new ClusterNodeImpl((IpAddress) address));
}
return result;
@@ -1489,10 +1552,10 @@
public String showHistory ()
{
StringBuffer buff = new StringBuffer();
- Vector data = new Vector (this.history);
- for (java.util.Iterator row = data.iterator(); row.hasNext();)
+ Vector<String> data = new Vector<String>(this.history);
+ for (java.util.Iterator<String> row = data.iterator(); row.hasNext();)
{
- String info = (String) row.next();
+ String info = row.next();
buff.append(info).append("\n");
}
return buff.toString();
@@ -1502,11 +1565,11 @@
{
StringBuffer buff = new StringBuffer();
buff.append("<events>\n");
- Vector data = new Vector (this.history);
- for (java.util.Iterator row = data.iterator(); row.hasNext();)
+ Vector<String> data = new Vector<String>(this.history);
+ for (java.util.Iterator<String> row = data.iterator(); row.hasNext();)
{
buff.append(" <event>\n ");
- String info = (String) row.next();
+ String info = row.next();
buff.append(info);
buff.append("\n </event>\n");
}
@@ -1594,7 +1657,7 @@
public void setMethodCallTimeout(long timeout)
{
- this.method_call_timeout = timeout;
+ this.method_call_timeout = timeout;
}
// Protected --------------------------------------------------------------
@@ -1604,8 +1667,10 @@
*/
protected Object objectFromByteBufferInternal (byte[] buffer) throws Exception
{
- if(buffer == null)
+ if(buffer == null)
+ {
return null;
+ }
ByteArrayInputStream bais = new ByteArrayInputStream(buffer);
MarshalledValueInputStream mvis = new MarshalledValueInputStream(bais);
@@ -1630,11 +1695,15 @@
*/
protected Object objectFromByteBufferResponseInternal (byte[] buffer) throws Exception
{
- if(buffer == null)
+ if(buffer == null)
+ {
return null;
+ }
if (buffer[0] == NULL_VALUE)
+ {
return null;
+ }
ByteArrayInputStream bais = new ByteArrayInputStream(buffer);
// read past the null/serializable byte
@@ -1650,7 +1719,9 @@
protected byte[] objectToByteBufferResponseInternal (Object obj) throws Exception
{
if (obj == null)
+ {
return new byte[]{NULL_VALUE};
+ }
ByteArrayOutputStream baos = new ByteArrayOutputStream();
// write a marker to stream to distinguish from null value stream
@@ -1671,12 +1742,12 @@
public void getState(OutputStream stream)
{
- logHistory ("getState called on partition");
+ ClusterPartition.this.logHistory ("getState called on partition");
ClusterPartition.this.log.debug("getState called.");
try
{
- getStateInternal(stream);
+ ClusterPartition.this.getStateInternal(stream);
}
catch (Exception ex)
{
@@ -1697,7 +1768,7 @@
public void setState(InputStream stream)
{
- logHistory ("setState called on partition");
+ ClusterPartition.this.logHistory ("setState called on partition");
try
{
if (stream == null)
@@ -1706,31 +1777,31 @@
}
else
{
- setStateInternal(stream);
+ ClusterPartition.this.setStateInternal(stream);
}
ClusterPartition.this.isStateSet = true;
}
catch (Throwable t)
{
- recordSetStateFailure(t);
+ ClusterPartition.this.recordSetStateFailure(t);
}
finally
{
// Notify waiting thread that serviceState has been set.
- notifyChannelLock();
+ ClusterPartition.this.notifyChannelLock();
}
}
public byte[] getState()
{
- logHistory ("getState called on partition");
+ ClusterPartition.this.logHistory ("getState called on partition");
ClusterPartition.this.log.debug("getState called.");
try
{
ByteArrayOutputStream baos = new ByteArrayOutputStream(1024);
- getStateInternal(baos);
+ ClusterPartition.this.getStateInternal(baos);
return baos.toByteArray();
}
catch (Exception ex)
@@ -1755,7 +1826,7 @@
public void setState(byte[] obj)
{
- logHistory ("setState called on partition");
+ ClusterPartition.this.logHistory ("setState called on partition");
try
{
if (obj == null)
@@ -1765,7 +1836,7 @@
else
{
ByteArrayInputStream bais = new ByteArrayInputStream(obj);
- setStateInternal(bais);
+ ClusterPartition.this.setStateInternal(bais);
bais.close();
}
@@ -1773,28 +1844,28 @@
}
catch (Throwable t)
{
- recordSetStateFailure(t);
+ ClusterPartition.this.recordSetStateFailure(t);
}
finally
{
// Notify waiting thread that serviceState has been set.
- notifyChannelLock();
+ ClusterPartition.this.notifyChannelLock();
}
}
}
- /**
+ /**
* A simple data class containing the view change event needed to
* notify the HAMembershipListeners
*/
private static class ViewChangeEvent
{
long viewId;
- Vector deadMembers;
- Vector newMembers;
- Vector allMembers;
- Vector originatingGroups;
+ Vector<ClusterNode> deadMembers;
+ Vector<ClusterNode> newMembers;
+ Vector<ClusterNode> allMembers;
+ Vector<View> originatingGroups;
}
private class RequestMarshallerImpl implements org.jgroups.blocks.RpcDispatcher.Marshaller
@@ -1802,7 +1873,7 @@
public Object objectFromByteBuffer(byte[] buf) throws Exception
{
- return objectFromByteBufferInternal(buf);
+ return ClusterPartition.this.objectFromByteBufferInternal(buf);
}
public byte[] objectToByteBuffer(Object obj) throws Exception
@@ -1813,11 +1884,11 @@
String name = ((MethodCall)obj).getName();
int idx = name.lastIndexOf('.');
String serviceName = name.substring(0, idx);
- return objectToByteBufferInternal(new Object[]{serviceName, objectToByteBufferInternal(obj)});
+ return ClusterPartition.this.objectToByteBufferInternal(new Object[]{serviceName, ClusterPartition.this.objectToByteBufferInternal(obj)});
}
- else // this shouldn't occur
- return objectToByteBufferInternal(obj);
- }
+
+ return ClusterPartition.this.objectToByteBufferInternal(obj);
+ }
}
private class ResponseMarshallerImpl implements org.jgroups.blocks.RpcDispatcher.Marshaller
@@ -1826,7 +1897,7 @@
public Object objectFromByteBuffer(byte[] buf) throws Exception
{
boolean trace = ClusterPartition.this.log.isTraceEnabled();
- Object retval = objectFromByteBufferResponseInternal(buf);
+ Object retval = ClusterPartition.this.objectFromByteBufferResponseInternal(buf);
// HAServiceResponse is only received when a scoped classloader is required for unmarshalling
if (!(retval instanceof HAServiceResponse))
{
@@ -1834,7 +1905,7 @@
}
String serviceName = ((HAServiceResponse)retval).getServiceName();
- byte[] payload = ((HAServiceResponse)retval).getPayload();
+ byte[] payload = ((HAServiceResponse)retval).getPayload();
ClassLoader previousCL = null;
boolean overrideCL = false;
@@ -1846,11 +1917,13 @@
previousCL = Thread.currentThread().getContextClassLoader();
ClassLoader loader = weak.get();
if( trace )
- ClusterPartition.this.log.trace("overriding response Thread ContextClassLoader for service " + serviceName);
+ {
+ ClusterPartition.this.log.trace("overriding response Thread ContextClassLoader for service " + serviceName);
+ }
overrideCL = true;
Thread.currentThread().setContextClassLoader(loader);
}
- retval = objectFromByteBufferResponseInternal(payload);
+ retval = ClusterPartition.this.objectFromByteBufferResponseInternal(payload);
return retval;
}
@@ -1866,16 +1939,16 @@
public byte[] objectToByteBuffer(Object obj) throws Exception
{
- return objectToByteBufferResponseInternal(obj);
- }
+ return ClusterPartition.this.objectToByteBufferResponseInternal(obj);
+ }
}
/**
- * Overrides RpcDispatcher.Handle so that we can dispatch to many
+ * Overrides RpcDispatcher.Handle so that we can dispatch to many
* different objects.
*/
private class RpcHandler extends RpcDispatcher
- {
+ {
private RpcHandler(Channel channel, MessageListener l, MembershipListener l2, Object server_obj,
boolean deadlock_detection)
{
@@ -1883,8 +1956,8 @@
}
/**
- * Analyze the MethodCall contained in <code>req</code> to find the
- * registered service object to invoke against, and then execute it
+ * Analyze the MethodCall contained in <code>req</code> to find the
+ * registered service object to invoke against, and then execute it
* against *that* object and return result.
*
* This overrides RpcDispatcher.Handle so that we can dispatch to many different objects.
@@ -1903,19 +1976,21 @@
byte[] request_bytes = null;
if( trace )
- this.log.trace("Partition " + getPartitionName() + " received msg");
+ {
+ this.log.trace("Partition " + ClusterPartition.this.getPartitionName() + " received msg");
+ }
if(req == null || req.getBuffer() == null)
{
- this.log.warn("Partition " + getPartitionName() + " message or message buffer is null!");
+ this.log.warn("Partition " + ClusterPartition.this.getPartitionName() + " message or message buffer is null!");
return null;
}
try
{
- Object wrapper = objectFromByteBufferInternal(req.getBuffer());
+ Object wrapper = ClusterPartition.this.objectFromByteBufferInternal(req.getBuffer());
if(wrapper == null || !(wrapper instanceof Object[]))
{
- this.log.warn("Partition " + getPartitionName() + " message wrapper does not contain Object[] object!");
+ this.log.warn("Partition " + ClusterPartition.this.getPartitionName() + " message wrapper does not contain Object[] object!");
return null;
}
@@ -1929,34 +2004,38 @@
if (handler == null)
{
if( trace )
- this.log.trace("Partition " + getPartitionName() + " no rpc handler registered under service " + service);
+ {
+ this.log.trace("Partition " + ClusterPartition.this.getPartitionName() + " no rpc handler registered under service " + service);
+ }
return new NoHandlerForRPC();
}
}
catch(Exception e)
{
- this.log.warn("Partition " + getPartitionName() + " failed unserializing message buffer (msg=" + req + ")", e);
+ this.log.warn("Partition " + ClusterPartition.this.getPartitionName() + " failed unserializing message buffer (msg=" + req + ")", e);
return null;
}
try
- {
+ {
// If client registered the service with a classloader, override the thread classloader here
WeakReference<ClassLoader> weak = ClusterPartition.this.clmap.get(service);
if (weak != null)
{
if( trace )
+ {
this.log.trace("overriding Thread ContextClassLoader for RPC service " + service);
+ }
previousCL = Thread.currentThread().getContextClassLoader();
ClassLoader loader = weak.get();
overrideCL = true;
Thread.currentThread().setContextClassLoader(loader);
}
- body = objectFromByteBufferInternal(request_bytes);
+ body = ClusterPartition.this.objectFromByteBufferInternal(request_bytes);
}
catch (Exception e)
{
- this.log.warn("Partition " + getPartitionName() + " failed extracting message body from request bytes", e);
+ this.log.warn("Partition " + ClusterPartition.this.getPartitionName() + " failed extracting message body from request bytes", e);
return null;
}
finally
@@ -1970,21 +2049,23 @@
if(body == null || !(body instanceof MethodCall))
{
- this.log.warn("Partition " + getPartitionName() + " message does not contain a MethodCall object!");
+ this.log.warn("Partition " + ClusterPartition.this.getPartitionName() + " message does not contain a MethodCall object!");
return null;
}
// get method call information
MethodCall method_call = (MethodCall)body;
- String methodName = method_call.getName();
+ String methodName = method_call.getName();
if( trace )
+ {
this.log.trace("full methodName: " + methodName);
+ }
int idx = methodName.lastIndexOf('.');
String handlerName = methodName.substring(0, idx);
String newMethodName = methodName.substring(idx + 1);
- if( trace )
+ if( trace )
{
this.log.trace("handlerName: " + handlerName + " methodName: " + newMethodName);
this.log.trace("Handle: " + methodName);
@@ -2003,16 +2084,20 @@
if (overrideCL)
{
// wrap the response so that the service name can be accessed during unmarshalling of the response
- byte[] retbytes = objectToByteBufferResponseInternal(retval);
+ byte[] retbytes = ClusterPartition.this.objectToByteBufferResponseInternal(retval);
retval = new HAServiceResponse(handlerName, retbytes);
}
if( trace )
+ {
this.log.trace("rpc call return value: " + retval);
+ }
}
catch (Throwable t)
{
if( trace )
- this.log.trace("Partition " + getPartitionName() + " rpc call threw exception", t);
+ {
+ this.log.trace("Partition " + ClusterPartition.this.getPartitionName() + " rpc call threw exception", t);
+ }
retval = t;
}
@@ -2037,32 +2122,36 @@
private int generation;
- public synchronized void close()
+ public synchronized void close()
{
this.isOpen = false;
}
- public synchronized void open()
+ public synchronized void open()
{
++this.generation;
this.isOpen = true;
- notifyAll();
+ this.notifyAll();
}
// BLOCKS-UNTIL: opened-since(generation on entry)
- public synchronized void await() throws InterruptedException
+ public synchronized void await() throws InterruptedException
{
int arrivalGeneration = this.generation;
while(!this.isOpen && arrivalGeneration == this.generation)
- wait();
+ {
+ this.wait();
+ }
}
// BLOCKS-UNTIL: opened-since(generation on entry)
- public synchronized void await(long timeout) throws InterruptedException
+ public synchronized void await(long timeout) throws InterruptedException
{
int arrivalGeneration = this.generation;
while(!this.isOpen && arrivalGeneration == this.generation)
- wait(timeout);
+ {
+ this.wait(timeout);
+ }
}
}
Modified: trunk/cluster/src/main/org/jboss/ha/framework/server/DistributedReplicantManagerImpl.java
===================================================================
--- trunk/cluster/src/main/org/jboss/ha/framework/server/DistributedReplicantManagerImpl.java 2008-06-16 20:53:23 UTC (rev 74642)
+++ trunk/cluster/src/main/org/jboss/ha/framework/server/DistributedReplicantManagerImpl.java 2008-06-16 21:07:51 UTC (rev 74643)
@@ -21,38 +21,39 @@
*/
package org.jboss.ha.framework.server;
-import java.util.Set;
-import java.util.Vector;
+import java.io.Serializable;
import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.Iterator;
import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
+import java.util.Set;
+import java.util.Vector;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.CopyOnWriteArrayList;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.atomic.AtomicInteger;
-import java.io.Serializable;
-
import javax.management.MBeanServer;
import javax.management.ObjectName;
-import EDU.oswego.cs.dl.util.concurrent.Latch;
-import EDU.oswego.cs.dl.util.concurrent.ConcurrentReaderHashMap;
-
-import org.jboss.logging.Logger;
-
import org.jboss.ha.framework.interfaces.ClusterNode;
import org.jboss.ha.framework.interfaces.DistributedReplicantManager;
import org.jboss.ha.framework.interfaces.HAPartition;
+import org.jboss.logging.Logger;
-/**
+/**
* This class manages replicated objects.
*
* @author <a href="mailto:bill at burkecentral.com">Bill Burke</a>.
* @author <a href="mailto:sacha.labourey at cogito-info.ch">Sacha Labourey</a>.
* @author Scott.stark at jboss.org
* @author <a href="mailto:galder.zamarreno at jboss.com">Galder Zamarreno</a>
+ * @author <a href="mailto:pferraro at redhat.com">Paul Ferraro</a>
* @version $Revision$
*/
public class DistributedReplicantManagerImpl
@@ -63,36 +64,38 @@
{
// Constants -----------------------------------------------------
- protected final static String SERVICE_NAME = "DistributedReplicantManager";
+ static final String SERVICE_NAME = "DistributedReplicantManager";
+ private static final Class<?>[] add_types = new Class<?>[] { String.class, String.class, Serializable.class };
+ private static final Class<?>[] remove_types = new Class<?>[] { String.class, String.class };
+
// Attributes ----------------------------------------------------
- protected static int threadID;
+ private static AtomicInteger threadID = new AtomicInteger();
- protected Map localReplicants = new ConcurrentReaderHashMap();
- protected Map replicants = new ConcurrentReaderHashMap();
- protected Map keyListeners = new ConcurrentReaderHashMap();
- protected Map intraviewIdCache = new HashMap();
- protected HAPartition partition;
+ private ConcurrentMap<String, Serializable> localReplicants = new ConcurrentHashMap<String, Serializable>();
+ private ConcurrentMap<String, ConcurrentMap<String, Serializable>> replicants = new ConcurrentHashMap<String, ConcurrentMap<String, Serializable>>();
+ private ConcurrentMap<String, List<ReplicantListener>> keyListeners = new ConcurrentHashMap<String, List<ReplicantListener>>();
+ private Map<String, Integer> intraviewIdCache = new ConcurrentHashMap<String, Integer>();
+
+ private HAPartition partition;
/** The handler used to send replicant change notifications asynchronously */
- protected AsynchEventHandler asynchHandler;
+ private AsynchEventHandler asynchHandler;
- protected Logger log;
+ private Logger log = Logger.getLogger(this.getClass());
+ private boolean trace = this.log.isTraceEnabled();
- protected String nodeName = null;
+ private String nodeName = null;
- protected Latch partitionNameKnown = new Latch ();
- protected boolean trace;
+ // Works like a simple latch
+ private CountDownLatch partitionNameKnown = new CountDownLatch(1);
- protected Class[] add_types=new Class[]{String.class, String.class, Serializable.class};
- protected Class[] remove_types=new Class[]{String.class, String.class};
-
// Static --------------------------------------------------------
// Constructors --------------------------------------------------
- public DistributedReplicantManagerImpl()
+ public DistributedReplicantManagerImpl()
{
super();
- this.log = Logger.getLogger (this.getClass ());
+
// JBAS-5068 Create the handler early so we don't risk NPEs
this.asynchHandler = new AsynchEventHandler(this, "AsynchKeyChangeHandler");
}
@@ -101,24 +104,26 @@
public void createService() throws Exception
{
- if (partition == null)
+ if (this.partition == null)
+ {
throw new IllegalStateException("HAPartition property must be set before creating DistributedReplicantManager service");
+ }
- log.debug("registerRPCHandler");
- partition.registerRPCHandler(SERVICE_NAME, this);
- log.debug("subscribeToStateTransferEvents");
- partition.subscribeToStateTransferEvents(SERVICE_NAME, this);
- log.debug("registerMembershipListener");
- partition.registerMembershipListener(this);
+ this.log.debug("registerRPCHandler");
+ this.partition.registerRPCHandler(SERVICE_NAME, this);
+ this.log.debug("subscribeToStateTransferEvents");
+ this.partition.subscribeToStateTransferEvents(SERVICE_NAME, this);
+ this.log.debug("registerMembershipListener");
+ this.partition.registerMembershipListener(this);
}
public void startService() throws Exception
{
this.nodeName = this.partition.getNodeName();
- asynchHandler.start();
+ this.asynchHandler.start();
- partitionNameKnown.release (); // partition name is now known!
+ this.partitionNameKnown.countDown(); // partition name is now known!
//log.info("mergemembers");
//mergeMembers();
@@ -129,11 +134,11 @@
// Stop the asynch handler thread
try
{
- asynchHandler.stop();
+ this.asynchHandler.stop();
}
catch( Exception e)
{
- log.warn("Failed to stop asynchHandler", e);
+ this.log.warn("Failed to stop asynchHandler", e);
}
// TODO reset the latch
@@ -141,48 +146,39 @@
// NR 200505 : [JBCLUSTER-38] unbind at destroy
public void destroyService() throws Exception
- {
+ {
// we cleanly shutdown. This should be optimized.
- if (localReplicants != null)
+ for (String key: this.localReplicants.keySet())
{
- synchronized(localReplicants)
- {
- String[] keys = new String[localReplicants.size()];
- localReplicants.keySet().toArray(keys);
- for(int n = 0; n < keys.length; n ++)
- {
- this.removeLocal(keys[n]); // channel is disconnected, so
- // don't try to notify cluster
- }
- }
+ this.removeLocal(key); // channel is disconnected, so don't try to notify cluster
}
- if (partition != null)
- {
- partition.unregisterRPCHandler(SERVICE_NAME, this);
- partition.unsubscribeFromStateTransferEvents(SERVICE_NAME, this);
- partition.unregisterMembershipListener(this);
+ if (this.partition != null)
+ {
+ this.partition.unregisterRPCHandler(SERVICE_NAME, this);
+ this.partition.unsubscribeFromStateTransferEvents(SERVICE_NAME, this);
+ this.partition.unregisterMembershipListener(this);
}
}
public void registerWithJmx(MBeanServer server) throws Exception
{
- server.registerMBean(this, getObjectName());
+ server.registerMBean(this, this.getObjectName());
}
public void unregisterWithJmx(MBeanServer server) throws Exception
{
- server.unregisterMBean(getObjectName());
+ server.unregisterMBean(this.getObjectName());
}
private ObjectName getObjectName() throws Exception
{
- return new ObjectName("jboss:service=" + SERVICE_NAME + ",partition=" + partition.getPartitionName());
+ return new ObjectName("jboss:service=" + SERVICE_NAME + ",partition=" + this.partition.getPartitionName());
}
public HAPartition getHAPartition()
{
- return partition;
+ return this.partition;
}
public void setHAPartition(HAPartition clusterPartition)
@@ -190,38 +186,38 @@
this.partition = clusterPartition;
}
- public String listContent () throws Exception
+ public String listContent() throws Exception
{
+ StringBuilder result = new StringBuilder();
+
+ result.append("<pre>");
+
// we merge all replicants services: local only or not
//
- java.util.Collection services = this.getAllServices ();
-
- StringBuffer result = new StringBuffer ();
- java.util.Iterator catsIter = services.iterator ();
-
- result.append ("<pre>");
-
- while (catsIter.hasNext ())
+ for (String category: this.getAllServices())
{
- String category = (String)catsIter.next ();
- Map content = (Map)this.replicants.get (category);
- if (content == null)
- content = new HashMap ();
- java.util.Iterator keysIter = content.keySet ().iterator ();
-
- result.append ("-----------------------------------------------\n");
- result.append ("Service : ").append (category).append ("\n\n");
+ result.append("-----------------------------------------------\n");
+ result.append("Service : ").append(category).append("\n\n");
- Serializable local = lookupLocalReplicant(category);
+ Serializable local = this.localReplicants.get(category);
+
if (local == null)
- result.append ("\t- Service is *not* available locally\n");
+ {
+ result.append("\t- Service is *not* available locally\n");
+ }
else
- result.append ("\t- Service *is* also available locally\n");
+ {
+ result.append("\t- Service *is* also available locally\n");
+ }
- while (keysIter.hasNext ())
+ Map<String, Serializable> content = this.replicants.get(category);
+
+ if (content != null)
{
- String location = (String)keysIter.next ();
- result.append ("\t- ").append(location).append ("\n");
+ for (String location: content.keySet())
+ {
+ result.append("\t- ").append(location).append("\n");
+ }
}
result.append ("\n");
@@ -230,110 +226,106 @@
result.append ("</pre>");
- return result.toString ();
+ return result.toString();
}
- public String listXmlContent () throws Exception
+ public String listXmlContent() throws Exception
{
- // we merge all replicants services: local only or not
- //
- java.util.Collection services = this.getAllServices ();
- StringBuffer result = new StringBuffer ();
-
+ StringBuilder result = new StringBuilder();
+
result.append ("<ReplicantManager>\n");
- java.util.Iterator catsIter = services.iterator ();
- while (catsIter.hasNext ())
+ // we merge all replicants services: local only or not
+ //
+ for (String category: this.getAllServices())
{
- String category = (String)catsIter.next ();
- Map content = (Map)this.replicants.get (category);
- if (content == null)
- content = new HashMap ();
- java.util.Iterator keysIter = content.keySet ().iterator ();
-
- result.append ("\t<Service>\n");
- result.append ("\t\t<ServiceName>").append (category).append ("</ServiceName>\n");
+ result.append("\t<Service>\n");
+ result.append("\t\t<ServiceName>").append(category).append("</ServiceName>\n");
+ Serializable local = this.localReplicants.get(category);
- Serializable local = lookupLocalReplicant(category);
if (local != null)
{
- result.append ("\t\t<Location>\n");
- result.append ("\t\t\t<Name local=\"True\">").append (this.nodeName).append ("</Name>\n");
- result.append ("\t\t</Location>\n");
+ result.append("\t\t<Location>\n");
+ result.append("\t\t\t<Name local=\"True\">").append (this.nodeName).append ("</Name>\n");
+ result.append("\t\t</Location>\n");
}
- while (keysIter.hasNext ())
+ Map<String, Serializable> content = this.replicants.get(category);
+
+ if (content != null)
{
- String location = (String)keysIter.next ();
- result.append ("\t\t<Location>\n");
- result.append ("\t\t\t<Name local=\"False\">").append (location).append ("</Name>\n");
- result.append ("\t\t</Location>\n");
+ for (String location: content.keySet())
+ {
+ result.append("\t\t<Location>\n");
+ result.append("\t\t\t<Name local=\"False\">").append (location).append ("</Name>\n");
+ result.append("\t\t</Location>\n");
+ }
}
- result.append ("\t<Service>\n");
-
+ result.append("\t</Service>\n");
}
- result.append ("<ReplicantManager>\n");
+ result.append("</ReplicantManager>\n");
- return result.toString ();
+ return result.toString();
}
// HAPartition.HAPartitionStateTransfer implementation ----------------------------------------------
- public Serializable getCurrentState ()
+ public Serializable getCurrentState()
{
- java.util.Collection services = this.getAllServices ();
- HashMap result = new HashMap ();
+ Map<String, ConcurrentMap<String, Serializable>> result = new HashMap<String, ConcurrentMap<String, Serializable>>();
- java.util.Iterator catsIter = services.iterator ();
- while (catsIter.hasNext ())
+ for (String category: this.getAllServices())
{
- String category = (String)catsIter.next ();
- Map content = (Map)this.replicants.get (category);
- if (content == null)
- content = new HashMap();
- else
+ ConcurrentMap<String, Serializable> map = new ConcurrentHashMap<String, Serializable>();
+
+ ConcurrentMap<String, Serializable> content = this.replicants.get(category);
+
+ if (content != null)
{
- Map temp = new HashMap();
- temp.putAll(content);
- content = temp;
+ map.putAll(content);
}
- Serializable local = lookupLocalReplicant(category);
+ Serializable local = this.localReplicants.get(category);
+
if (local != null)
- content.put (this.nodeName, local);
+ {
+ map.put(this.nodeName, local);
+ }
- result.put (category, content);
+ result.put(category, map);
}
// we add the intraviewid cache to the global result
//
- Object[] globalResult = new Object[] {result, intraviewIdCache};
- return globalResult;
+ return new Object[] { result, this.intraviewIdCache };
}
+ @SuppressWarnings("unchecked")
public void setCurrentState(Serializable newState)
{
- Object[] globalState = (Object[])newState;
- Map map = (Map)globalState[0];
+ Object[] globalState = (Object[]) newState;
+ Map<String, ConcurrentMap<String, Serializable>> map = (Map) globalState[0];
+
this.replicants.putAll(map);
- this.intraviewIdCache = (Map)globalState[1];
+
+ this.intraviewIdCache = (Map) globalState[1];
- if( trace )
+ if (this.trace)
{
- log.trace(nodeName + ": received new state, will republish local replicants");
+ this.log.trace(this.nodeName + ": received new state, will republish local replicants");
}
- MembersPublisher publisher = new MembersPublisher();
- publisher.start();
+
+ new MembersPublisher().start();
}
- public Collection getAllServices ()
+ public Collection<String> getAllServices()
{
- HashSet services = new HashSet();
- services.addAll (localReplicants.keySet ());
- services.addAll (replicants.keySet ());
+ Set<String> services = new HashSet<String>();
+ services.addAll(this.localReplicants.keySet());
+ services.addAll(this.replicants.keySet());
return services;
}
@@ -344,11 +336,11 @@
// Here we only care about deadMembers. Purge all replicant lists of deadMembers
// and then notify all listening nodes.
//
- log.info("Merging partitions...");
- log.info("Dead members: " + deadMembers.size());
- log.info("Originating groups: " + originatingGroups);
- purgeDeadMembers(deadMembers, true);
- if (newMembers.size() > 0)
+ this.log.info("Merging partitions...");
+ this.log.info("Dead members: " + deadMembers.size());
+ this.log.info("Originating groups: " + originatingGroups);
+ this.purgeDeadMembers(deadMembers, true);
+ if (newMembers.size() > 0)
{
new MergeMembers().start();
}
@@ -359,11 +351,11 @@
// Here we only care about deadMembers. Purge all replicant lists of deadMembers
// and then notify all listening nodes.
//
- log.info("I am (" + nodeName + ") received membershipChanged event:");
- log.info("Dead members: " + deadMembers.size() + " (" + deadMembers + ")");
- log.info("New Members : " + newMembers.size() + " (" + newMembers + ")");
- log.info("All Members : " + allMembers.size() + " (" + allMembers + ")");
- purgeDeadMembers(deadMembers, false);
+ this.log.info("I am (" + this.nodeName + ") received membershipChanged event:");
+ this.log.info("Dead members: " + deadMembers.size() + " (" + deadMembers + ")");
+ this.log.info("New Members : " + newMembers.size() + " (" + newMembers + ")");
+ this.log.info("All Members : " + allMembers.size() + " (" + allMembers + ")");
+ this.purgeDeadMembers(deadMembers, false);
// we don't need to merge members anymore
}
@@ -373,263 +365,297 @@
public void processEvent(Object event)
{
KeyChangeEvent kce = (KeyChangeEvent) event;
- notifyKeyListeners(kce.key, kce.replicants, kce.merge);
+ this.notifyKeyListeners(kce.key, kce.replicants, kce.merge);
}
static class KeyChangeEvent
{
String key;
- List replicants;
+ List<Serializable> replicants;
boolean merge;
}
- // DistributedReplicantManager implementation ----------------------------------------------
+ // DistributedReplicantManager implementation ----------------------------------------------
public void add(String key, Serializable replicant) throws Exception
{
- if( trace )
- log.trace("add, key="+key+", value="+replicant);
- partitionNameKnown.acquire (); // we don't propagate until our name is known
+ if (this.trace)
+ {
+ this.log.trace("add, key=" + key + ", value=" + replicant);
+ }
- Object[] args = {key, this.nodeName, replicant};
- partition.callMethodOnCluster(SERVICE_NAME, "_add", args, add_types, true);
- synchronized(localReplicants)
+ this.partitionNameKnown.await(); // we don't propagate until our name is known
+
+ Object[] args = { key, this.nodeName, replicant };
+
+ this.partition.callMethodOnCluster(SERVICE_NAME, "_add", args, add_types, true);
+
+ List<Serializable> replicants = null;
+
+ synchronized (this.localReplicants)
{
- localReplicants.put(key, replicant);
- notifyKeyListeners(key, lookupReplicants(key), false);
+ this.localReplicants.put(key, replicant);
+
+ replicants = this.getReplicants(key);
}
+
+ this.notifyKeyListeners(key, replicants, false);
}
public void remove(String key) throws Exception
- {
- partitionNameKnown.acquire (); // we don't propagate until our name is known
+ {
+ this.partitionNameKnown.await(); // we don't propagate until our name is known
// optimisation: we don't make a costly network call
// if there is nothing to remove
- if (localReplicants.containsKey(key))
+ if (this.localReplicants.containsKey(key))
{
- Object[] args = {key, this.nodeName};
- partition.callAsynchMethodOnCluster(SERVICE_NAME, "_remove", args, remove_types, true);
- removeLocal(key);
+ Object[] args = { key, this.nodeName };
+
+ this.partition.callAsynchMethodOnCluster(SERVICE_NAME, "_remove", args, remove_types, true);
+
+ this.removeLocal(key);
}
}
private void removeLocal(String key)
{
- synchronized(localReplicants)
+ List<Serializable> replicants = null;
+
+ synchronized (this.localReplicants)
{
- localReplicants.remove(key);
- List result = lookupReplicants(key);
- if (result == null)
- result = new ArrayList (); // don't pass null but an empty list
- notifyKeyListeners(key, result, false);
+ if (this.localReplicants.remove(key) != null)
+ {
+ replicants = this.getReplicants(key);
+ }
}
+
+ if (replicants != null)
+ {
+ this.notifyKeyListeners(key, replicants, false);
+ }
}
public Serializable lookupLocalReplicant(String key)
{
- return (Serializable)localReplicants.get(key);
+ return this.localReplicants.get(key);
}
- public List lookupReplicants(String key)
+ public List<Serializable> lookupReplicants(String key)
{
- Serializable local = lookupLocalReplicant(key);
- Map replicant = (Map)replicants.get(key);
- if (replicant == null && local == null)
- return null;
+ Serializable local = this.localReplicants.get(key);
+
+ Map<String, Serializable> replicant = this.replicants.get(key);
- ArrayList rtn = new ArrayList();
-
if (replicant == null)
{
- if (local != null)
- rtn.add(local);
+ return (local != null) ? Collections.singletonList(local) : null;
}
- else
+
+ // JBAS-2677. Put the replicants in view order.
+ ClusterNode[] nodes = this.partition.getClusterNodes();
+
+ List<Serializable> result = new ArrayList<Serializable>(nodes.length);
+
+ for (ClusterNode node: nodes)
{
- // JBAS-2677. Put the replicants in view order.
- ClusterNode[] nodes = partition.getClusterNodes();
- String replNode;
- Object replVal;
- for (int i = 0; i < nodes.length; i++)
+ String name = node.getName();
+
+ if (local != null && this.nodeName.equals(name))
{
- replNode = nodes[i].getName();
- if (local != null && nodeName.equals(replNode))
+ result.add(local);
+ }
+ else
+ {
+ Serializable value = replicant.get(name);
+
+ if (value != null)
{
- rtn.add(local);
- continue;
+ result.add(value);
}
-
- replVal = replicant.get(replNode);
- if (replVal != null)
- rtn.add(replVal);
}
}
- return rtn;
+ return result;
}
- public List lookupReplicantsNodeNames(String key)
- {
- List<ClusterNode> nodes = lookupReplicantsNodes(key);
- if (nodes == null)
+ private List<Serializable> getReplicants(String key)
+ {
+ List<Serializable> result = this.lookupReplicants(key);
+
+ if (result == null)
{
- return null;
+ result = Collections.emptyList();
}
+ return result;
+ }
+
+ @Deprecated
+ public List<String> lookupReplicantsNodeNames(String key)
+ {
+ List<ClusterNode> nodes = this.lookupReplicantsNodes(key);
+
+ if (nodes == null) return null;
+
List<String> nodeNames = new ArrayList<String>(nodes.size());
+
for (ClusterNode node : nodes)
{
nodeNames.add(node.getName());
- }
+ }
return nodeNames;
}
public List<ClusterNode> lookupReplicantsNodes(String key)
{
- boolean locallyReplicated = localReplicants.containsKey (key);
- Map replicant = (Map)replicants.get(key);
- if (replicant == null && !locallyReplicated)
- return null;
-
- List<ClusterNode> rtn = new ArrayList<ClusterNode>();
+ boolean local = this.localReplicants.containsKey(key);
+ Map<String, Serializable> replicant = this.replicants.get(key);
if (replicant == null)
- {
- if (locallyReplicated)
- rtn.add(partition.getClusterNode());
+ {
+ return local ? Collections.singletonList(this.partition.getClusterNode()) : null;
}
- else
+
+ Set<String> keys = replicant.keySet();
+ ClusterNode[] nodes = this.partition.getClusterNodes();
+ List<ClusterNode> rtn = new ArrayList<ClusterNode>(nodes.length);
+
+ for (ClusterNode node : nodes)
{
- Set keys = replicant.keySet();
- ClusterNode[] nodes = partition.getClusterNodes();
- String keyOwner;
- for (int i = 0; i < nodes.length; i++)
+ String name = node.getName();
+
+ if (local && this.nodeName.equals(name))
{
- keyOwner = nodes[i].getName();
- if (locallyReplicated && nodeName.equals(keyOwner))
- {
- rtn.add(partition.getClusterNode());
- continue;
- }
-
- if (keys.contains(keyOwner))
- rtn.add(nodes[i]);
+ rtn.add(this.partition.getClusterNode());
}
+ else if (keys.contains(name))
+ {
+ rtn.add(node);
+ }
}
return rtn;
- }
+ }
- public void registerListener(String key, DistributedReplicantManager.ReplicantListener subscriber)
+ public void registerListener(String key, ReplicantListener subscriber)
{
- synchronized(keyListeners)
- {
- ArrayList listeners = (ArrayList)keyListeners.get(key);
- if (listeners == null)
- {
- listeners = new ArrayList();
- keyListeners.put(key, listeners);
- }
- listeners.add(subscriber);
- }
+ List<ReplicantListener> list = new CopyOnWriteArrayList<ReplicantListener>();
+
+ List<ReplicantListener> existing = this.keyListeners.putIfAbsent(key, list);
+
+ ((existing != null) ? existing : list).add(subscriber);
}
public void unregisterListener(String key, DistributedReplicantManager.ReplicantListener subscriber)
{
- synchronized(keyListeners)
+ List<ReplicantListener> listeners = this.keyListeners.get(key);
+
+ if (listeners != null)
{
- ArrayList listeners = (ArrayList)keyListeners.get (key);
- if (listeners == null) return;
-
listeners.remove(subscriber);
- if (listeners.size() == 0)
- keyListeners.remove(key);
-
+
+ this.keyListeners.remove(key, Collections.emptyList());
}
}
- public int getReplicantsViewId(String key)
+ public int getReplicantsViewId(String key)
{
- Integer result = (Integer)this.intraviewIdCache.get (key);
+ Integer result = this.intraviewIdCache.get(key);
- if (result == null)
- return 0;
- else
- return result.intValue ();
+ return (result != null) ? result.intValue() : 0;
}
- public boolean isMasterReplica (String key)
+ public boolean isMasterReplica(String key)
{
- if( trace )
- log.trace("isMasterReplica, key="+key);
- // if I am not a replicat, I cannot be the master...
+ if (this.trace)
+ {
+ this.log.trace("isMasterReplica, key=" + key);
+ }
+ // if I am not a replicant, I cannot be the master...
//
- if (!localReplicants.containsKey (key))
+ if (!this.localReplicants.containsKey(key))
{
- if( trace )
- log.trace("no localReplicants, key="+key+", isMasterReplica=false");
+ if (this.trace)
+ {
+ this.log.trace("no localReplicants, key=" + key + ", isMasterReplica=false");
+ }
return false;
}
- Vector allNodes = this.partition.getCurrentView ();
- Map repForKey = (Map)replicants.get(key);
- if (repForKey==null)
+ Map<String, Serializable> repForKey = this.replicants.get(key);
+ if (repForKey == null)
{
- if( trace )
- log.trace("no replicants, key="+key+", isMasterReplica=true");
+ if (this.trace)
+ {
+ this.log.trace("no replicants, key=" + key + ", isMasterReplica=true");
+ }
return true;
}
- Vector replicaNodes = new Vector ((repForKey).keySet ());
- boolean isMasterReplica = false;
- for (int i=0; i<allNodes.size (); i++)
+
+ Vector<String> allNodes = this.partition.getCurrentView();
+ for (String node: allNodes)
{
- String aMember = (String)allNodes.elementAt (i);
- if( trace )
- log.trace("Testing member: "+aMember);
- if (replicaNodes.contains (aMember))
+ if (this.trace)
{
- if( trace )
- log.trace("Member found in replicaNodes, isMasterReplica=false");
- break;
+ this.log.trace("Testing member: " + node);
}
- else if (aMember.equals (this.nodeName))
+
+ if (repForKey.containsKey(node))
{
- if( trace )
- log.trace("Member == nodeName, isMasterReplica=true");
- isMasterReplica = true;
- break;
+ if (this.trace)
+ {
+ this.log.trace("Member found in replicaNodes, isMasterReplica=false");
+ }
+ return false;
}
+ else if (node.equals(this.nodeName))
+ {
+ if (this.trace)
+ {
+ this.log.trace("Member == nodeName, isMasterReplica=true");
+ }
+ return true;
+ }
}
- return isMasterReplica;
+ return false;
}
- // DistributedReplicantManager cluster callbacks ----------------------------------------------
+ // DistributedReplicantManager cluster callbacks ----------------------------------------------
/**
* Cluster callback called when a new replicant is added on another node
* @param key Replicant key
* @param nodeName Node that add the current replicant
* @param replicant Serialized representation of the replicant
- */
+ */
public void _add(String key, String nodeName, Serializable replicant)
{
- if( trace )
- log.trace("_add(" + key + ", " + nodeName);
+ if (this.trace)
+ {
+ this.log.trace("_add(" + key + ", " + nodeName);
+ }
+ KeyChangeEvent event = new KeyChangeEvent();
+ event.key = key;
+
+ synchronized (this.replicants)
+ {
+ this.addReplicant(key, nodeName, replicant);
+
+ event.replicants = this.getReplicants(key);
+ }
+
try
{
- addReplicant(key, nodeName, replicant);
- // Notify listeners asynchronously
- KeyChangeEvent kce = new KeyChangeEvent();
- kce.key = key;
- kce.replicants = lookupReplicants(key);
- asynchHandler.queueEvent(kce);
+ this.asynchHandler.queueEvent(event);
}
- catch (Exception ex)
+ catch (InterruptedException e)
{
- log.error("_add failed", ex);
+ Thread.currentThread().interrupt();
+
+ this.log.error("_add failed", e);
}
}
@@ -637,42 +663,50 @@
* Cluster callback called when a replicant is removed by another node
* @param key Name of the replicant key
* @param nodeName Node that wants to remove its replicant for the give key
- */
+ */
public void _remove(String key, String nodeName)
{
- try
+ KeyChangeEvent event = new KeyChangeEvent();
+ event.key = key;
+
+ synchronized (this.replicants)
{
- if (removeReplicant (key, nodeName)) {
- // Notify listeners asynchronously
- KeyChangeEvent kce = new KeyChangeEvent();
- kce.key = key;
- kce.replicants = lookupReplicants(key);
- asynchHandler.queueEvent(kce);
+ if (this.removeReplicant(key, nodeName))
+ {
+ event.replicants = this.getReplicants(key);
}
}
- catch (Exception ex)
+
+ if (event.replicants != null)
{
- log.error("_remove failed", ex);
+ try
+ {
+ this.asynchHandler.queueEvent(event);
+ }
+ catch (InterruptedException e)
+ {
+ Thread.currentThread().interrupt();
+
+ this.log.error("_remove failed", e);
+ }
}
}
- protected boolean removeReplicant (String key, String nodeName) throws Exception
+ protected boolean removeReplicant(String key, String nodeName)
{
- synchronized(replicants)
+ Map<String, Serializable> replicant = this.replicants.get(key);
+
+ if (replicant != null)
{
- Map replicant = (Map)replicants.get(key);
- if (replicant == null) return false;
- Object removed = replicant.remove(nodeName);
- if (removed != null)
+ if (replicant.remove(nodeName) != null)
{
- Collection values = replicant.values();
- if (values.size() == 0)
- {
- replicants.remove(key);
- }
+ // If replicant map is empty, prune it
+ this.replicants.remove(key, Collections.emptyMap());
+
return true;
}
}
+
return false;
}
@@ -680,14 +714,18 @@
* Cluster callback called when a node wants to know our complete list of local replicants
* @throws Exception Thrown if a cluster communication exception occurs
* @return A java array of size 2 containing the name of our node in this cluster and the serialized representation of our state
- */
+ */
public Object[] lookupLocalReplicants() throws Exception
{
- partitionNameKnown.acquire (); // we don't answer until our name is known
+ this.partitionNameKnown.await(); // we don't answer until our name is known
- Object[] rtn = {this.nodeName, localReplicants};
- if( trace )
- log.trace ("lookupLocalReplicants called ("+ rtn[0] + "). Return: " + localReplicants.size ());
+ Object[] rtn = { this.nodeName, this.localReplicants };
+
+ if (this.trace)
+ {
+ this.log.trace("lookupLocalReplicants called ("+ rtn[0] + "). Return: " + this.localReplicants.size());
+ }
+
return rtn;
}
@@ -695,38 +733,38 @@
// Protected -----------------------------------------------------
- protected int calculateReplicantsHash (List members)
+ protected int calculateReplicantsHash(List<ClusterNode> members)
{
int result = 0;
- Object obj = null;
- for (int i=0; i<members.size (); i++)
+ for (ClusterNode member: members)
{
- obj = members.get (i);
- if (obj != null)
- result+= obj.hashCode (); // no explicit overflow with int addition
+ if (member != null)
+ {
+ result += member.getName().hashCode(); // no explicit overflow with int addition
+ }
}
return result;
}
- protected int updateReplicantsHashId (String key)
+ protected int updateReplicantsHashId(String key)
{
// we first get a list of all nodes names that replicate this key
//
- List nodes = this.lookupReplicantsNodeNames (key);
+ List<ClusterNode> nodes = this.lookupReplicantsNodes(key);
int result = 0;
- if ( (nodes == null) || (nodes.size () == 0) )
+ if ((nodes == null) || nodes.isEmpty())
{
// no nore replicants for this key: we uncache our view id
//
- this.intraviewIdCache.remove (key);
+ this.intraviewIdCache.remove(key);
}
else
{
- result = this.calculateReplicantsHash (nodes);
- this.intraviewIdCache.put (key, new Integer (result));
+ result = this.calculateReplicantsHash(nodes);
+ this.intraviewIdCache.put(key, new Integer(result));
}
return result;
@@ -742,115 +780,57 @@
* @param key replicant key name
* @param nodeName name of the node that adds this replicant
* @param replicant Serialized representation of the replica
+ * @return true, if this replicant was newly added to the map, false otherwise
*/
- protected void addReplicant(String key, String nodeName, Serializable replicant)
+ protected boolean addReplicant(String key, String nodeName, Serializable replicant)
{
- addReplicant(replicants, key, nodeName, replicant);
+ ConcurrentMap<String, Serializable> map = new ConcurrentHashMap<String, Serializable>();
+
+ ConcurrentMap<String, Serializable> existingMap = this.replicants.putIfAbsent(key, map);
+
+ return (((existingMap != null) ? existingMap : map).put(nodeName, replicant) != null);
}
/**
- * Logic for adding replicant to any map.
- * @param map structure in which adding the new replicant
- * @param key name of the replicant key
- * @param nodeName name of the node adding the replicant
- * @param replicant serialized representation of the replicant that is added
- */
- protected void addReplicant(Map map, String key, String nodeName, Serializable replicant)
- {
- synchronized(map)
- {
- Map rep = (Map)map.get(key);
- if (rep == null)
- {
- if( trace )
- log.trace("_adding new HashMap");
- rep = new HashMap();
- map.put(key, rep);
- }
- rep.put(nodeName, replicant);
- }
- }
-
- protected Vector getKeysReplicatedByNode (String nodeName)
- {
- Vector result = new Vector ();
- synchronized (replicants)
- {
- Iterator keysIter = replicants.keySet ().iterator ();
- while (keysIter.hasNext ())
- {
- String key = (String)keysIter.next ();
- Map values = (Map)replicants.get (key);
- if ( (values != null) && values.containsKey (nodeName) )
- {
- result.add (key);
- }
- }
- }
- return result;
- }
-
- /**
- * Indicates if the a replicant already exists for a given key/node pair
- * @param key replicant key name
- * @param nodeName name of the node
- * @return a boolean indicating if a replicant for the given node exists for the given key
- */
- protected boolean replicantEntryAlreadyExists (String key, String nodeName)
- {
- return replicantEntryAlreadyExists (replicants, key, nodeName);
- }
-
- /**
- * Indicates if the a replicant already exists for a given key/node pair in the give data structure
- */
- protected boolean replicantEntryAlreadyExists (Map map, String key, String nodeName)
- {
- Map rep = (Map)map.get(key);
- if (rep == null)
- return false;
- else
- return rep.containsKey (nodeName);
- }
-
- /**
* Notifies, through a callback, the listeners for a given replicant that the set of replicants has changed
* @param key The replicant key name
* @param newReplicants The new list of replicants
* @param merge is the notification the result of a cluster merge?
*
- */
- protected void notifyKeyListeners(String key, List newReplicants, boolean merge)
+ */
+ protected void notifyKeyListeners(String key, List<Serializable> newReplicants, boolean merge)
{
- if( trace )
- log.trace("notifyKeyListeners");
+ if (this.trace)
+ {
+ this.log.trace("notifyKeyListeners");
+ }
// we first update the intra-view id for this particular key
//
- int newId = updateReplicantsHashId (key);
+ int newId = this.updateReplicantsHashId(key);
- ArrayList listeners = (ArrayList)keyListeners.get(key);
+ List<ReplicantListener> listeners = this.keyListeners.get(key);
+
if (listeners == null)
{
- if( trace )
- log.trace("listeners is null");
+ if (this.trace)
+ {
+ this.log.trace("listeners is null");
+ }
return;
}
- // ArrayList's iterator is not thread safe
- DistributedReplicantManager.ReplicantListener[] toNotify = null;
- synchronized(listeners)
+ if (this.trace)
{
- toNotify = new DistributedReplicantManager.ReplicantListener[listeners.size()];
- toNotify = (DistributedReplicantManager.ReplicantListener[]) listeners.toArray(toNotify);
+ this.log.trace("notifying " + listeners.size() + " listeners for key change: " + key);
}
- if( trace )
- log.trace("notifying " + toNotify.length + " listeners for key change: " + key);
- for (int i = 0; i < toNotify.length; i++)
+ for (ReplicantListener listener: listeners)
{
- if (toNotify[i] != null)
- toNotify[i].replicantsChanged(key, newReplicants, newId, merge);
+ if (listener != null)
+ {
+ listener.replicantsChanged(key, newReplicants, newId, merge);
+ }
}
}
@@ -858,38 +838,40 @@
{
try
{
- if( trace )
- log.trace("Start Re-Publish local replicants in DRM");
-
- Map localReplicants;
- synchronized (this.localReplicants)
+ if (this.trace)
{
- localReplicants = new HashMap(this.localReplicants);
+ this.log.trace("Start Re-Publish local replicants in DRM");
}
- Iterator entries = localReplicants.entrySet().iterator();
- while( entries.hasNext() )
+ for (Map.Entry<String, Serializable> entry: this.localReplicants.entrySet())
{
- Map.Entry entry = (Map.Entry) entries.next();
- String key = (String) entry.getKey();
- Object replicant = entry.getValue();
+ Serializable replicant = entry.getValue();
+
if (replicant != null)
{
- if( trace )
- log.trace("publishing, key=" + key + ", value=" + replicant);
+ String key = entry.getKey();
+
+ if (this.trace)
+ {
+ this.log.trace("publishing, key=" + key + ", value=" + replicant);
+ }
- Object[] args = {key, this.nodeName, replicant};
+ Object[] args = { key, this.nodeName, replicant };
- partition.callAsynchMethodOnCluster(SERVICE_NAME, "_add", args, add_types, true);
- notifyKeyListeners(key, lookupReplicants(key), false);
+ this.partition.callAsynchMethodOnCluster(SERVICE_NAME, "_add", args, add_types, true);
+
+ this.notifyKeyListeners(key, this.getReplicants(key), false);
}
}
- if( trace )
- log.trace("End Re-Publish local replicants");
+
+ if (this.trace)
+ {
+ this.log.trace("End Re-Publish local replicants");
+ }
}
catch (Exception e)
{
- log.error("Re-Publish failed", e);
+ this.log.error("Re-Publish failed", e);
}
}
@@ -901,78 +883,91 @@
{
try
{
- log.debug("Start merging members in DRM service...");
- java.util.HashSet notifies = new java.util.HashSet ();
- ArrayList rsp = partition.callMethodOnCluster(SERVICE_NAME,
+ this.log.debug("Start merging members in DRM service...");
+
+ ArrayList<?> rsp = this.partition.callMethodOnCluster(SERVICE_NAME,
"lookupLocalReplicants",
new Object[]{}, new Class[]{}, true);
- if (rsp.size() == 0)
- log.debug("No responses from other nodes during the DRM merge process.");
- else
- {
- log.debug("The DRM merge process has received " + rsp.size() + " answers");
+ if (rsp.isEmpty())
+ {
+ this.log.debug("No responses from other nodes during the DRM merge process.");
}
- for (int i = 0; i < rsp.size(); i++)
+ else
{
- Object o = rsp.get(i);
- if (o == null)
+ this.log.debug("The DRM merge process has received " + rsp.size() + " answers");
+ }
+
+ // Record keys to be notified, and replicant list per key
+ Map<String, List<Serializable>> notifications = new HashMap<String, List<Serializable>>();
+
+ // Perform add/remove and replicant lookup atomically
+ synchronized (this.replicants)
+ {
+ for (Object o: rsp)
{
- log.warn("As part of the answers received during the DRM merge process, a NULL message was received!");
- continue;
- }
- else if (o instanceof Throwable)
- {
- log.warn("As part of the answers received during the DRM merge process, a Throwable was received!", (Throwable) o);
- continue;
- }
-
- Object[] objs = (Object[]) o;
- String node = (String)objs[0];
- Map replicants = (Map)objs[1];
- Iterator keys = replicants.keySet().iterator();
-
- //FIXME: We don't remove keys in the merge process but only add new keys!
- while (keys.hasNext())
- {
- String key = (String)keys.next();
- // done to reduce duplicate notifications
- if (!replicantEntryAlreadyExists (key, node))
+ if (o == null)
{
- addReplicant(key, node, (Serializable)replicants.get(key));
- notifies.add (key);
+ this.log.warn("As part of the answers received during the DRM merge process, a NULL message was received!");
+ continue;
}
- }
-
- Vector currentStatus = getKeysReplicatedByNode (node);
- if (currentStatus.size () > replicants.size ())
- {
- // The merge process needs to remove some (now)
- // unexisting keys
- //
- for (int currentKeysId=0, currentKeysMax=currentStatus.size (); currentKeysId<currentKeysMax; currentKeysId++)
+ else if (o instanceof Throwable)
{
- String theKey = (String)currentStatus.elementAt (currentKeysId);
- if (!replicants.containsKey (theKey))
+ this.log.warn("As part of the answers received during the DRM merge process, a Throwable was received!", (Throwable) o);
+ continue;
+ }
+
+ Object[] objs = (Object[]) o;
+ String node = (String) objs[0];
+ Map<String, Serializable> replicants = (Map) objs[1];
+
+ //FIXME: We don't remove keys in the merge process but only add new keys!
+ for (Map.Entry<String, Serializable> entry: replicants.entrySet())
+ {
+ String key = entry.getKey();
+
+ if (this.addReplicant(key, node, entry.getValue()))
{
- removeReplicant (theKey, node);
- notifies.add(theKey);
+ notifications.put(key, null);
}
}
+
+ // The merge process needs to remove some (now) unexisting keys
+ for (Map.Entry<String, ConcurrentMap<String, Serializable>> entry: this.replicants.entrySet())
+ {
+ String key = entry.getKey();
+
+ if (entry.getValue().containsKey(node))
+ {
+ if (!replicants.containsKey(key))
+ {
+ if (this.removeReplicant(key, node))
+ {
+ notifications.put(key, null);
+ }
+ }
+ }
+ }
}
- }
+
+ // Lookup replicants for each changed key
+ for (Map.Entry<String, List<Serializable>> entry: notifications.entrySet())
+ {
+ entry.setValue(this.getReplicants(entry.getKey()));
+ }
+ }
- Iterator notifIter = notifies.iterator ();
- while (notifIter.hasNext ())
+ // Notify recorded key changes
+ for (Map.Entry<String, List<Serializable>> entry: notifications.entrySet())
{
- String key = (String)notifIter.next ();
- notifyKeyListeners(key, lookupReplicants(key), true);
+ this.notifyKeyListeners(entry.getKey(), entry.getValue(), true);
}
- log.debug ("..Finished merging members in DRM service");
- }
+ this.log.debug("..Finished merging members in DRM service");
+
+ }
catch (Exception ex)
{
- log.error("merge failed", ex);
+ this.log.error("merge failed", ex);
}
}
@@ -980,65 +975,51 @@
* Get rid of dead members from replicant list.
*
* @param deadMembers the members that are no longer in the view
- * @param merge whether the membership change occurred during
+ * @param merge whether the membership change occurred during
* a cluster merge
*/
- protected void purgeDeadMembers(Vector deadMembers, boolean merge)
+ protected void purgeDeadMembers(Vector<ClusterNode> deadMembers, boolean merge)
{
- if (deadMembers.size() <= 0)
- return;
+ if (deadMembers.isEmpty()) return;
- log.debug("purgeDeadMembers, "+deadMembers);
- try
+ this.log.debug("purgeDeadMembers, " + deadMembers);
+
+ List<String> deadNodes = new ArrayList<String>(deadMembers.size());
+
+ for (ClusterNode member: deadMembers)
{
- synchronized(replicants)
+ deadNodes.add(member.getName());
+ }
+
+ for (Map.Entry<String, ConcurrentMap<String, Serializable>> entry: this.replicants.entrySet())
+ {
+ String key = entry.getKey();
+ ConcurrentMap<String, Serializable> replicant = entry.getValue();
+
+ List<Serializable> replicants = null;
+
+ synchronized (this.replicants)
{
- Iterator keys = replicants.keySet().iterator();
- while (keys.hasNext())
+ if (replicant.keySet().removeAll(deadNodes))
{
- String key = (String)keys.next();
- Map replicant = (Map)replicants.get(key);
- boolean modified = false;
- for (int i = 0; i < deadMembers.size(); i++)
- {
- String node = deadMembers.elementAt(i).toString();
- log.debug("trying to remove deadMember " + node + " for key " + key);
- Object removed = replicant.remove(node);
- if (removed != null)
- {
- log.debug(node + " was removed");
- modified = true;
- }
- else
- {
- log.debug(node + " was NOT removed!!!");
- }
- }
- if (modified)
- {
- notifyKeyListeners(key, lookupReplicants(key), merge);
- }
+ replicants = this.getReplicants(key);
}
}
+
+ if (replicants != null)
+ {
+ this.notifyKeyListeners(key, replicants, merge);
+ }
}
- catch (Exception ex)
- {
- log.error("purgeDeadMembers failed", ex);
- }
}
/**
- */
+ */
protected void cleanupKeyListeners()
{
// NOT IMPLEMENTED YET
}
- protected synchronized static int nextThreadID()
- {
- return threadID ++;
- }
-
// Private -------------------------------------------------------
// Inner classes -------------------------------------------------
@@ -1047,27 +1028,28 @@
{
public MergeMembers()
{
- super("DRM Async Merger#"+nextThreadID());
+ super("DRM Async Merger#" + threadID.getAndIncrement());
}
/**
* Called when the service needs to merge with another partition. This
* process is performed asynchronously
- */
+ */
public void run()
{
- log.debug("Sleeping for 50ms before mergeMembers");
+ DistributedReplicantManagerImpl.this.log.debug("Sleeping for 50ms before mergeMembers");
try
{
// if this thread invokes a cluster method call before
// membershipChanged event completes, it could timeout/hang
// we need to discuss this with Bela.
- Thread.sleep(50);
+ Thread.sleep(50);
}
- catch (Exception ignored)
+ catch (InterruptedException e)
{
+ Thread.currentThread().interrupt();
}
- mergeMembers();
+ DistributedReplicantManagerImpl.this.mergeMembers();
}
}
@@ -1075,7 +1057,7 @@
{
public MembersPublisher()
{
- super("DRM Async Publisher#"+nextThreadID());
+ super("DRM Async Publisher#" + threadID.getAndIncrement());
}
/**
@@ -1084,7 +1066,7 @@
*/
public void run()
{
- log.debug("DRM: Sleeping before re-publishing for 50ms just in case");
+ DistributedReplicantManagerImpl.this.log.debug("DRM: Sleeping before re-publishing for 50ms just in case");
try
{
// if this thread invokes a cluster method call before
@@ -1092,10 +1074,11 @@
// we need to discuss this with Bela.
Thread.sleep(50);
}
- catch (Exception ignored)
+ catch (InterruptedException e)
{
+ Thread.currentThread().interrupt();
}
- republishLocalReplicants();
+ DistributedReplicantManagerImpl.this.republishLocalReplicants();
}
}
}
More information about the jboss-cvs-commits
mailing list