[jboss-cvs] JBossAS SVN: r74643 - trunk/cluster/src/main/org/jboss/ha/framework/server.

jboss-cvs-commits at lists.jboss.org jboss-cvs-commits at lists.jboss.org
Mon Jun 16 17:07:51 EDT 2008


Author: pferraro
Date: 2008-06-16 17:07:51 -0400 (Mon, 16 Jun 2008)
New Revision: 74643

Modified:
   trunk/cluster/src/main/org/jboss/ha/framework/server/ClusterPartition.java
   trunk/cluster/src/main/org/jboss/ha/framework/server/DistributedReplicantManagerImpl.java
Log:
[JBAS-5434] Ensure DistributedReplicantManager can handle concurrent JGroups requests.
Included moderate code cleanup.
Replaced use of classes from Doug Lea's concurrent package with equivalents from java.util.concurrent.

Modified: trunk/cluster/src/main/org/jboss/ha/framework/server/ClusterPartition.java
===================================================================
--- trunk/cluster/src/main/org/jboss/ha/framework/server/ClusterPartition.java	2008-06-16 20:53:23 UTC (rev 74642)
+++ trunk/cluster/src/main/org/jboss/ha/framework/server/ClusterPartition.java	2008-06-16 21:07:51 UTC (rev 74643)
@@ -58,6 +58,7 @@
 import org.jboss.system.ServiceMBeanSupport;
 import org.jboss.system.server.ServerConfigUtil;
 import org.jboss.util.threadpool.ThreadPool;
+import org.jgroups.Address;
 import org.jgroups.Channel;
 import org.jgroups.ChannelFactory;
 import org.jgroups.ExtendedMembershipListener;
@@ -76,8 +77,8 @@
 import org.jgroups.util.RspList;
 
 /**
- * {@link HAPartition} implementation based on a 
- * <a href="http://www.jgroups.com/">JGroups</a> <code>RpcDispatcher</code> 
+ * {@link HAPartition} implementation based on a
+ * <a href="http://www.jgroups.com/">JGroups</a> <code>RpcDispatcher</code>
  * and a multiplexed <code>JChannel</code>.
  *
  * @author <a href="mailto:sacha.labourey at cogito-info.ch">Sacha Labourey</a>.
@@ -88,7 +89,7 @@
  */
 public class ClusterPartition
    extends ServiceMBeanSupport
-   implements ExtendedMembershipListener, HAPartition, 
+   implements ExtendedMembershipListener, HAPartition,
               AsynchEventHandler.AsynchEventProcessor,
               ClusterPartitionMBean
 {
@@ -111,7 +112,7 @@
    private static class StateStreamEnd implements Serializable
    {
       /** The serialVersionUID */
-      private static final long serialVersionUID = -3705345735451504946L;      
+      private static final long serialVersionUID = -3705345735451504946L;
    }
    
    /**
@@ -153,7 +154,7 @@
       {
          try
          {
-            ClusterPartition.this.channel.connect(getPartitionName());
+            ClusterPartition.this.channel.connect(ClusterPartition.this.getPartitionName());
          }
          catch (Exception e)
          {
@@ -194,24 +195,24 @@
    /** Do we send any membership change notifications synchronously? */
    protected boolean allowSyncListeners = false;
    /** The HAMembershipListener and HAMembershipExtendedListeners */
-   protected ArrayList synchListeners = new ArrayList();
+   protected ArrayList<HAMembershipListener> synchListeners = new ArrayList<HAMembershipListener>();
    /** The asynch HAMembershipListener and HAMembershipExtendedListeners */
-   protected ArrayList asynchListeners = new ArrayList();
+   protected ArrayList<HAMembershipListener> asynchListeners = new ArrayList<HAMembershipListener>();
    /** The handler used to send membership change notifications asynchronously */
    protected AsynchEventHandler asynchHandler;
    /** The current cluster partition members */
-   protected Vector members = null;
-   protected Vector jgmembers = null;
+   protected Vector<ClusterNode> members = null;
+   protected Vector<Address> jgmembers = null;
    protected Map<String, WeakReference<ClassLoader>> clmap =
                                           new ConcurrentHashMap<String, WeakReference<ClassLoader>>();
 
-   public Vector history = new Vector();
+   public Vector<String> history = new Vector<String>();
 
    /** The partition members other than this node */
-   protected Vector otherMembers = null;
-   protected Vector jgotherMembers = null;
+   protected Vector<ClusterNode> otherMembers = null;
+   protected Vector<Address> jgotherMembers = null;
    /** the local JG IP Address */
-   protected org.jgroups.stack.IpAddress localJGAddress = null;
+   protected Address localJGAddress = null;
    /** The cluster transport protocol address string */
    protected String nodeName;
    /** me as a ClusterNode */
@@ -224,7 +225,7 @@
    protected DistributedStateImpl distributedState;
    /** The cluster instance log category */
    protected Logger log;
-   protected Logger clusterLifeCycleLog;   
+   protected Logger clusterLifeCycleLog;
    /** The current cluster view id */
    protected long currentViewId = -1;
    /** Whether to bind the partition into JNDI */
@@ -254,15 +255,19 @@
    
    private Channel createChannel()
    {
-      ChannelFactory factory = getChannelFactory();
+      ChannelFactory factory = this.getChannelFactory();
       if (factory == null)
+      {
          throw new IllegalStateException("HAPartitionConfig has no JChannelFactory");
-      String stack = getChannelStackName();
+      }
+      String stack = this.getChannelStackName();
       if (stack == null)
+      {
          throw new IllegalStateException("HAPartitionConfig has no multiplexer stack");
+      }
       try
       {
-         return factory.createMultiplexerChannel(stack, getPartitionName());
+         return factory.createMultiplexerChannel(stack, this.getPartitionName());
       }
       catch (RuntimeException e)
       {
@@ -278,7 +283,7 @@
    
    public ClusterPartition()
    {
-      logHistory ("Partition object created");      
+      this.logHistory("Partition object created");
    }
 
    // ------------------------------------------------------------ ServiceMBean
@@ -288,16 +293,18 @@
    protected void createService() throws Exception
    {
       if (this.replicantManager == null)
+      {
          throw new IllegalStateException("DistributedReplicantManager property must be set before creating ClusterPartition service");
+      }
 
-      setupLoggers(getPartitionName());
+      this.setupLoggers(this.getPartitionName());
       
       this.replicantManager.createService();
       
       if (this.distributedState != null)
       {
          this.distributedState.createService();
-      }         
+      }
       
       // Create the asynchronous handler for view changes
       this.asynchHandler = new AsynchEventHandler(this, "AsynchViewChangeHandler");
@@ -307,7 +314,7 @@
    
    protected void startService() throws Exception
    {
-      logHistory ("Starting partition");
+      this.logHistory ("Starting partition");
       
       this.cache = this.cacheManager.getCache(this.cacheConfigName, true);
       this.channelFactory = this.cache.getConfiguration().getRuntimeConfig().getMuxChannelFactory();
@@ -315,19 +322,19 @@
       
       if (this.channel == null || !this.channel.isOpen())
       {
-         this.log.debug("Creating Channel for partition " + getPartitionName() +
-               " using stack " + getChannelStackName());
+         this.log.debug("Creating Channel for partition " + this.getPartitionName() +
+               " using stack " + this.getChannelStackName());
    
-         this.channel = createChannel();
+         this.channel = this.createChannel();
          
          this.channel.setOpt(Channel.AUTO_RECONNECT, Boolean.TRUE);
          this.channel.setOpt(Channel.AUTO_GETSTATE, Boolean.TRUE);
       }
       
-      this.log.info("Initializing partition " + getPartitionName());
-      logHistory ("Initializing partition " + getPartitionName());
+      this.log.info("Initializing partition " + this.getPartitionName());
+      this.logHistory ("Initializing partition " + this.getPartitionName());
       
-      this.dispatcher = new RpcHandler(this.channel, null, null, new Object(), getDeadlockDetection());
+      this.dispatcher = new RpcHandler(this.channel, null, null, new Object(), this.getDeadlockDetection());
       
       // Subscribe to events generated by the channel
       this.log.debug("setMembershipListener");
@@ -343,7 +350,7 @@
       
       if (this.threadPool == null)
       {
-         this.channel.connect(getPartitionName());
+         this.channel.connect(this.getPartitionName());
          connectLatch.countDown();
       }
       else
@@ -357,31 +364,33 @@
       this.cache.start();
       
       try
-      {         
+      {
          // This will block waiting for any async channel connect above
          connectLatch.await();
          
          if (this.connectException != null)
+         {
             throw this.connectException;
+         }
          
          this.log.debug("Get current members");
-         waitForView();     
+         this.waitForView();
          
-         // get current JG group properties         
+         // get current JG group properties
          this.log.debug("get nodeName");
-         this.localJGAddress = (IpAddress)this.channel.getLocalAddress();
-         this.me = new ClusterNodeImpl(this.localJGAddress);
+         this.localJGAddress = this.channel.getLocalAddress();
+         this.me = new ClusterNodeImpl((IpAddress) this.localJGAddress);
          this.nodeName = this.me.getName();
 
-         verifyNodeIsUnique();
+         this.verifyNodeIsUnique();
 
-         fetchState();
+         this.fetchState();
          
          this.replicantManager.startService();
          
          if (this.distributedState != null)
          {
-            this.distributedState.setClusteredCache(getClusteredCache());
+            this.distributedState.setClusteredCache(this.getClusteredCache());
             this.distributedState.startService();
          }
          
@@ -395,9 +404,9 @@
          if (this.bindIntoJndi)
          {
             Context ctx = new InitialContext();
-            bind(HAPartitionLocator.getStandardJndiBinding(getPartitionName()), 
+            this.bind(HAPartitionLocator.getStandardJndiBinding(this.getPartitionName()),
                       this, ClusterPartition.class, ctx);
-            this.log.debug("Bound in JNDI under /HAPartition/" + getPartitionName());
+            this.log.debug("Bound in JNDI under /HAPartition/" + this.getPartitionName());
          }
       }
       catch (Throwable t)
@@ -412,8 +421,8 @@
 
    protected void stopService() throws Exception
    {
-      logHistory ("Stopping partition");
-      this.log.info("Stopping partition " + getPartitionName());
+      this.logHistory ("Stopping partition");
+      this.log.info("Stopping partition " + this.getPartitionName());
 
       try
       {
@@ -422,7 +431,7 @@
       catch( Exception e)
       {
          this.log.warn("Failed to stop asynchHandler", e);
-      }    
+      }
       
       if (this.distributedState != null)
       {
@@ -445,7 +454,9 @@
       try
       {
          if (this.channel != null && this.channel.isConnected())
+         {
             this.channel.disconnect();
+         }
       }
       catch (Exception e)
       {
@@ -454,7 +465,7 @@
 
       if (this.bindIntoJndi)
       {
-         String boundName = HAPartitionLocator.getStandardJndiBinding(getPartitionName());
+         String boundName = HAPartitionLocator.getStandardJndiBinding(this.getPartitionName());
          InitialContext ctx = null;
          try
          {
@@ -468,51 +479,55 @@
          finally
          {
             if (ctx != null)
+            {
                ctx.close();
+            }
          }
-         NonSerializableFactory.unbind (boundName);         
+         NonSerializableFactory.unbind(boundName);
       }
       
       HAPartitionLocator.getHAPartitionLocator().deregisterHAPartition(this);
 
-      this.log.info("Partition " + getPartitionName() + " stopped.");
+      this.log.info("Partition " + this.getPartitionName() + " stopped.");
    }
    
    protected void destroyService()  throws Exception
    {
-      this.log.debug("Destroying HAPartition: " + getPartitionName()); 
+      this.log.debug("Destroying HAPartition: " + this.getPartitionName());
       
       if (this.distributedState != null)
       {
          this.distributedState.destroyService();
-      }   
+      }
 
       this.replicantManager.destroyService();
 
       try
       {
          if (this.channel != null && this.channel.isOpen())
+         {
             this.channel.close();
+         }
       }
       catch (Exception e)
       {
          this.log.error("Closing channel failed", e);
       }
 
-      this.log.info("Partition " + getPartitionName() + " destroyed.");
+      this.log.info("Partition " + this.getPartitionName() + " destroyed.");
    }
    
-   // ---------------------------------------------------------- State Transfer 
+   // ---------------------------------------------------------- State Transfer
 
 
    protected void fetchState() throws Exception
    {
-      this.log.info("Fetching serviceState (will wait for " + getStateTransferTimeout() + 
+      this.log.info("Fetching serviceState (will wait for " + this.getStateTransferTimeout() +
             " milliseconds):");
       long start, stop;
       this.isStateSet = false;
       start = System.currentTimeMillis();
-      boolean rc = this.channel.getState(null, getStateTransferTimeout());
+      boolean rc = this.channel.getState(null, this.getStateTransferTimeout());
       if (rc)
       {
          synchronized (this.channelLock)
@@ -520,7 +535,9 @@
             while (!this.isStateSet)
             {
                if (this.setStateException != null)
+               {
                   throw this.setStateException;
+               }
 
                try
                {
@@ -555,7 +572,7 @@
             }
          }
 
-         if (isCurrentNodeCoordinator())
+         if (this.isCurrentNodeCoordinator())
          {
             this.log.info("State could not be retrieved (we are the first member in group)");
          }
@@ -628,9 +645,11 @@
       
       while (true)
       {
-         Object obj = mvis.readObject(); 
+         Object obj = mvis.readObject();
          if (obj instanceof StateStreamEnd)
+         {
             break;
+         }
          
          String key = (String) obj;
          this.log.debug("setState for " + key);
@@ -651,20 +670,20 @@
                if (DistributedReplicantManagerImpl.SERVICE_NAME.equals(key))
                {
                   if (e instanceof RuntimeException)
+                  {
                      throw (RuntimeException) e;
-                  else
-                     throw new RuntimeException(e);
+                  }
+
+                  throw new RuntimeException(e);
                }
-               else
-               {
-                  this.log.error("Caught exception setting serviceState to " + subscriber, e);
-               }
+
+               this.log.error("Caught exception setting serviceState to " + subscriber, e);
             }
          }
          else
          {
             this.log.debug("There is no stateHandler for: " + key);
-         }      
+         }
       }
       
       try
@@ -686,15 +705,19 @@
    {
       this.log.error("failed setting serviceState", t);
       if (t instanceof Exception)
+      {
          this.setStateException = (Exception) t;
+      }
       else
+      {
          this.setStateException = new Exception(t);
+      }
    }
 
    private void notifyChannelLock()
    {
       synchronized (this.channelLock)
-      {         
+      {
          this.channelLock.notifyAll();
       }
    }
@@ -702,23 +725,27 @@
    // org.jgroups.MembershipListener implementation ----------------------------------------------
    
    public void suspect(org.jgroups.Address suspected_mbr)
-   {      
-      logHistory ("Node suspected: " + (suspected_mbr==null?"null":suspected_mbr.toString()));
-      if (isCurrentNodeCoordinator ())
+   {
+      this.logHistory ("Node suspected: " + (suspected_mbr==null?"null":suspected_mbr.toString()));
+      if (this.isCurrentNodeCoordinator ())
+      {
          this.clusterLifeCycleLog.info ("Suspected member: " + suspected_mbr);
+      }
       else
+      {
          this.log.info("Suspected member: " + suspected_mbr);
+      }
    }
 
-   public void block() 
-   {       
-       this.flushBlockGate.close();           
-       this.log.debug("Block processed at " + this.me);	  
+   public void block()
+   {
+       this.flushBlockGate.close();
+       this.log.debug("Block processed at " + this.me);
    }
    
    public void unblock()
    {
-       this.flushBlockGate.open();           
+       this.flushBlockGate.open();
        this.log.debug("Unblock processed at " + this.me);
    }
    
@@ -741,24 +768,24 @@
          // Keep a list of other members only for "exclude-self" RPC calls
          this.jgotherMembers = (Vector)newView.getMembers().clone();
          this.jgotherMembers.remove (this.channel.getLocalAddress());
-         this.otherMembers = translateAddresses (this.jgotherMembers); // TRANSLATE!
-         Vector translatedNewView = translateAddresses ((Vector)newView.getMembers().clone());
-         logHistory ("New view: " + translatedNewView + " with viewId: " + this.currentViewId +
+         this.otherMembers = this.translateAddresses (this.jgotherMembers); // TRANSLATE!
+         Vector<ClusterNode> translatedNewView = this.translateAddresses ((Vector)newView.getMembers().clone());
+         this.logHistory ("New view: " + translatedNewView + " with viewId: " + this.currentViewId +
                      " (old view: " + this.members + " )");
 
 
          // Save the previous view and make a copy of the new view
-         Vector oldMembers = this.members;
+         Vector<ClusterNode> oldMembers = this.members;
 
-         Vector newjgMembers = (Vector)newView.getMembers().clone();
-         Vector newMembers = translateAddresses(newjgMembers); // TRANSLATE
+         Vector<Address> newjgMembers = (Vector)newView.getMembers().clone();
+         Vector<ClusterNode> newMembers = this.translateAddresses(newjgMembers); // TRANSLATE
          this.members = newMembers;
          this.jgmembers = newjgMembers;
          
          if (oldMembers == null)
          {
             // Initial viewAccepted
-            this.log.debug("ViewAccepted: initial members set for partition " + getPartitionName() + ": " +
+            this.log.debug("ViewAccepted: initial members set for partition " + this.getPartitionName() + ": " +
                      this.currentViewId + " (" + this.members + ")");
             
             this.log.info("Number of cluster members: " + this.members.size());
@@ -770,29 +797,29 @@
             this.log.info ("Other members: " + this.otherMembers.size ());
             
             // Wake up the deployer thread blocking in waitForView
-            notifyChannelLock();
+            this.notifyChannelLock();
             return;
-         }        
+         }
          
-         int difference = 0;
-         if (oldMembers == null)
-            difference = newMembers.size () - 1;
-         else
-            difference = newMembers.size () - oldMembers.size ();
+         int difference = newMembers.size() - oldMembers.size();
          
-         if (isCurrentNodeCoordinator ())
-            this.clusterLifeCycleLog.info ("New cluster view for partition " + getPartitionName() + " (id: " +
+         if (this.isCurrentNodeCoordinator ())
+         {
+            this.clusterLifeCycleLog.info ("New cluster view for partition " + this.getPartitionName() + " (id: " +
                                       this.currentViewId + ", delta: " + difference + ") : " + this.members);
+         }
          else
-            this.log.info("New cluster view for partition " + getPartitionName() + ": " +
+         {
+            this.log.info("New cluster view for partition " + this.getPartitionName() + ": " +
                      this.currentViewId + " (" + this.members + " delta: " + difference + ")");
+         }
 
          // Build a ViewChangeEvent for the asynch listeners
          ViewChangeEvent event = new ViewChangeEvent();
          event.viewId = this.currentViewId;
          event.allMembers = translatedNewView;
-         event.deadMembers = getDeadMembers(oldMembers, event.allMembers);
-         event.newMembers = getNewMembers(oldMembers, event.allMembers);
+         event.deadMembers = this.getDeadMembers(oldMembers, event.allMembers);
+         event.newMembers = this.getNewMembers(oldMembers, event.allMembers);
          event.originatingGroups = null;
          // if the new view occurs because of a merge, we first inform listeners of the merge
          if(newView instanceof MergeView)
@@ -801,16 +828,14 @@
             event.originatingGroups = mergeView.getSubgroups();
          }
 
-         this.log.debug("membership changed from " + 
-                  (oldMembers == null ? 0 : oldMembers.size()) + " to " + 
-                  event.allMembers.size());
+         this.log.debug("membership changed from " + oldMembers.size() + " to " + event.allMembers.size());
          // Put the view change to the asynch queue
          this.asynchHandler.queueEvent(event);
 
          // Broadcast the new view to the synchronous view change listeners
          if (this.allowSyncListeners)
          {
-            notifyListeners(this.synchListeners, event.viewId, event.allMembers,
+            this.notifyListeners(this.synchListeners, event.viewId, event.allMembers,
                   event.deadMembers, event.newMembers, event.originatingGroups);
          }
       }
@@ -827,21 +852,27 @@
          if (this.members == null)
          {
             if (this.connectException != null)
+            {
                throw this.connectException;
+            }
             
             try
             {
-               this.channelLock.wait(getMethodCallTimeout());
+               this.channelLock.wait(this.getMethodCallTimeout());
             }
             catch (InterruptedException iex)
             {
             }
             
             if (this.connectException != null)
+            {
                throw this.connectException;
+            }
             
             if (this.members == null)
+            {
                throw new IllegalStateException("No view received from Channel");
+            }
          }
       }
    }
@@ -856,7 +887,7 @@
    public String getPartitionName()
    {
       return this.partitionName;
-   }  
+   }
 
    public void setPartitionName(String newName)
    {
@@ -878,12 +909,12 @@
       return this.currentViewId;
    }
    
-   public Vector getCurrentView()
+   public Vector<String> getCurrentView()
    {
-      Vector result = new Vector (this.members.size());
-      for (int i = 0; i < this.members.size(); i++)
+      Vector<String> result = new Vector<String>(this.members.size());
+      for (ClusterNode member: this.members)
       {
-         result.add( ((ClusterNode) this.members.elementAt(i)).getName() );
+         result.add(member.getName());
       }
       return result;
    }
@@ -892,9 +923,7 @@
    {
       synchronized (this.members)
       {
-         ClusterNode[] nodes = new ClusterNode[this.members.size()];
-         nodes = (ClusterNode[]) this.members.toArray(nodes);
-         return nodes;
+         return this.members.toArray(new ClusterNode[this.members.size()]);
       }
    }
 
@@ -906,7 +935,9 @@
    public boolean isCurrentNodeCoordinator ()
    {
       if(this.members == null || this.members.size() == 0 || this.me == null)
+      {
          return false;
+      }
      return this.members.elementAt (0).equals (this.me);
    }
 
@@ -923,7 +954,7 @@
    
    public void registerRPCHandler(String objName, Object subscriber, ClassLoader classloader)
    {
-      registerRPCHandler(objName, subscriber);
+      this.registerRPCHandler(objName, subscriber);
       this.clmap.put(objName, new WeakReference<ClassLoader>(classloader));
    }
    
@@ -931,7 +962,7 @@
    {
       this.rpcHandlers.remove(objName);
       this.clmap.remove(objName);
-   }      
+   }
 
    /**
     * This function is an abstraction of RpcDispatcher.
@@ -939,7 +970,7 @@
    public ArrayList callMethodOnCluster(String objName, String methodName,
       Object[] args, Class[] types, boolean excludeSelf) throws Exception
    {
-      return callMethodOnCluster(objName, methodName, args, types, excludeSelf, getMethodCallTimeout());
+      return this.callMethodOnCluster(objName, methodName, args, types, excludeSelf, this.getMethodCallTimeout());
    }
 
 
@@ -953,7 +984,7 @@
       
       if(this.channel.flushSupported())
       {
-     	 this.flushBlockGate.await(getStateTransferTimeout());
+     	 this.flushBlockGate.await(this.getStateTransferTimeout());
       }
       if (excludeSelf)
       {
@@ -974,7 +1005,7 @@
          rsp = this.dispatcher.callRemoteMethods(null, m, GroupRequest.GET_ALL, methodTimeout);
       }
 
-      return processResponseList(rsp, trace);
+      return this.processResponseList(rsp, trace);
     }
 
    /**
@@ -992,7 +1023,7 @@
    public ArrayList callMethodOnCoordinatorNode(String objName, String methodName,
           Object[] args, Class[] types,boolean excludeSelf) throws Exception
    {
-      return callMethodOnCoordinatorNode(objName,methodName,args,types,excludeSelf, getMethodCallTimeout());
+      return this.callMethodOnCoordinatorNode(objName,methodName,args,types,excludeSelf, this.getMethodCallTimeout());
    }
 
    /**
@@ -1022,17 +1053,17 @@
       }
 
       // the first cluster view member is the coordinator
-      Vector coordinatorOnly = new Vector();
+      Vector<Address> coordinatorOnly = new Vector<Address>();
       // If we are the coordinator, only call ourself if 'excludeSelf' is false
-      if (false == isCurrentNodeCoordinator () ||
+      if (false == this.isCurrentNodeCoordinator () ||
           false == excludeSelf)
       {
-         coordinatorOnly.addElement(this.jgmembers.elementAt (0));
+         coordinatorOnly.addElement(this.jgmembers.elementAt(0));
       }
       
       RspList rsp = this.dispatcher.callRemoteMethods(coordinatorOnly, m, GroupRequest.GET_ALL, methodTimeout);
 
-      return processResponseList(rsp, trace);
+      return this.processResponseList(rsp, trace);
    }
 
     /**
@@ -1051,8 +1082,10 @@
            Object[] args, Class[] types, long methodTimeout, ClusterNode targetNode) throws Throwable
     {
        if (!(targetNode instanceof ClusterNodeImpl))
-          throw new IllegalArgumentException("targetNode " + targetNode + " is not an instance of " + 
+      {
+         throw new IllegalArgumentException("targetNode " + targetNode + " is not an instance of " +
                                           ClusterNodeImpl.class + " -- only targetNodes provided by this HAPartition should be used");
+      }
        boolean trace = this.log.isTraceEnabled();
        
        MethodCall m = new MethodCall(serviceName + "." + methodName, args, types);
@@ -1075,17 +1108,25 @@
              {
                 item = response.getValue();
                 if (!(item instanceof NoHandlerForRPC))
-                   rc = item;
+               {
+                  rc = item;
+               }
                 }
                 else if( trace )
-                   this.log.trace("Ignoring non-received response: "+response);
+               {
+                  this.log.trace("Ignoring non-received response: "+response);
+               }
              }
              else
              {
                 if (!(item instanceof NoHandlerForRPC))
-                   rc = item;
-                else if( trace )
-                   this.log.trace("Ignoring NoHandlerForRPC");
+               {
+                  rc = item;
+               }
+               else if( trace )
+               {
+                  this.log.trace("Ignoring NoHandlerForRPC");
+               }
              }
           }
        return rc;
@@ -1108,8 +1149,10 @@
            Object[] args, Class[] types, long methodTimeout, ClusterNode targetNode) throws Throwable
    {
       if (!(targetNode instanceof ClusterNodeImpl))
-         throw new IllegalArgumentException("targetNode " + targetNode + " is not an instance of " + 
+      {
+         throw new IllegalArgumentException("targetNode " + targetNode + " is not an instance of " +
                                          ClusterNodeImpl.class + " -- only targetNodes provided by this HAPartition should be used");
+      }
        boolean trace = this.log.isTraceEnabled();
 
        MethodCall m = new MethodCall(serviceName + "." + methodName, args, types);
@@ -1138,17 +1181,25 @@
                {
                   item = response.getValue();
                   if (!(item instanceof NoHandlerForRPC))
+                  {
                      rtn.add(item);
+                  }
                }
                else if( trace )
+               {
                   this.log.trace("Ignoring non-received response: "+response);
+               }
             }
             else
             {
                if (!(item instanceof NoHandlerForRPC))
+               {
                   rtn.add(item);
+               }
                else if( trace )
+               {
                   this.log.trace("Ignoring NoHandlerForRPC");
+               }
             }
          }
          
@@ -1168,7 +1219,7 @@
 
       if(this.channel.flushSupported())
       {
-     	 this.flushBlockGate.await(getStateTransferTimeout());
+     	 this.flushBlockGate.await(this.getStateTransferTimeout());
       }
       if (excludeSelf)
       {
@@ -1177,7 +1228,7 @@
             this.log.trace("callAsynchMethodOnCluster(true), objName="+objName
                +", methodName="+methodName+", members="+this.jgotherMembers);
          }
-         this.dispatcher.callRemoteMethods(this.jgotherMembers, m, GroupRequest.GET_NONE, getMethodCallTimeout());
+         this.dispatcher.callRemoteMethods(this.jgotherMembers, m, GroupRequest.GET_NONE, this.getMethodCallTimeout());
       }
       else
       {
@@ -1186,7 +1237,7 @@
             this.log.trace("callAsynchMethodOnCluster(false), objName="+objName
                +", methodName="+methodName+", members="+this.members);
          }
-         this.dispatcher.callRemoteMethods(null, m, GroupRequest.GET_NONE, getMethodCallTimeout());
+         this.dispatcher.callRemoteMethods(null, m, GroupRequest.GET_NONE, this.getMethodCallTimeout());
       }
    }
    
@@ -1195,7 +1246,7 @@
    // State transfer management
    // *************************
    // *************************
-   //      
+   //
    public void subscribeToStateTransferEvents(String objectName, HAPartitionStateTransfer subscriber)
    {
       this.stateHandlers.put(objectName, subscriber);
@@ -1211,10 +1262,10 @@
    // Group Membership listeners
    // *************************
    // *************************
-   //   
+   //
    public void registerMembershipListener(HAMembershipListener listener)
    {
-      boolean isAsynch = (this.allowSyncListeners == false) 
+      boolean isAsynch = (this.allowSyncListeners == false)
             || (listener instanceof AsynchHAMembershipListener)
             || (listener instanceof AsynchHAMembershipExtendedListener);
       if( isAsynch ) {
@@ -1222,7 +1273,7 @@
             this.asynchListeners.add(listener);
          }
       }
-      else  { 
+      else  {
          synchronized(this.synchListeners) {
             this.synchListeners.add(listener);
          }
@@ -1231,7 +1282,7 @@
    
    public void unregisterMembershipListener(HAMembershipListener listener)
    {
-      boolean isAsynch = (this.allowSyncListeners == false) 
+      boolean isAsynch = (this.allowSyncListeners == false)
             || (listener instanceof AsynchHAMembershipListener)
             || (listener instanceof AsynchHAMembershipExtendedListener);
       if( isAsynch ) {
@@ -1239,7 +1290,7 @@
             this.asynchListeners.remove(listener);
          }
       }
-      else  { 
+      else  {
          synchronized(this.synchListeners) {
             this.synchListeners.remove(listener);
          }
@@ -1252,7 +1303,7 @@
    }
 
    public void setAllowSynchronousMembershipNotifications(boolean allowSync)
-   {      
+   {
       this.allowSyncListeners = allowSync;
    }
    
@@ -1261,7 +1312,7 @@
    public void processEvent(Object event)
    {
       ViewChangeEvent vce = (ViewChangeEvent) event;
-      notifyListeners(this.asynchListeners, vce.viewId, vce.allMembers,
+      this.notifyListeners(this.asynchListeners, vce.viewId, vce.allMembers,
             vce.deadMembers, vce.newMembers, vce.originatingGroups);
       
    }
@@ -1271,13 +1322,15 @@
    
    public void setDistributedStateImpl(DistributedStateImpl distributedState)
    {
-      this.distributedState = distributedState;      
+      this.distributedState = distributedState;
    }
    
    public void setDistributedReplicantManagerImpl(DistributedReplicantManagerImpl drm)
    {
       if (this.replicantManager != null  && !(this.replicantManager == drm))
+      {
          throw new IllegalStateException("DistributedReplicantManager already set");
+      }
 
       this.replicantManager = drm;
       if (this.replicantManager != null)
@@ -1292,7 +1345,7 @@
    protected void verifyNodeIsUnique () throws IllegalStateException
    {
       ClusterNodeImpl matched = null;
-      for (ClusterNode member : getClusterNodes())
+      for (ClusterNode member : this.getClusterNodes())
       {
          if (member.equals(this.me))
          {
@@ -1310,7 +1363,7 @@
                {
                   other = (ClusterNodeImpl) member;
                }
-               throw new IllegalStateException("Found member " + other + 
+               throw new IllegalStateException("Found member " + other +
                      " in current view that duplicates us (" + this.me + "). This" +
                      " node cannot join partition until duplicate member has" +
                      " been removed");
@@ -1326,7 +1379,7 @@
     * @param classType Class type under which should appear the bound object
     * @param ctx Naming context under which we bind the object
     * @throws Exception Thrown if a naming exception occurs during binding
-    */   
+    */
    protected void bind(String jndiName, Object who, Class classType, Context ctx) throws Exception
    {
       // Ah ! This service isn't serializable, so we use a helper class
@@ -1362,12 +1415,18 @@
     * @param oldMembers Vector of old members
     * @param newMembers Vector of new members
     * @return Vector of members that have died between the two views, can be empty.
-    */   
-   protected Vector getDeadMembers(Vector oldMembers, Vector newMembers)
+    */
+   protected Vector<ClusterNode> getDeadMembers(Vector<ClusterNode> oldMembers, Vector<ClusterNode> newMembers)
    {
-      if(oldMembers == null) oldMembers=new Vector();
-      if(newMembers == null) newMembers=new Vector();
-      Vector dead=(Vector)oldMembers.clone();
+      if(oldMembers == null)
+      {
+         oldMembers=new Vector<ClusterNode>();
+      }
+      if(newMembers == null)
+      {
+         newMembers=new Vector<ClusterNode>();
+      }
+      Vector<ClusterNode> dead=(Vector)oldMembers.clone();
       dead.removeAll(newMembers);
       this.log.debug("dead members: " + dead);
       return dead;
@@ -1378,19 +1437,25 @@
     * @param oldMembers Vector of old members
     * @param allMembers Vector of new members
     * @return Vector of members that have joined the partition between the two views
-    */   
-   protected Vector getNewMembers(Vector oldMembers, Vector allMembers)
+    */
+   protected Vector<ClusterNode> getNewMembers(Vector<ClusterNode> oldMembers, Vector<ClusterNode> allMembers)
    {
-      if(oldMembers == null) oldMembers=new Vector();
-      if(allMembers == null) allMembers=new Vector();
-      Vector newMembers=(Vector)allMembers.clone();
+      if(oldMembers == null)
+      {
+         oldMembers=new Vector<ClusterNode>();
+      }
+      if(allMembers == null)
+      {
+         allMembers=new Vector<ClusterNode>();
+      }
+      Vector<ClusterNode> newMembers=(Vector)allMembers.clone();
       newMembers.removeAll(oldMembers);
       return newMembers;
    }
 
-   protected void notifyListeners(ArrayList theListeners, long viewID,
-      Vector allMembers, Vector deadMembers, Vector newMembers,
-      Vector originatingGroups)
+   protected void notifyListeners(ArrayList<HAMembershipListener> theListeners, long viewID,
+      Vector<ClusterNode> allMembers, Vector<ClusterNode> deadMembers, Vector<ClusterNode> newMembers,
+      Vector<View> originatingGroups)
    {
       this.log.debug("Begin notifyListeners, viewID: "+viewID);
       synchronized(theListeners)
@@ -1404,7 +1469,7 @@
          HAMembershipListener aListener = null;
          try
          {
-            aListener = (HAMembershipListener) theListeners.get(i);
+            aListener = theListeners.get(i);
             if(originatingGroups != null && (aListener instanceof HAMembershipExtendedListener))
             {
                HAMembershipExtendedListener exListener = (HAMembershipExtendedListener) aListener;
@@ -1446,10 +1511,7 @@
    {
        return this.bindIntoJndi;
    }
-   
-   
 
-
    public ThreadPool getThreadPool()
    {
       return this.threadPool;
@@ -1460,16 +1522,17 @@
       this.threadPool = threadPool;
    }
 
-   protected Vector translateAddresses (Vector jgAddresses)
+   protected Vector<ClusterNode> translateAddresses(Vector<Address> addresses)
    {
-      if (jgAddresses == null)
+      if (addresses == null)
+      {
          return null;
+      }
 
-      Vector result = new Vector (jgAddresses.size());
-      for (int i = 0; i < jgAddresses.size(); i++)
+      Vector<ClusterNode> result = new Vector<ClusterNode>(addresses.size());
+      for (Address address: addresses)
       {
-         IpAddress addr = (IpAddress) jgAddresses.elementAt(i);
-         result.add(new ClusterNodeImpl(addr));
+         result.add(new ClusterNodeImpl((IpAddress) address));
       }
 
       return result;
@@ -1489,10 +1552,10 @@
    public String showHistory ()
    {
       StringBuffer buff = new StringBuffer();
-      Vector data = new Vector (this.history);
-      for (java.util.Iterator row = data.iterator(); row.hasNext();)
+      Vector<String> data = new Vector<String>(this.history);
+      for (java.util.Iterator<String> row = data.iterator(); row.hasNext();)
       {
-         String info = (String) row.next();
+         String info = row.next();
          buff.append(info).append("\n");
       }
       return buff.toString();
@@ -1502,11 +1565,11 @@
    {
       StringBuffer buff = new StringBuffer();
       buff.append("<events>\n");
-      Vector data = new Vector (this.history);
-      for (java.util.Iterator row = data.iterator(); row.hasNext();)
+      Vector<String> data = new Vector<String>(this.history);
+      for (java.util.Iterator<String> row = data.iterator(); row.hasNext();)
       {
          buff.append("   <event>\n      ");
-         String info = (String) row.next();
+         String info = row.next();
          buff.append(info);
          buff.append("\n   </event>\n");
       }
@@ -1594,7 +1657,7 @@
 
    public void setMethodCallTimeout(long timeout)
    {
-      this.method_call_timeout = timeout;      
+      this.method_call_timeout = timeout;
    }
 
    // Protected --------------------------------------------------------------
@@ -1604,8 +1667,10 @@
     */
    protected Object objectFromByteBufferInternal (byte[] buffer) throws Exception
    {
-      if(buffer == null) 
+      if(buffer == null)
+      {
          return null;
+      }
 
       ByteArrayInputStream bais = new ByteArrayInputStream(buffer);
       MarshalledValueInputStream mvis = new MarshalledValueInputStream(bais);
@@ -1630,11 +1695,15 @@
     */
    protected Object objectFromByteBufferResponseInternal (byte[] buffer) throws Exception
    {
-      if(buffer == null) 
+      if(buffer == null)
+      {
          return null;
+      }
 
       if (buffer[0] == NULL_VALUE)
+      {
          return null;
+      }
 
       ByteArrayInputStream bais = new ByteArrayInputStream(buffer);
       // read past the null/serializable byte
@@ -1650,7 +1719,9 @@
    protected byte[] objectToByteBufferResponseInternal (Object obj) throws Exception
    {
       if (obj == null)
+      {
          return new byte[]{NULL_VALUE};
+      }
 
       ByteArrayOutputStream baos = new ByteArrayOutputStream();
       // write a marker to stream to distinguish from null value stream
@@ -1671,12 +1742,12 @@
       
       public void getState(OutputStream stream)
       {
-         logHistory ("getState called on partition");
+         ClusterPartition.this.logHistory ("getState called on partition");
          
          ClusterPartition.this.log.debug("getState called.");
          try
          {
-            getStateInternal(stream);
+            ClusterPartition.this.getStateInternal(stream);
          }
          catch (Exception ex)
          {
@@ -1697,7 +1768,7 @@
       
       public void setState(InputStream stream)
       {
-         logHistory ("setState called on partition");
+         ClusterPartition.this.logHistory ("setState called on partition");
          try
          {
             if (stream == null)
@@ -1706,31 +1777,31 @@
             }
             else
             {
-               setStateInternal(stream);
+               ClusterPartition.this.setStateInternal(stream);
             }
             
             ClusterPartition.this.isStateSet = true;
          }
          catch (Throwable t)
          {
-            recordSetStateFailure(t);
+            ClusterPartition.this.recordSetStateFailure(t);
          }
          finally
          {
             // Notify waiting thread that serviceState has been set.
-            notifyChannelLock();
+            ClusterPartition.this.notifyChannelLock();
          }
       }
 
       public byte[] getState()
       {
-         logHistory ("getState called on partition");
+         ClusterPartition.this.logHistory ("getState called on partition");
          
          ClusterPartition.this.log.debug("getState called.");
          try
          {
             ByteArrayOutputStream baos = new ByteArrayOutputStream(1024);
-            getStateInternal(baos);
+            ClusterPartition.this.getStateInternal(baos);
             return baos.toByteArray();
          }
          catch (Exception ex)
@@ -1755,7 +1826,7 @@
       
       public void setState(byte[] obj)
       {
-         logHistory ("setState called on partition");
+         ClusterPartition.this.logHistory ("setState called on partition");
          try
          {
             if (obj == null)
@@ -1765,7 +1836,7 @@
             else
             {
                ByteArrayInputStream bais = new ByteArrayInputStream(obj);
-               setStateInternal(bais);
+               ClusterPartition.this.setStateInternal(bais);
                bais.close();
             }
             
@@ -1773,28 +1844,28 @@
          }
          catch (Throwable t)
          {
-            recordSetStateFailure(t);
+            ClusterPartition.this.recordSetStateFailure(t);
          }
          finally
          {
             // Notify waiting thread that serviceState has been set.
-            notifyChannelLock();
+            ClusterPartition.this.notifyChannelLock();
          }
       }
       
    }
 
-   /** 
+   /**
     * A simple data class containing the view change event needed to
     * notify the HAMembershipListeners
     */
    private static class ViewChangeEvent
    {
       long viewId;
-      Vector deadMembers;
-      Vector newMembers;
-      Vector allMembers;
-      Vector originatingGroups;
+      Vector<ClusterNode> deadMembers;
+      Vector<ClusterNode> newMembers;
+      Vector<ClusterNode> allMembers;
+      Vector<View> originatingGroups;
    }
    
    private class RequestMarshallerImpl implements org.jgroups.blocks.RpcDispatcher.Marshaller
@@ -1802,7 +1873,7 @@
 
       public Object objectFromByteBuffer(byte[] buf) throws Exception
       {
-         return objectFromByteBufferInternal(buf);
+         return ClusterPartition.this.objectFromByteBufferInternal(buf);
       }
 
       public byte[] objectToByteBuffer(Object obj) throws Exception
@@ -1813,11 +1884,11 @@
             String name = ((MethodCall)obj).getName();
             int idx = name.lastIndexOf('.');
             String serviceName = name.substring(0, idx);
-            return objectToByteBufferInternal(new Object[]{serviceName, objectToByteBufferInternal(obj)});           
+            return ClusterPartition.this.objectToByteBufferInternal(new Object[]{serviceName, ClusterPartition.this.objectToByteBufferInternal(obj)});
          }
-         else // this shouldn't occur
-            return objectToByteBufferInternal(obj);
-      }      
+
+         return ClusterPartition.this.objectToByteBufferInternal(obj);
+      }
    }
    
    private class ResponseMarshallerImpl implements org.jgroups.blocks.RpcDispatcher.Marshaller
@@ -1826,7 +1897,7 @@
       public Object objectFromByteBuffer(byte[] buf) throws Exception
       {
          boolean trace = ClusterPartition.this.log.isTraceEnabled();
-         Object retval = objectFromByteBufferResponseInternal(buf);
+         Object retval = ClusterPartition.this.objectFromByteBufferResponseInternal(buf);
          // HAServiceResponse is only received when a scoped classloader is required for unmarshalling
          if (!(retval instanceof HAServiceResponse))
          {
@@ -1834,7 +1905,7 @@
          }
           
          String serviceName = ((HAServiceResponse)retval).getServiceName();
-         byte[] payload = ((HAServiceResponse)retval).getPayload();   
+         byte[] payload = ((HAServiceResponse)retval).getPayload();
 
          ClassLoader previousCL = null;
          boolean overrideCL = false;
@@ -1846,11 +1917,13 @@
                previousCL = Thread.currentThread().getContextClassLoader();
                ClassLoader loader = weak.get();
                if( trace )
-                  ClusterPartition.this.log.trace("overriding response Thread ContextClassLoader for service " + serviceName);            
+               {
+                  ClusterPartition.this.log.trace("overriding response Thread ContextClassLoader for service " + serviceName);
+               }
                overrideCL = true;
                Thread.currentThread().setContextClassLoader(loader);
             }
-            retval = objectFromByteBufferResponseInternal(payload);
+            retval = ClusterPartition.this.objectFromByteBufferResponseInternal(payload);
    
             return retval;
          }
@@ -1866,16 +1939,16 @@
 
       public byte[] objectToByteBuffer(Object obj) throws Exception
       {
-         return objectToByteBufferResponseInternal(obj);
-      }      
+         return ClusterPartition.this.objectToByteBufferResponseInternal(obj);
+      }
    }
    
    /**
-    * Overrides RpcDispatcher.Handle so that we can dispatch to many 
+    * Overrides RpcDispatcher.Handle so that we can dispatch to many
     * different objects.
     */
    private class RpcHandler extends RpcDispatcher
-   {      
+   {
       private RpcHandler(Channel channel, MessageListener l, MembershipListener l2, Object server_obj,
             boolean deadlock_detection)
       {
@@ -1883,8 +1956,8 @@
       }
       
       /**
-       * Analyze the MethodCall contained in <code>req</code> to find the 
-       * registered service object to invoke against, and then execute it 
+       * Analyze the MethodCall contained in <code>req</code> to find the
+       * registered service object to invoke against, and then execute it
        * against *that* object and return result.
        *
        * This overrides RpcDispatcher.Handle so that we can dispatch to many different objects.
@@ -1903,19 +1976,21 @@
          byte[] request_bytes = null;
          
          if( trace )
-            this.log.trace("Partition " + getPartitionName() + " received msg");
+         {
+            this.log.trace("Partition " + ClusterPartition.this.getPartitionName() + " received msg");
+         }
          if(req == null || req.getBuffer() == null)
          {
-            this.log.warn("Partition " + getPartitionName() + " message or message buffer is null!");
+            this.log.warn("Partition " + ClusterPartition.this.getPartitionName() + " message or message buffer is null!");
             return null;
          }
          
          try
          {
-            Object wrapper = objectFromByteBufferInternal(req.getBuffer());
+            Object wrapper = ClusterPartition.this.objectFromByteBufferInternal(req.getBuffer());
             if(wrapper == null || !(wrapper instanceof Object[]))
             {
-               this.log.warn("Partition " + getPartitionName() + " message wrapper does not contain Object[] object!");
+               this.log.warn("Partition " + ClusterPartition.this.getPartitionName() + " message wrapper does not contain Object[] object!");
                return null;
             }
 
@@ -1929,34 +2004,38 @@
             if (handler == null)
             {
                if( trace )
-                  this.log.trace("Partition " + getPartitionName() + " no rpc handler registered under service " + service);
+               {
+                  this.log.trace("Partition " + ClusterPartition.this.getPartitionName() + " no rpc handler registered under service " + service);
+               }
                return new NoHandlerForRPC();
             }
          }
          catch(Exception e)
          {
-            this.log.warn("Partition " + getPartitionName() + " failed unserializing message buffer (msg=" + req + ")", e);
+            this.log.warn("Partition " + ClusterPartition.this.getPartitionName() + " failed unserializing message buffer (msg=" + req + ")", e);
             return null;
          }
          
          try
-         {            
+         {
             // If client registered the service with a classloader, override the thread classloader here
             WeakReference<ClassLoader> weak = ClusterPartition.this.clmap.get(service);
             if (weak != null)
             {
                if( trace )
+               {
                   this.log.trace("overriding Thread ContextClassLoader for RPC service " + service);
+               }
                previousCL = Thread.currentThread().getContextClassLoader();
                ClassLoader loader = weak.get();
                overrideCL = true;
                Thread.currentThread().setContextClassLoader(loader);
             }
-            body = objectFromByteBufferInternal(request_bytes);
+            body = ClusterPartition.this.objectFromByteBufferInternal(request_bytes);
          }
          catch (Exception e)
          {
-            this.log.warn("Partition " + getPartitionName() + " failed extracting message body from request bytes", e);
+            this.log.warn("Partition " + ClusterPartition.this.getPartitionName() + " failed extracting message body from request bytes", e);
             return null;
          }
          finally
@@ -1970,21 +2049,23 @@
          
          if(body == null || !(body instanceof MethodCall))
          {
-            this.log.warn("Partition " + getPartitionName() + " message does not contain a MethodCall object!");
+            this.log.warn("Partition " + ClusterPartition.this.getPartitionName() + " message does not contain a MethodCall object!");
             return null;
          }
          
          // get method call information
          MethodCall method_call = (MethodCall)body;
-         String methodName = method_call.getName();      
+         String methodName = method_call.getName();
          
          if( trace )
+         {
             this.log.trace("full methodName: " + methodName);
+         }
          
          int idx = methodName.lastIndexOf('.');
          String handlerName = methodName.substring(0, idx);
          String newMethodName = methodName.substring(idx + 1);
-         if( trace ) 
+         if( trace )
          {
             this.log.trace("handlerName: " + handlerName + " methodName: " + newMethodName);
             this.log.trace("Handle: " + methodName);
@@ -2003,16 +2084,20 @@
             if (overrideCL)
             {
                // wrap the response so that the service name can be accessed during unmarshalling of the response
-               byte[] retbytes = objectToByteBufferResponseInternal(retval);
+               byte[] retbytes = ClusterPartition.this.objectToByteBufferResponseInternal(retval);
                retval = new HAServiceResponse(handlerName, retbytes);
             }
             if( trace )
+            {
                this.log.trace("rpc call return value: " + retval);
+            }
          }
          catch (Throwable t)
          {
             if( trace )
-               this.log.trace("Partition " + getPartitionName() + " rpc call threw exception", t);
+            {
+               this.log.trace("Partition " + ClusterPartition.this.getPartitionName() + " rpc call threw exception", t);
+            }
             retval = t;
          }
 
@@ -2037,32 +2122,36 @@
 
        private int generation;
 
-       public synchronized void close() 
+       public synchronized void close()
        {
            this.isOpen = false;
        }
 
-       public synchronized void open() 
+       public synchronized void open()
        {
            ++this.generation;
            this.isOpen = true;
-           notifyAll();
+           this.notifyAll();
        }
 
        // BLOCKS-UNTIL: opened-since(generation on entry)
-       public synchronized void await() throws InterruptedException 
+       public synchronized void await() throws InterruptedException
        {
            int arrivalGeneration = this.generation;
            while(!this.isOpen && arrivalGeneration == this.generation)
-               wait();
+         {
+            this.wait();
+         }
        }
        
        // BLOCKS-UNTIL: opened-since(generation on entry)
-       public synchronized void await(long timeout) throws InterruptedException 
+       public synchronized void await(long timeout) throws InterruptedException
        {
            int arrivalGeneration = this.generation;
            while(!this.isOpen && arrivalGeneration == this.generation)
-               wait(timeout);
+         {
+            this.wait(timeout);
+         }
        }
    }
    

Modified: trunk/cluster/src/main/org/jboss/ha/framework/server/DistributedReplicantManagerImpl.java
===================================================================
--- trunk/cluster/src/main/org/jboss/ha/framework/server/DistributedReplicantManagerImpl.java	2008-06-16 20:53:23 UTC (rev 74642)
+++ trunk/cluster/src/main/org/jboss/ha/framework/server/DistributedReplicantManagerImpl.java	2008-06-16 21:07:51 UTC (rev 74643)
@@ -21,38 +21,39 @@
   */
 package org.jboss.ha.framework.server;
 
-import java.util.Set;
-import java.util.Vector;
+import java.io.Serializable;
 import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.Iterator;
 import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
 import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
+import java.util.Set;
+import java.util.Vector;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.CopyOnWriteArrayList;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.atomic.AtomicInteger;
 
-import java.io.Serializable;
-
 import javax.management.MBeanServer;
 import javax.management.ObjectName;
 
-import EDU.oswego.cs.dl.util.concurrent.Latch;
-import EDU.oswego.cs.dl.util.concurrent.ConcurrentReaderHashMap;
-
-import org.jboss.logging.Logger;
-
 import org.jboss.ha.framework.interfaces.ClusterNode;
 import org.jboss.ha.framework.interfaces.DistributedReplicantManager;
 import org.jboss.ha.framework.interfaces.HAPartition;
+import org.jboss.logging.Logger;
 
 
-/** 
+/**
  * This class manages replicated objects.
  * 
  * @author  <a href="mailto:bill at burkecentral.com">Bill Burke</a>.
  * @author  <a href="mailto:sacha.labourey at cogito-info.ch">Sacha Labourey</a>.
  * @author  Scott.stark at jboss.org
  * @author  <a href="mailto:galder.zamarreno at jboss.com">Galder Zamarreno</a>
+ * @author  <a href="mailto:pferraro at redhat.com">Paul Ferraro</a>
  * @version $Revision$
  */
 public class DistributedReplicantManagerImpl
@@ -63,36 +64,38 @@
 {
    // Constants -----------------------------------------------------
    
-   protected final static String SERVICE_NAME = "DistributedReplicantManager";
+   static final String SERVICE_NAME = "DistributedReplicantManager";
    
+   private static final Class<?>[] add_types = new Class<?>[] { String.class, String.class, Serializable.class };
+   private static final Class<?>[] remove_types = new Class<?>[] { String.class, String.class };
+
    // Attributes ----------------------------------------------------
-   protected static int threadID;
+   private static AtomicInteger threadID = new AtomicInteger();
    
-   protected Map localReplicants = new ConcurrentReaderHashMap();
-   protected Map replicants = new ConcurrentReaderHashMap();
-   protected Map keyListeners = new ConcurrentReaderHashMap();
-   protected Map intraviewIdCache = new HashMap();
-   protected HAPartition partition; 
+   private ConcurrentMap<String, Serializable> localReplicants = new ConcurrentHashMap<String, Serializable>();
+   private ConcurrentMap<String, ConcurrentMap<String, Serializable>> replicants = new ConcurrentHashMap<String, ConcurrentMap<String, Serializable>>();
+   private ConcurrentMap<String, List<ReplicantListener>> keyListeners = new ConcurrentHashMap<String, List<ReplicantListener>>();
+   private Map<String, Integer> intraviewIdCache = new ConcurrentHashMap<String, Integer>();
+   
+   private HAPartition partition;
    /** The handler used to send replicant change notifications asynchronously */
-   protected AsynchEventHandler asynchHandler;  
+   private AsynchEventHandler asynchHandler;
    
-   protected Logger log;
+   private Logger log = Logger.getLogger(this.getClass());
+   private boolean trace = this.log.isTraceEnabled();
    
-   protected String nodeName = null;
+   private String nodeName = null;
    
-   protected Latch partitionNameKnown = new Latch ();
-   protected boolean trace;
+   // Works like a simple latch
+   private CountDownLatch partitionNameKnown = new CountDownLatch(1);
 
-   protected Class[] add_types=new Class[]{String.class, String.class, Serializable.class};
-   protected Class[] remove_types=new Class[]{String.class, String.class};
-
    // Static --------------------------------------------------------
    
    // Constructors --------------------------------------------------
-   public DistributedReplicantManagerImpl() 
+   public DistributedReplicantManagerImpl()
    {
       super();
-      this.log = Logger.getLogger (this.getClass ());
+
       // JBAS-5068 Create the handler early so we don't risk NPEs
       this.asynchHandler = new AsynchEventHandler(this, "AsynchKeyChangeHandler");
    }
@@ -101,24 +104,26 @@
    
    public void createService() throws Exception
    {
-      if (partition == null)
+      if (this.partition == null)
+      {
          throw new IllegalStateException("HAPartition property must be set before creating DistributedReplicantManager service");
+      }
 
-      log.debug("registerRPCHandler");
-      partition.registerRPCHandler(SERVICE_NAME, this);
-      log.debug("subscribeToStateTransferEvents");
-      partition.subscribeToStateTransferEvents(SERVICE_NAME, this);
-      log.debug("registerMembershipListener");
-      partition.registerMembershipListener(this);
+      this.log.debug("registerRPCHandler");
+      this.partition.registerRPCHandler(SERVICE_NAME, this);
+      this.log.debug("subscribeToStateTransferEvents");
+      this.partition.subscribeToStateTransferEvents(SERVICE_NAME, this);
+      this.log.debug("registerMembershipListener");
+      this.partition.registerMembershipListener(this);
    }
    
    public void startService() throws Exception
    {
       this.nodeName = this.partition.getNodeName();
       
-      asynchHandler.start();
+      this.asynchHandler.start();
 
-      partitionNameKnown.release (); // partition name is now known!
+      this.partitionNameKnown.countDown(); // partition name is now known!
       
       //log.info("mergemembers");
       //mergeMembers();
@@ -129,11 +134,11 @@
       // Stop the asynch handler thread
       try
       {
-         asynchHandler.stop();
+         this.asynchHandler.stop();
       }
       catch( Exception e)
       {
-         log.warn("Failed to stop asynchHandler", e);
+         this.log.warn("Failed to stop asynchHandler", e);
       }
       
       // TODO reset the latch
@@ -141,48 +146,39 @@
 
    // NR 200505 : [JBCLUSTER-38] unbind at destroy
    public void destroyService() throws Exception
-   {      
+   {
       // we cleanly shutdown. This should be optimized.
-      if (localReplicants != null)
+      for (String key: this.localReplicants.keySet())
       {
-         synchronized(localReplicants)
-         {
-            String[] keys = new String[localReplicants.size()];
-            localReplicants.keySet().toArray(keys);
-            for(int n = 0; n < keys.length; n ++)
-            {               
-               this.removeLocal(keys[n]); // channel is disconnected, so
-                                          // don't try to notify cluster
-            }
-         }
+         this.removeLocal(key); // channel is disconnected, so don't try to notify cluster
       }
       
-      if (partition != null)
-      {   
-         partition.unregisterRPCHandler(SERVICE_NAME, this);
-         partition.unsubscribeFromStateTransferEvents(SERVICE_NAME, this);
-         partition.unregisterMembershipListener(this);
+      if (this.partition != null)
+      {
+         this.partition.unregisterRPCHandler(SERVICE_NAME, this);
+         this.partition.unsubscribeFromStateTransferEvents(SERVICE_NAME, this);
+         this.partition.unregisterMembershipListener(this);
       }
    }
 
    public void registerWithJmx(MBeanServer server) throws Exception
    {
-      server.registerMBean(this, getObjectName());
+      server.registerMBean(this, this.getObjectName());
    }
    
    public void unregisterWithJmx(MBeanServer server) throws Exception
    {
-      server.unregisterMBean(getObjectName());
+      server.unregisterMBean(this.getObjectName());
    }
    
    private ObjectName getObjectName() throws Exception
    {
-      return new ObjectName("jboss:service=" + SERVICE_NAME + ",partition=" + partition.getPartitionName());
+      return new ObjectName("jboss:service=" + SERVICE_NAME + ",partition=" + this.partition.getPartitionName());
    }
    
    public HAPartition getHAPartition()
    {
-      return partition;
+      return this.partition;
    }
 
    public void setHAPartition(HAPartition clusterPartition)
@@ -190,38 +186,38 @@
       this.partition = clusterPartition;
    }
    
-   public String listContent () throws Exception
+   public String listContent() throws Exception
    {
+      StringBuilder result = new StringBuilder();
+      
+      result.append("<pre>");
+
       // we merge all replicants services: local only or not
       //
-      java.util.Collection services = this.getAllServices ();
-
-      StringBuffer result = new StringBuffer ();
-      java.util.Iterator catsIter = services.iterator ();
-      
-      result.append ("<pre>");
-      
-      while (catsIter.hasNext ())
+      for (String category: this.getAllServices())
       {
-         String category = (String)catsIter.next ();
-         Map content = (Map)this.replicants.get (category);
-         if (content == null)
-            content = new HashMap ();
-         java.util.Iterator keysIter = content.keySet ().iterator ();
-                  
-         result.append ("-----------------------------------------------\n");
-         result.append ("Service : ").append (category).append ("\n\n");
+         result.append("-----------------------------------------------\n");
+         result.append("Service : ").append(category).append("\n\n");
          
-         Serializable local = lookupLocalReplicant(category);
+         Serializable local = this.localReplicants.get(category);
+         
          if (local == null)
-            result.append ("\t- Service is *not* available locally\n");
+         {
+            result.append("\t- Service is *not* available locally\n");
+         }
          else
-            result.append ("\t- Service *is* also available locally\n");
+         {
+            result.append("\t- Service *is* also available locally\n");
+         }
 
-         while (keysIter.hasNext ())
+         Map<String, Serializable> content = this.replicants.get(category);
+         
+         if (content != null)
          {
-            String location = (String)keysIter.next ();            
-            result.append ("\t- ").append(location).append ("\n");
+            for (String location: content.keySet())
+            {
+               result.append("\t- ").append(location).append("\n");
+            }
          }
          
          result.append ("\n");
@@ -230,110 +226,106 @@
       
       result.append ("</pre>");
       
-      return result.toString ();
+      return result.toString();
    }
    
-   public String listXmlContent () throws Exception
+   public String listXmlContent() throws Exception
    {
-      // we merge all replicants services: local only or not
-      //
-      java.util.Collection services = this.getAllServices ();
-      StringBuffer result = new StringBuffer ();
-
+      StringBuilder result = new StringBuilder();
+      
       result.append ("<ReplicantManager>\n");
 
-      java.util.Iterator catsIter = services.iterator ();
-      while (catsIter.hasNext ())
+      // we merge all replicants services: local only or not
+      //
+      for (String category: this.getAllServices())
       {
-         String category = (String)catsIter.next ();
-         Map content = (Map)this.replicants.get (category);
-         if (content == null)
-            content = new HashMap ();
-         java.util.Iterator keysIter = content.keySet ().iterator ();
-                  
-         result.append ("\t<Service>\n");
-         result.append ("\t\t<ServiceName>").append (category).append ("</ServiceName>\n");
+         result.append("\t<Service>\n");
+         result.append("\t\t<ServiceName>").append(category).append("</ServiceName>\n");
 
+         Serializable local = this.localReplicants.get(category);
          
-         Serializable local = lookupLocalReplicant(category);
          if (local != null)
          {
-            result.append ("\t\t<Location>\n");
-            result.append ("\t\t\t<Name local=\"True\">").append (this.nodeName).append ("</Name>\n");
-            result.append ("\t\t</Location>\n");
+            result.append("\t\t<Location>\n");
+            result.append("\t\t\t<Name local=\"True\">").append (this.nodeName).append ("</Name>\n");
+            result.append("\t\t</Location>\n");
          }
 
-         while (keysIter.hasNext ())
+         Map<String, Serializable> content = this.replicants.get(category);
+         
+         if (content != null)
          {
-            String location = (String)keysIter.next ();            
-            result.append ("\t\t<Location>\n");
-            result.append ("\t\t\t<Name local=\"False\">").append (location).append ("</Name>\n");
-            result.append ("\t\t</Location>\n");
+            for (String location: content.keySet())
+            {
+               result.append("\t\t<Location>\n");
+               result.append("\t\t\t<Name local=\"False\">").append (location).append ("</Name>\n");
+               result.append("\t\t</Location>\n");
+            }
          }
          
-         result.append ("\t<Service>\n");
-         
+         result.append("\t</Service>\n");
       }
 
-      result.append ("<ReplicantManager>\n");
+      result.append("</ReplicantManager>\n");
       
-      return result.toString ();
+      return result.toString();
    }
 
    // HAPartition.HAPartitionStateTransfer implementation ----------------------------------------------
    
-   public Serializable getCurrentState ()
+   public Serializable getCurrentState()
    {
-      java.util.Collection services = this.getAllServices ();
-      HashMap result = new HashMap ();
+      Map<String, ConcurrentMap<String, Serializable>> result = new HashMap<String, ConcurrentMap<String, Serializable>>();
       
-      java.util.Iterator catsIter = services.iterator ();                       
-      while (catsIter.hasNext ())
+      for (String category: this.getAllServices())
       {
-         String category = (String)catsIter.next ();
-         Map content = (Map)this.replicants.get (category);
-         if (content == null)
-            content = new HashMap();
-         else
+         ConcurrentMap<String, Serializable> map = new ConcurrentHashMap<String, Serializable>();
+         
+         ConcurrentMap<String, Serializable> content = this.replicants.get(category);
+         
+         if (content != null)
          {
-            Map temp = new HashMap();
-            temp.putAll(content);
-            content = temp;
+            map.putAll(content);
          }
          
-         Serializable local = lookupLocalReplicant(category);
+         Serializable local = this.localReplicants.get(category);
+         
          if (local != null)
-            content.put (this.nodeName, local);
+         {
+            map.put(this.nodeName, local);
+         }
          
-         result.put (category, content);
+         result.put(category, map);
       }
       
       // we add the intraviewid cache to the global result
       //
-      Object[] globalResult = new Object[] {result, intraviewIdCache};
-      return globalResult;
+      return new Object[] { result, this.intraviewIdCache };
    }
 
+   @SuppressWarnings("unchecked")
    public void setCurrentState(Serializable newState)
    {
-      Object[] globalState = (Object[])newState;
-      Map map = (Map)globalState[0];
+      Object[] globalState = (Object[]) newState;
+      Map<String, ConcurrentMap<String, Serializable>> map = (Map) globalState[0];
+      
       this.replicants.putAll(map);
-      this.intraviewIdCache = (Map)globalState[1];
+      
+      this.intraviewIdCache = (Map) globalState[1];
 
-      if( trace )
+      if (this.trace)
       {
-         log.trace(nodeName + ": received new state, will republish local replicants");
+         this.log.trace(this.nodeName + ": received new state, will republish local replicants");
       }
-      MembersPublisher publisher = new MembersPublisher();
-      publisher.start();
+      
+      new MembersPublisher().start();
    }
       
-   public Collection getAllServices ()
+   public Collection<String> getAllServices()
    {
-      HashSet services = new HashSet();
-      services.addAll (localReplicants.keySet ());
-      services.addAll (replicants.keySet ());      
+      Set<String> services = new HashSet<String>();
+      services.addAll(this.localReplicants.keySet());
+      services.addAll(this.replicants.keySet());
       return services;
    }
    
@@ -344,11 +336,11 @@
       // Here we only care about deadMembers.  Purge all replicant lists of deadMembers
       // and then notify all listening nodes.
       //
-      log.info("Merging partitions...");
-      log.info("Dead members: " + deadMembers.size());
-      log.info("Originating groups: " + originatingGroups);
-      purgeDeadMembers(deadMembers, true);
-      if (newMembers.size() > 0) 
+      this.log.info("Merging partitions...");
+      this.log.info("Dead members: " + deadMembers.size());
+      this.log.info("Originating groups: " + originatingGroups);
+      this.purgeDeadMembers(deadMembers, true);
+      if (newMembers.size() > 0)
       {
          new MergeMembers().start();
       }
@@ -359,11 +351,11 @@
       // Here we only care about deadMembers.  Purge all replicant lists of deadMembers
       // and then notify all listening nodes.
       //
-      log.info("I am (" + nodeName + ") received membershipChanged event:");
-      log.info("Dead members: " + deadMembers.size() + " (" + deadMembers + ")");
-      log.info("New Members : " + newMembers.size()  + " (" + newMembers + ")");
-      log.info("All Members : " + allMembers.size()  + " (" + allMembers + ")");
-      purgeDeadMembers(deadMembers, false);
+      this.log.info("I am (" + this.nodeName + ") received membershipChanged event:");
+      this.log.info("Dead members: " + deadMembers.size() + " (" + deadMembers + ")");
+      this.log.info("New Members : " + newMembers.size()  + " (" + newMembers + ")");
+      this.log.info("All Members : " + allMembers.size()  + " (" + allMembers + ")");
+      this.purgeDeadMembers(deadMembers, false);
       
       // we don't need to merge members anymore
    }
@@ -373,263 +365,297 @@
    public void processEvent(Object event)
    {
       KeyChangeEvent kce = (KeyChangeEvent) event;
-      notifyKeyListeners(kce.key, kce.replicants, kce.merge);
+      this.notifyKeyListeners(kce.key, kce.replicants, kce.merge);
    }
    
    static class KeyChangeEvent
    {
       String key;
-      List replicants;
+      List<Serializable> replicants;
       boolean merge;
    }
    
-   // DistributedReplicantManager implementation ----------------------------------------------              
+   // DistributedReplicantManager implementation ----------------------------------------------
    
    public void add(String key, Serializable replicant) throws Exception
    {
-      if( trace )
-         log.trace("add, key="+key+", value="+replicant);
-      partitionNameKnown.acquire (); // we don't propagate until our name is known
+      if (this.trace)
+      {
+         this.log.trace("add, key=" + key + ", value=" + replicant);
+      }
       
-      Object[] args = {key, this.nodeName, replicant};
-      partition.callMethodOnCluster(SERVICE_NAME, "_add", args, add_types, true);
-      synchronized(localReplicants)
+      this.partitionNameKnown.await(); // we don't propagate until our name is known
+      
+      Object[] args = { key, this.nodeName, replicant };
+      
+      this.partition.callMethodOnCluster(SERVICE_NAME, "_add", args, add_types, true);
+
+      List<Serializable> replicants = null;
+      
+      synchronized (this.localReplicants)
       {
-         localReplicants.put(key, replicant);
-         notifyKeyListeners(key, lookupReplicants(key), false);
+         this.localReplicants.put(key, replicant);
+         
+         replicants = this.getReplicants(key);
       }
+      
+      this.notifyKeyListeners(key, replicants, false);
    }
    
    public void remove(String key) throws Exception
-   {      
-      partitionNameKnown.acquire (); // we don't propagate until our name is known
+   {
+      this.partitionNameKnown.await(); // we don't propagate until our name is known
       
       // optimisation: we don't make a costly network call
       // if there is nothing to remove
-      if (localReplicants.containsKey(key))
+      if (this.localReplicants.containsKey(key))
       {
-         Object[] args = {key, this.nodeName};
-         partition.callAsynchMethodOnCluster(SERVICE_NAME, "_remove", args, remove_types, true);
-         removeLocal(key);
+         Object[] args = { key, this.nodeName };
+         
+         this.partition.callAsynchMethodOnCluster(SERVICE_NAME, "_remove", args, remove_types, true);
+         
+         this.removeLocal(key);
       }
    }
    
    private void removeLocal(String key)
    {
-      synchronized(localReplicants)
+      List<Serializable> replicants = null;
+      
+      synchronized (this.localReplicants)
       {
-         localReplicants.remove(key);
-         List result = lookupReplicants(key);
-         if (result == null)
-            result = new ArrayList (); // don't pass null but an empty list
-         notifyKeyListeners(key, result, false);
+         if (this.localReplicants.remove(key) != null)
+         {
+            replicants = this.getReplicants(key);
+         }
       }
+      
+      if (replicants != null)
+      {
+         this.notifyKeyListeners(key, replicants, false);
+      }
    }
    
    public Serializable lookupLocalReplicant(String key)
    {
-      return (Serializable)localReplicants.get(key);
+      return this.localReplicants.get(key);
    }
    
-   public List lookupReplicants(String key)
+   public List<Serializable> lookupReplicants(String key)
    {
-      Serializable local = lookupLocalReplicant(key);
-      Map replicant = (Map)replicants.get(key);
-      if (replicant == null && local == null)
-         return null;
+      Serializable local = this.localReplicants.get(key);
+      
+      Map<String, Serializable> replicant = this.replicants.get(key);
 
-      ArrayList rtn = new ArrayList(); 
-
       if (replicant == null)
       {
-         if (local != null)
-            rtn.add(local);
+         return (local != null) ? Collections.singletonList(local) : null;
       }
-      else 
+
+      // JBAS-2677. Put the replicants in view order.
+      ClusterNode[] nodes = this.partition.getClusterNodes();
+
+      List<Serializable> result = new ArrayList<Serializable>(nodes.length);
+      
+      for (ClusterNode node: nodes)
       {
-         // JBAS-2677. Put the replicants in view order.
-         ClusterNode[] nodes = partition.getClusterNodes();
-         String replNode;
-         Object replVal;
-         for (int i = 0; i < nodes.length; i++)
+         String name = node.getName();
+         
+         if (local != null && this.nodeName.equals(name))
          {
-            replNode = nodes[i].getName();
-            if (local != null && nodeName.equals(replNode))
+            result.add(local);
+         }
+         else
+         {
+            Serializable value = replicant.get(name);
+            
+            if (value != null)
             {
-               rtn.add(local);
-               continue;
+               result.add(value);
             }
-            
-            replVal = replicant.get(replNode);
-            if (replVal != null)
-               rtn.add(replVal);            
          }
       }
       
-      return rtn;
+      return result;
    }
    
-   public List lookupReplicantsNodeNames(String key)
-   {      
-      List<ClusterNode> nodes = lookupReplicantsNodes(key);
-      if (nodes == null)
+   private List<Serializable> getReplicants(String key)
+   {
+      List<Serializable> result = this.lookupReplicants(key);
+      
+      if (result == null)
       {
-         return null;
+         result = Collections.emptyList();
       }
       
+      return result;
+   }
+   
+   @Deprecated
+   public List<String> lookupReplicantsNodeNames(String key)
+   {
+      List<ClusterNode> nodes = this.lookupReplicantsNodes(key);
+      
+      if (nodes == null) return null;
+      
       List<String> nodeNames = new ArrayList<String>(nodes.size());
+      
       for (ClusterNode node : nodes)
       {
          nodeNames.add(node.getName());
-      }  
+      }
       
       return nodeNames;
    }
 
    public List<ClusterNode> lookupReplicantsNodes(String key)
    {
-      boolean locallyReplicated = localReplicants.containsKey (key);
-      Map replicant = (Map)replicants.get(key);
-      if (replicant == null && !locallyReplicated)
-         return null;
-
-      List<ClusterNode> rtn = new ArrayList<ClusterNode>();
+      boolean local = this.localReplicants.containsKey(key);
+      Map<String, Serializable> replicant = this.replicants.get(key);
       
       if (replicant == null)
-      {   
-         if (locallyReplicated)
-            rtn.add(partition.getClusterNode());
+      {
+         return local ? Collections.singletonList(this.partition.getClusterNode()) : null;
       }
-      else
+      
+      Set<String> keys = replicant.keySet();
+      ClusterNode[] nodes = this.partition.getClusterNodes();
+      List<ClusterNode> rtn = new ArrayList<ClusterNode>(nodes.length);
+
+      for (ClusterNode node : nodes)
       {
-         Set keys = replicant.keySet();
-         ClusterNode[] nodes = partition.getClusterNodes();
-         String keyOwner;
-         for (int i = 0; i < nodes.length; i++)
+         String name = node.getName();
+         
+         if (local && this.nodeName.equals(name))
          {
-            keyOwner = nodes[i].getName();
-            if (locallyReplicated && nodeName.equals(keyOwner))
-            {
-               rtn.add(partition.getClusterNode());
-               continue;
-            }
-            
-            if (keys.contains(keyOwner))
-               rtn.add(nodes[i]);            
+            rtn.add(this.partition.getClusterNode());
          }
+         else if (keys.contains(name))
+         {
+            rtn.add(node);
+         }
       }
       
       return rtn;
-   }   
+   }
    
-   public void registerListener(String key, DistributedReplicantManager.ReplicantListener subscriber)
+   public void registerListener(String key, ReplicantListener subscriber)
    {
-      synchronized(keyListeners)
-      {
-         ArrayList listeners = (ArrayList)keyListeners.get(key);
-         if (listeners == null)
-         {
-            listeners = new ArrayList();
-            keyListeners.put(key, listeners);
-         }
-         listeners.add(subscriber);
-      }
+      List<ReplicantListener> list = new CopyOnWriteArrayList<ReplicantListener>();
+      
+      List<ReplicantListener> existing = this.keyListeners.putIfAbsent(key, list);
+      
+      ((existing != null) ? existing : list).add(subscriber);
    }
    
    public void unregisterListener(String key, DistributedReplicantManager.ReplicantListener subscriber)
    {
-      synchronized(keyListeners)
+      List<ReplicantListener> listeners = this.keyListeners.get(key);
+      
+      if (listeners != null)
       {
-         ArrayList listeners = (ArrayList)keyListeners.get (key);
-         if (listeners == null) return;
-         
          listeners.remove(subscriber);
-         if (listeners.size() == 0)
-            keyListeners.remove(key);
-
+         
+         this.keyListeners.remove(key, Collections.emptyList());
       }
    }
    
-   public int getReplicantsViewId(String key)   
+   public int getReplicantsViewId(String key)
    {
-      Integer result = (Integer)this.intraviewIdCache.get (key);
+      Integer result = this.intraviewIdCache.get(key);
       
-      if (result == null)
-         return 0;
-      else
-         return result.intValue ();      
+      return (result != null) ? result.intValue() : 0;
    }
    
-   public boolean isMasterReplica (String key)
+   public boolean isMasterReplica(String key)
    {
-      if( trace )
-         log.trace("isMasterReplica, key="+key);
-      // if I am not a replicat, I cannot be the master...
+      if (this.trace)
+      {
+         this.log.trace("isMasterReplica, key=" + key);
+      }
+      // if I am not a replicant, I cannot be the master...
       //
-      if (!localReplicants.containsKey (key))
+      if (!this.localReplicants.containsKey(key))
       {
-         if( trace )
-            log.trace("no localReplicants, key="+key+", isMasterReplica=false");
+         if (this.trace)
+         {
+            this.log.trace("no localReplicants, key=" + key + ", isMasterReplica=false");
+         }
          return false;
       }
 
-      Vector allNodes = this.partition.getCurrentView ();
-      Map repForKey = (Map)replicants.get(key);
-      if (repForKey==null)
+      Map<String, Serializable> repForKey = this.replicants.get(key);
+      if (repForKey == null)
       {
-         if( trace )
-            log.trace("no replicants, key="+key+", isMasterReplica=true");
+         if (this.trace)
+         {
+            this.log.trace("no replicants, key=" + key + ", isMasterReplica=true");
+         }
          return true;
       }
-      Vector replicaNodes = new Vector ((repForKey).keySet ());          
-      boolean isMasterReplica = false;
-      for (int i=0; i<allNodes.size (); i++)
+
+      Vector<String> allNodes = this.partition.getCurrentView();
+      for (String node: allNodes)
       {
-         String aMember = (String)allNodes.elementAt (i);
-         if( trace )
-            log.trace("Testing member: "+aMember);
-         if (replicaNodes.contains (aMember))
+         if (this.trace)
          {
-            if( trace )
-               log.trace("Member found in replicaNodes, isMasterReplica=false");
-            break;
+            this.log.trace("Testing member: " + node);
          }
-         else if (aMember.equals (this.nodeName))
+         
+         if (repForKey.containsKey(node))
          {
-            if( trace )
-               log.trace("Member == nodeName, isMasterReplica=true");
-            isMasterReplica = true;
-            break;
+            if (this.trace)
+            {
+               this.log.trace("Member found in replicaNodes, isMasterReplica=false");
+            }
+            return false;
          }
+         else if (node.equals(this.nodeName))
+         {
+            if (this.trace)
+            {
+               this.log.trace("Member == nodeName, isMasterReplica=true");
+            }
+            return true;
+         }
       }
-      return isMasterReplica;
+      return false;
    }
 
-   // DistributedReplicantManager cluster callbacks ----------------------------------------------              
+   // DistributedReplicantManager cluster callbacks ----------------------------------------------
    
    /**
     * Cluster callback called when a new replicant is added on another node
     * @param key Replicant key
     * @param nodeName Node that add the current replicant
     * @param replicant Serialized representation of the replicant
-    */   
+    */
    public void _add(String key, String nodeName, Serializable replicant)
    {
-      if( trace )
-         log.trace("_add(" + key + ", " + nodeName);
+      if (this.trace)
+      {
+         this.log.trace("_add(" + key + ", " + nodeName);
+      }
       
+      KeyChangeEvent event = new KeyChangeEvent();
+      event.key = key;
+      
+      synchronized (this.replicants)
+      {
+         this.addReplicant(key, nodeName, replicant);
+         
+         event.replicants = this.getReplicants(key);
+      }
+      
       try
       {
-         addReplicant(key, nodeName, replicant);
-         // Notify listeners asynchronously
-         KeyChangeEvent kce = new KeyChangeEvent();
-         kce.key = key;
-         kce.replicants = lookupReplicants(key);
-         asynchHandler.queueEvent(kce);
+         this.asynchHandler.queueEvent(event);
       }
-      catch (Exception ex)
+      catch (InterruptedException e)
       {
-         log.error("_add failed", ex);
+         Thread.currentThread().interrupt();
+         
+         this.log.error("_add failed", e);
       }
    }
    
@@ -637,42 +663,50 @@
     * Cluster callback called when a replicant is removed by another node
     * @param key Name of the replicant key
     * @param nodeName Node that wants to remove its replicant for the give key
-    */   
+    */
    public void _remove(String key, String nodeName)
    {
-      try
+      KeyChangeEvent event = new KeyChangeEvent();
+      event.key = key;
+      
+      synchronized (this.replicants)
       {
-         if (removeReplicant (key, nodeName)) {
-            // Notify listeners asynchronously
-            KeyChangeEvent kce = new KeyChangeEvent();
-            kce.key = key;
-            kce.replicants = lookupReplicants(key);
-            asynchHandler.queueEvent(kce);
+         if (this.removeReplicant(key, nodeName))
+         {
+            event.replicants = this.getReplicants(key);
          }
       }
-      catch (Exception ex)
+      
+      if (event.replicants != null)
       {
-         log.error("_remove failed", ex);
+         try
+         {
+            this.asynchHandler.queueEvent(event);
+         }
+         catch (InterruptedException e)
+         {
+            Thread.currentThread().interrupt();
+            
+            this.log.error("_remove failed", e);
+         }
       }
    }
    
-   protected boolean removeReplicant (String key, String nodeName) throws Exception
+   protected boolean removeReplicant(String key, String nodeName)
    {
-      synchronized(replicants)
+      Map<String, Serializable> replicant = this.replicants.get(key);
+      
+      if (replicant != null)
       {
-         Map replicant = (Map)replicants.get(key);
-         if (replicant == null) return false;
-         Object removed = replicant.remove(nodeName);
-         if (removed != null)
+         if (replicant.remove(nodeName) != null)
          {
-            Collection values = replicant.values();               
-            if (values.size() == 0)
-            {
-               replicants.remove(key);
-            }
+            // If replicant map is empty, prune it
+            this.replicants.remove(key, Collections.emptyMap());
+            
             return true;
          }
       }
+      
       return false;
    }
    
@@ -680,14 +714,18 @@
     * Cluster callback called when a node wants to know our complete list of local replicants
     * @throws Exception Thrown if a cluster communication exception occurs
     * @return A java array of size 2 containing the name of our node in this cluster and the serialized representation of our state
-    */   
+    */
    public Object[] lookupLocalReplicants() throws Exception
    {
-      partitionNameKnown.acquire (); // we don't answer until our name is known
+      this.partitionNameKnown.await(); // we don't answer until our name is known
       
-      Object[] rtn = {this.nodeName, localReplicants};
-      if( trace )
-         log.trace ("lookupLocalReplicants called ("+ rtn[0] + "). Return: " + localReplicants.size ());
+      Object[] rtn = { this.nodeName, this.localReplicants };
+      
+      if (this.trace)
+      {
+         this.log.trace("lookupLocalReplicants called ("+ rtn[0] + "). Return: " + this.localReplicants.size());
+      }
+      
       return rtn;
    }
    
@@ -695,38 +733,38 @@
    
    // Protected -----------------------------------------------------
    
-   protected int calculateReplicantsHash (List members)
+   protected int calculateReplicantsHash(List<ClusterNode> members)
    {
       int result = 0;
-      Object obj = null;
       
-      for (int i=0; i<members.size (); i++)
+      for (ClusterNode member: members)
       {
-         obj = members.get (i);
-         if (obj != null)
-            result+= obj.hashCode (); // no explicit overflow with int addition
+         if (member != null)
+         {
+            result += member.getName().hashCode(); // no explicit overflow with int addition
+         }
       }
       
       return result;
    }
    
-   protected int updateReplicantsHashId (String key)
+   protected int updateReplicantsHashId(String key)
    {
       // we first get a list of all nodes names that replicate this key
       //
-      List nodes = this.lookupReplicantsNodeNames (key);
+      List<ClusterNode> nodes = this.lookupReplicantsNodes(key);
       int result = 0;
       
-      if ( (nodes == null) || (nodes.size () == 0) )
+      if ((nodes == null) || nodes.isEmpty())
       {
          // no nore replicants for this key: we uncache our view id
          //
-         this.intraviewIdCache.remove (key);
+         this.intraviewIdCache.remove(key);
       }
       else
       {
-         result = this.calculateReplicantsHash (nodes);
-         this.intraviewIdCache.put (key, new Integer (result));
+         result = this.calculateReplicantsHash(nodes);
+         this.intraviewIdCache.put(key, new Integer(result));
       }
       
       return result;
@@ -742,115 +780,57 @@
     * @param key replicant key name
     * @param nodeName name of the node that adds this replicant
     * @param replicant Serialized representation of the replica
+    * @return true, if this replicant was newly added to the map, false otherwise
     */
-   protected void addReplicant(String key, String nodeName, Serializable replicant)
+   protected boolean addReplicant(String key, String nodeName, Serializable replicant)
    {
-      addReplicant(replicants, key, nodeName, replicant);
+      ConcurrentMap<String, Serializable> map = new ConcurrentHashMap<String, Serializable>();
+      
+      ConcurrentMap<String, Serializable> existingMap = this.replicants.putIfAbsent(key, map);
+      
+      return (((existingMap != null) ? existingMap : map).put(nodeName, replicant) != null);
    }
    
    /**
-    * Logic for adding replicant to any map.
-    * @param map structure in which adding the new replicant
-    * @param key name of the replicant key
-    * @param nodeName name of the node adding the replicant
-    * @param replicant serialized representation of the replicant that is added
-    */
-   protected void addReplicant(Map map, String key, String nodeName, Serializable replicant)
-   {
-      synchronized(map)
-      {
-         Map rep = (Map)map.get(key);
-         if (rep == null)
-         {
-            if( trace )
-               log.trace("_adding new HashMap");
-            rep = new HashMap();
-            map.put(key, rep);
-         }
-         rep.put(nodeName, replicant);         
-      }
-   }
-   
-   protected Vector getKeysReplicatedByNode (String nodeName)
-   {
-      Vector result = new Vector ();
-      synchronized (replicants)
-      {         
-         Iterator keysIter = replicants.keySet ().iterator ();
-         while (keysIter.hasNext ())
-         {
-            String key = (String)keysIter.next ();
-            Map values = (Map)replicants.get (key);
-            if ( (values != null) && values.containsKey (nodeName) )
-            {
-               result.add (key);
-            }
-         }
-      }
-      return result;
-   }
-   
-   /**
-    * Indicates if the a replicant already exists for a given key/node pair
-    * @param key replicant key name
-    * @param nodeName name of the node
-    * @return a boolean indicating if a replicant for the given node exists for the given key
-    */   
-   protected boolean replicantEntryAlreadyExists (String key, String nodeName)
-   {
-      return replicantEntryAlreadyExists (replicants, key, nodeName);
-   }
-   
-   /**
-    * Indicates if the a replicant already exists for a given key/node pair in the give data structure
-    */   
-   protected boolean replicantEntryAlreadyExists (Map map, String key, String nodeName)
-   {
-         Map rep = (Map)map.get(key);
-         if (rep == null)
-            return false;
-         else
-            return rep.containsKey (nodeName);
-   }
-   
-   /**
     * Notifies, through a callback, the listeners for a given replicant that the set of replicants has changed
     * @param key The replicant key name
     * @param newReplicants The new list of replicants
     * @param merge is the notification the result of a cluster merge?
     * 
-    */   
-   protected void notifyKeyListeners(String key, List newReplicants, boolean merge)
+    */
+   protected void notifyKeyListeners(String key, List<Serializable> newReplicants, boolean merge)
    {
-      if( trace )
-         log.trace("notifyKeyListeners");
+      if (this.trace)
+      {
+         this.log.trace("notifyKeyListeners");
+      }
 
       // we first update the intra-view id for this particular key
       //
-      int newId = updateReplicantsHashId (key);
+      int newId = this.updateReplicantsHashId(key);
       
-      ArrayList listeners = (ArrayList)keyListeners.get(key);
+      List<ReplicantListener> listeners = this.keyListeners.get(key);
+
       if (listeners == null)
       {
-         if( trace )
-            log.trace("listeners is null");
+         if (this.trace)
+         {
+            this.log.trace("listeners is null");
+         }
          return;
       }
       
-      // ArrayList's iterator is not thread safe
-      DistributedReplicantManager.ReplicantListener[] toNotify = null;
-      synchronized(listeners) 
+      if (this.trace)
       {
-         toNotify = new DistributedReplicantManager.ReplicantListener[listeners.size()];
-         toNotify = (DistributedReplicantManager.ReplicantListener[]) listeners.toArray(toNotify);
+         this.log.trace("notifying " + listeners.size() + " listeners for key change: " + key);
       }
       
-      if( trace )
-         log.trace("notifying " + toNotify.length + " listeners for key change: " + key);
-      for (int i = 0; i < toNotify.length; i++)
+      for (ReplicantListener listener: listeners)
       {
-         if (toNotify[i] != null)
-            toNotify[i].replicantsChanged(key, newReplicants, newId, merge);
+         if (listener != null)
+         {
+            listener.replicantsChanged(key, newReplicants, newId, merge);
+         }
       }
    }
 
@@ -858,38 +838,40 @@
    {
       try
       {
-         if( trace )
-            log.trace("Start Re-Publish local replicants in DRM");
-
-         Map localReplicants;
-         synchronized (this.localReplicants)
+         if (this.trace)
          {
-            localReplicants = new HashMap(this.localReplicants);
+            this.log.trace("Start Re-Publish local replicants in DRM");
          }
 
-         Iterator entries = localReplicants.entrySet().iterator();
-         while( entries.hasNext() )
+         for (Map.Entry<String, Serializable> entry: this.localReplicants.entrySet())
          {
-            Map.Entry entry = (Map.Entry) entries.next();
-            String key = (String) entry.getKey();
-            Object replicant = entry.getValue();
+            Serializable replicant = entry.getValue();
+            
             if (replicant != null)
             {
-               if( trace )
-                  log.trace("publishing, key=" + key + ", value=" + replicant);
+               String key = entry.getKey();
+               
+               if (this.trace)
+               {
+                  this.log.trace("publishing, key=" + key + ", value=" + replicant);
+               }
 
-               Object[] args = {key, this.nodeName, replicant};
+               Object[] args = { key, this.nodeName, replicant };
 
-               partition.callAsynchMethodOnCluster(SERVICE_NAME, "_add", args, add_types, true);
-               notifyKeyListeners(key, lookupReplicants(key), false);
+               this.partition.callAsynchMethodOnCluster(SERVICE_NAME, "_add", args, add_types, true);
+               
+               this.notifyKeyListeners(key, this.getReplicants(key), false);
             }
          }
-         if( trace )
-            log.trace("End Re-Publish local replicants");
+         
+         if (this.trace)
+         {
+            this.log.trace("End Re-Publish local replicants");
+         }
       }
       catch (Exception e)
       {
-         log.error("Re-Publish failed", e);
+         this.log.error("Re-Publish failed", e);
       }
    }
 
@@ -901,78 +883,91 @@
    {
       try
       {
-         log.debug("Start merging members in DRM service...");
-         java.util.HashSet notifies = new java.util.HashSet ();
-         ArrayList rsp = partition.callMethodOnCluster(SERVICE_NAME,
+         this.log.debug("Start merging members in DRM service...");
+         
+         ArrayList<?> rsp = this.partition.callMethodOnCluster(SERVICE_NAME,
                                         "lookupLocalReplicants",
                                         new Object[]{}, new Class[]{}, true);
-         if (rsp.size() == 0)
-            log.debug("No responses from other nodes during the DRM merge process.");
-         else 
-         { 
-            log.debug("The DRM merge process has received " + rsp.size() + " answers");
+         if (rsp.isEmpty())
+         {
+            this.log.debug("No responses from other nodes during the DRM merge process.");
          }
-         for (int i = 0; i < rsp.size(); i++)
+         else
          {
-            Object o = rsp.get(i);
-            if (o == null)
+            this.log.debug("The DRM merge process has received " + rsp.size() + " answers");
+         }
+         
+         // Record keys to be notified, and replicant list per key
+         Map<String, List<Serializable>> notifications = new HashMap<String, List<Serializable>>();
+         
+         // Perform add/remove and replicant lookup atomically
+         synchronized (this.replicants)
+         {
+            for (Object o: rsp)
             {
-               log.warn("As part of the answers received during the DRM merge process, a NULL message was received!");
-               continue;
-            }
-            else if (o instanceof Throwable)
-            {
-               log.warn("As part of the answers received during the DRM merge process, a Throwable was received!", (Throwable) o);
-               continue;
-            }
-            
-            Object[] objs = (Object[]) o;
-            String node = (String)objs[0];
-            Map replicants = (Map)objs[1];
-            Iterator keys = replicants.keySet().iterator();
-            
-            //FIXME: We don't remove keys in the merge process but only add new keys!
-            while (keys.hasNext())
-            {
-               String key = (String)keys.next();
-               // done to reduce duplicate notifications
-               if (!replicantEntryAlreadyExists  (key, node))
+               if (o == null)
                {
-                  addReplicant(key, node, (Serializable)replicants.get(key));
-                  notifies.add (key);
+                  this.log.warn("As part of the answers received during the DRM merge process, a NULL message was received!");
+                  continue;
                }
-            }
-            
-            Vector currentStatus = getKeysReplicatedByNode (node);
-            if (currentStatus.size () > replicants.size ())
-            {
-               // The merge process needs to remove some (now)
-               // unexisting keys
-               //
-               for (int currentKeysId=0, currentKeysMax=currentStatus.size (); currentKeysId<currentKeysMax; currentKeysId++)
+               else if (o instanceof Throwable)
                {
-                  String theKey = (String)currentStatus.elementAt (currentKeysId);
-                  if (!replicants.containsKey (theKey))
+                  this.log.warn("As part of the answers received during the DRM merge process, a Throwable was received!", (Throwable) o);
+                  continue;
+               }
+               
+               Object[] objs = (Object[]) o;
+               String node = (String) objs[0];
+               Map<String, Serializable> replicants = (Map) objs[1];
+               
+               //FIXME: This loop only adds new keys; stale keys for this node are removed in the loop below (is this FIXME obsolete?)
+               for (Map.Entry<String, Serializable> entry: replicants.entrySet())
+               {
+                  String key = entry.getKey();
+                  
+                  if (this.addReplicant(key, node, entry.getValue()))
                   {
-                     removeReplicant (theKey, node);
-                     notifies.add(theKey);
+                     notifications.put(key, null);
                   }
                }
+               
+               // The merge process needs to remove some (now) nonexistent keys
+               for (Map.Entry<String, ConcurrentMap<String, Serializable>> entry: this.replicants.entrySet())
+               {
+                  String key = entry.getKey();
+                  
+                  if (entry.getValue().containsKey(node))
+                  {
+                     if (!replicants.containsKey(key))
+                     {
+                        if (this.removeReplicant(key, node))
+                        {
+                           notifications.put(key, null);
+                        }
+                     }
+                  }
+               }
             }
-         }   
+            
+            // Lookup replicants for each changed key
+            for (Map.Entry<String, List<Serializable>> entry: notifications.entrySet())
+            {
+               entry.setValue(this.getReplicants(entry.getKey()));
+            }
+         }
          
-         Iterator notifIter = notifies.iterator ();
-         while (notifIter.hasNext ())
+         // Notify recorded key changes
+         for (Map.Entry<String, List<Serializable>> entry: notifications.entrySet())
          {
-            String key = (String)notifIter.next ();
-            notifyKeyListeners(key, lookupReplicants(key), true);
+            this.notifyKeyListeners(entry.getKey(), entry.getValue(), true);
          }
-         log.debug ("..Finished merging members in DRM service");
 
-      }               
+         this.log.debug("..Finished merging members in DRM service");
+
+      }
       catch (Exception ex)
       {
-         log.error("merge failed", ex);
+         this.log.error("merge failed", ex);
       }
    }
 
@@ -980,65 +975,51 @@
     * Get rid of dead members from replicant list.
     * 
     * @param deadMembers the members that are no longer in the view
-    * @param merge       whether the membership change occurred during 
+    * @param merge       whether the membership change occurred during
     *                    a cluster merge
     */
-   protected void purgeDeadMembers(Vector deadMembers, boolean merge)
+   protected void purgeDeadMembers(Vector<ClusterNode> deadMembers, boolean merge)
    {
-      if (deadMembers.size() <= 0)
-         return;
+      if (deadMembers.isEmpty()) return;
 
-      log.debug("purgeDeadMembers, "+deadMembers);
-      try
+      this.log.debug("purgeDeadMembers, " + deadMembers);
+
+      List<String> deadNodes = new ArrayList<String>(deadMembers.size());
+      
+      for (ClusterNode member: deadMembers)
       {
-         synchronized(replicants)
+         deadNodes.add(member.getName());
+      }
+      
+      for (Map.Entry<String, ConcurrentMap<String, Serializable>> entry: this.replicants.entrySet())
+      {
+         String key = entry.getKey();
+         ConcurrentMap<String, Serializable> replicant = entry.getValue();
+         
+         List<Serializable> replicants = null;
+         
+         synchronized (this.replicants)
          {
-            Iterator keys = replicants.keySet().iterator();
-            while (keys.hasNext())
+            if (replicant.keySet().removeAll(deadNodes))
             {
-               String key = (String)keys.next();
-               Map replicant = (Map)replicants.get(key);
-               boolean modified = false;
-               for (int i = 0; i < deadMembers.size(); i++)
-               {
-                  String node = deadMembers.elementAt(i).toString();
-                  log.debug("trying to remove deadMember " + node + " for key " + key);
-                  Object removed = replicant.remove(node);
-                  if (removed != null) 
-                  {
-                     log.debug(node + " was removed");
-                     modified = true;
-                  }
-                  else
-                  {
-                     log.debug(node + " was NOT removed!!!");
-                  }
-               }
-               if (modified)
-               {
-                  notifyKeyListeners(key, lookupReplicants(key), merge);
-               }
+               replicants = this.getReplicants(key);
             }
          }
+         
+         if (replicants != null)
+         {
+            this.notifyKeyListeners(key, replicants, merge);
+         }
       }
-      catch (Exception ex)
-      {
-         log.error("purgeDeadMembers failed", ex);
-      }
    }
 
    /**
-    */   
+    */
    protected void cleanupKeyListeners()
    {
       // NOT IMPLEMENTED YET
    }
 
-   protected synchronized static int nextThreadID()
-   {
-      return threadID ++;
-   }
-
    // Private -------------------------------------------------------
    
    // Inner classes -------------------------------------------------
@@ -1047,27 +1028,28 @@
    {
       public MergeMembers()
       {
-         super("DRM Async Merger#"+nextThreadID());
+         super("DRM Async Merger#" + threadID.getAndIncrement());
       }
 
       /**
        * Called when the service needs to merge with another partition. This
        * process is performed asynchronously
-       */      
+       */
       public void run()
       {
-         log.debug("Sleeping for 50ms before mergeMembers");
+         DistributedReplicantManagerImpl.this.log.debug("Sleeping for 50ms before mergeMembers");
          try
          {
             // if this thread invokes a cluster method call before
             // membershipChanged event completes, it could timeout/hang
             // we need to discuss this with Bela.
-            Thread.sleep(50); 
+            Thread.sleep(50);
          }
-         catch (Exception ignored)
+         catch (InterruptedException e)
          {
+            Thread.currentThread().interrupt();
          }
-         mergeMembers();
+         DistributedReplicantManagerImpl.this.mergeMembers();
       }
    }
 
@@ -1075,7 +1057,7 @@
    {
       public MembersPublisher()
       {
-         super("DRM Async Publisher#"+nextThreadID());
+         super("DRM Async Publisher#" + threadID.getAndIncrement());
       }
 
       /**
@@ -1084,7 +1066,7 @@
        */
       public void run()
       {
-         log.debug("DRM: Sleeping before re-publishing for 50ms just in case");
+         DistributedReplicantManagerImpl.this.log.debug("DRM: Sleeping before re-publishing for 50ms just in case");
          try
          {
             // if this thread invokes a cluster method call before
@@ -1092,10 +1074,11 @@
             // we need to discuss this with Bela.
             Thread.sleep(50);
          }
-         catch (Exception ignored)
+         catch (InterruptedException e)
          {
+            Thread.currentThread().interrupt();
          }
-         republishLocalReplicants();
+         DistributedReplicantManagerImpl.this.republishLocalReplicants();
       }
    }
 }




More information about the jboss-cvs-commits mailing list