[jboss-cvs] JBossAS SVN: r58501 - in branches/JBoss_4_0_3_SP1_CP: cluster/src/main/org/jboss/cache/invalidation/bridges cluster/src/main/org/jboss/ha/framework/interfaces cluster/src/main/org/jboss/ha/framework/server cluster/src/main/org/jboss/ha/jmx cluster/src/main/org/jboss/proxy/ejb cluster/src/main/org/jboss/proxy/generic testsuite/src/main/org/jboss/test/cluster/drm testsuite/src/main/org/jboss/test/cluster/test testsuite/src/main/org/jboss/test/testbeancluster/test

jboss-cvs-commits at lists.jboss.org jboss-cvs-commits at lists.jboss.org
Fri Nov 17 14:13:36 EST 2006


Author: vivekl at redhat.com
Date: 2006-11-17 14:13:31 -0500 (Fri, 17 Nov 2006)
New Revision: 58501

Added:
   branches/JBoss_4_0_3_SP1_CP/cluster/src/main/org/jboss/ha/framework/server/AsynchEventHandler.java
   branches/JBoss_4_0_3_SP1_CP/testsuite/src/main/org/jboss/test/cluster/drm/MockHAPartition.java
Modified:
   branches/JBoss_4_0_3_SP1_CP/cluster/src/main/org/jboss/cache/invalidation/bridges/JGCacheInvalidationBridge.java
   branches/JBoss_4_0_3_SP1_CP/cluster/src/main/org/jboss/ha/framework/interfaces/HAPartition.java
   branches/JBoss_4_0_3_SP1_CP/cluster/src/main/org/jboss/ha/framework/server/DistributedReplicantManagerImpl.java
   branches/JBoss_4_0_3_SP1_CP/cluster/src/main/org/jboss/ha/framework/server/HAPartitionImpl.java
   branches/JBoss_4_0_3_SP1_CP/cluster/src/main/org/jboss/ha/jmx/HAServiceMBeanSupport.java
   branches/JBoss_4_0_3_SP1_CP/cluster/src/main/org/jboss/proxy/ejb/ProxyFactoryHA.java
   branches/JBoss_4_0_3_SP1_CP/cluster/src/main/org/jboss/proxy/generic/ProxyFactoryHA.java
   branches/JBoss_4_0_3_SP1_CP/testsuite/src/main/org/jboss/test/cluster/test/DRMTestCase.java
   branches/JBoss_4_0_3_SP1_CP/testsuite/src/main/org/jboss/test/testbeancluster/test/BeanUnitTestCase.java
Log:
ASPATCH-115: JBAS-3822: Port HAPartition concurrency fixes to 4.0.3.SP1

Modified: branches/JBoss_4_0_3_SP1_CP/cluster/src/main/org/jboss/cache/invalidation/bridges/JGCacheInvalidationBridge.java
===================================================================
--- branches/JBoss_4_0_3_SP1_CP/cluster/src/main/org/jboss/cache/invalidation/bridges/JGCacheInvalidationBridge.java	2006-11-17 18:12:58 UTC (rev 58500)
+++ branches/JBoss_4_0_3_SP1_CP/cluster/src/main/org/jboss/cache/invalidation/bridges/JGCacheInvalidationBridge.java	2006-11-17 19:13:31 UTC (rev 58501)
@@ -117,7 +117,14 @@
    
    // DistributedReplicantManager.ReplicantListener implementation ---------------------------
    
-   public void replicantsChanged (String key, java.util.List newReplicants, int newReplicantsViewId)
+   /**
+    * @todo examine thread safety. synchronized keyword was added to method 
+    * signature when internal behavior of DistributedReplicantManagerImpl was 
+    * changed so that multiple threads could concurrently send replicantsChanged
+    * notifications. Need to examine in detail how this method interacts with
+    * DistributedState to see if we can remove/narrow the synchronization. 
+    */
+   public synchronized void replicantsChanged (String key, java.util.List newReplicants, int newReplicantsViewId)
    {
       if (key.equals (this.RPC_HANLE_NAME) && this.drm.isMasterReplica (this.RPC_HANLE_NAME))
       {

Modified: branches/JBoss_4_0_3_SP1_CP/cluster/src/main/org/jboss/ha/framework/interfaces/HAPartition.java
===================================================================
--- branches/JBoss_4_0_3_SP1_CP/cluster/src/main/org/jboss/ha/framework/interfaces/HAPartition.java	2006-11-17 18:12:58 UTC (rev 58500)
+++ branches/JBoss_4_0_3_SP1_CP/cluster/src/main/org/jboss/ha/framework/interfaces/HAPartition.java	2006-11-17 19:13:31 UTC (rev 58501)
@@ -11,8 +11,6 @@
 import java.util.Vector;
 import java.util.ArrayList;
 
-import org.jboss.ha.framework.interfaces.HAPartition.HAMembershipListener;
-
 /** 
  *
  *   @author  <a href="mailto:bill at burkecentral.com">Bill Burke</a>.
@@ -35,9 +33,13 @@
    // *******************************
    //
    /**
-    * Return the name of the current name in the current partition. The name is
-    * dynamically determined by the partition.
-    * @return The partition name
+    * Return the name of this node in the current partition. The name is
+    * dynamically determined by the partition. The name will be the String
+    * returned by <code>getClusterNode().getName()</code>.
+    * 
+    * @return The node name
+    * 
+    * @see #getClusterNode()
     */   
    public String getNodeName();
    /**
@@ -268,8 +270,33 @@
     * @param listener The listener wishing to unsubscribe
     */   
    public void unregisterMembershipListener(HAMembershipListener listener);
-
    /**
+    * Returns whether this partition will synchronously notify any 
+    * HAMembershipListeners of membership changes using the calling thread
+    * from the underlying <code>ClusterPartition</code>.
+    * 
+    * @return <code>true</code> if registered listeners that don't implement
+    *         <code>AsynchHAMembershipExtendedListener</code> or
+    *         <code>AsynchHAMembershipListener</code> will be notified
+    *         synchronously of membership changes; <code>false</code> if
+    *         those listeners will be notified asynchronously.  Default
+    *         is <code>false</code>.
+    */
+   public boolean getAllowSynchronousMembershipNotifications();
+   /**
+    * Sets whether this partition will synchronously notify any 
+    * HAMembershipListeners of membership changes using the calling thread
+    * from the underlying <code>ClusterPartition</code>.
+    * 
+    * @param allowSync  <code>true</code> if registered listeners that don't 
+    *         implement <code>AsynchHAMembershipExtendedListener</code> or
+    *         <code>AsynchHAMembershipListener</code> should be notified
+    *         synchronously of membership changes; <code>false</code> if
+    *         those listeners can be notified asynchronously.  Default
+    *         is <code>false</code>.
+    */
+   public void setAllowSynchronousMembershipNotifications(boolean allowSync);
+   /**
     * Each time the partition topology changes, a new view is computed. A view is a list of members,
     * the first member being the coordinator of the view. Each view also has a distinct identifier.
     * @return The identifier of the current view
@@ -283,7 +310,9 @@
 
    /**
     * Return the member nodes that built the current view i.e. the current partition.
-    * @return An array of ClusterNode containing the node names
+    * @return   An array of ClusterNode listing the current members of the partition.
+    *           This array will be in the same order in all nodes in the cluster that
+    *           have received the current view.
     */
    public ClusterNode[] getClusterNodes ();
 

Copied: branches/JBoss_4_0_3_SP1_CP/cluster/src/main/org/jboss/ha/framework/server/AsynchEventHandler.java (from rev 58500, branches/JBoss_4_0_3_SP1_JBAS-3822/cluster/src/main/org/jboss/ha/framework/server/AsynchEventHandler.java)

Modified: branches/JBoss_4_0_3_SP1_CP/cluster/src/main/org/jboss/ha/framework/server/DistributedReplicantManagerImpl.java
===================================================================
--- branches/JBoss_4_0_3_SP1_CP/cluster/src/main/org/jboss/ha/framework/server/DistributedReplicantManagerImpl.java	2006-11-17 18:12:58 UTC (rev 58500)
+++ branches/JBoss_4_0_3_SP1_CP/cluster/src/main/org/jboss/ha/framework/server/DistributedReplicantManagerImpl.java	2006-11-17 19:13:31 UTC (rev 58501)
@@ -7,6 +7,7 @@
 
 package org.jboss.ha.framework.server;
 
+import java.util.Set;
 import java.util.Vector;
 import java.util.ArrayList;
 import java.util.HashMap;
@@ -26,14 +27,13 @@
 
 import org.jboss.logging.Logger;
 
+import org.jboss.ha.framework.interfaces.ClusterNode;
 import org.jboss.ha.framework.interfaces.DistributedReplicantManager;
 import org.jboss.ha.framework.interfaces.HAPartition;
 
 
 /** 
- *   This class manages replicated objects.
- *
- * @todo there is still too much synrhonization on keyListeners
+ * This class manages replicated objects.
  * 
  * @author  <a href="mailto:bill at burkecentral.com">Bill Burke</a>.
  * @author  <a href="mailto:sacha.labourey at cogito-info.ch">Sacha Labourey</a>.
@@ -43,7 +43,8 @@
 public class DistributedReplicantManagerImpl
    implements DistributedReplicantManagerImplMBean,
               HAPartition.HAMembershipExtendedListener,
-              HAPartition.HAPartitionStateTransfer
+              HAPartition.HAPartitionStateTransfer,
+              AsynchEventHandler.AsynchEventProcessor
 {
    // Constants -----------------------------------------------------
    
@@ -56,7 +57,9 @@
    protected ConcurrentReaderHashMap replicants = new ConcurrentReaderHashMap();
    protected ConcurrentReaderHashMap keyListeners = new ConcurrentReaderHashMap();
    protected HashMap intraviewIdCache = new HashMap();
-   protected HAPartition partition;   
+   protected HAPartition partition; 
+   /** The handler used to send replicant change notifications asynchronously */
+   protected AsynchEventHandler asynchHandler;  
    
    protected Logger log;
    
@@ -113,6 +116,10 @@
    public void start() throws Exception
    {
       this.nodeName = this.partition.getNodeName ();
+      
+      // Create the asynch listener handler thread
+      asynchHandler = new AsynchEventHandler(this, "AsynchKeyChangeHandler");
+      asynchHandler.start();
 
       partitionNameKnown.release (); // partition name is now known!
       
@@ -122,8 +129,15 @@
    
    public void stop() throws Exception
    {
-      // we cleanly shutdown. This should be optimized.
-      //
+      // BES 200604 -- implication of NR's JBCLUSTER-38 change.  Moving to
+      // destroy allows restart of HAPartition while local registrations 
+      // survive -- stopping partition does not stop all registered services
+      // e.g. ejbs; if we maintain their registrations we can pass them to
+      // the cluster when we restart.  However, we are leaving all the remote
+      // replicants we have registered around, so they will still be included 
+      // as targets if anyone contacts our EJB while partition is stopped.
+      // Probably OK; if they aren't valid the client will find this out.
+      
 //    NR 200505 : [JBCLUSTER-38] move to destroy
 //      if (localReplicants != null)
 //      {
@@ -136,14 +150,24 @@
 //         }
 //      }
       
+      // Stop the asynch handler thread
+      try
+      {
+         asynchHandler.stop();
+      }
+      catch( Exception e)
+      {
+         log.warn("Failed to stop asynchHandler", e);
+      }
+      
 //    NR 200505 : [JBCLUSTER-38] move to destroy
 //      this.mbeanserver.unregisterMBean (this.jmxName);
    }
 
    // NR 200505 : [JBCLUSTER-38] unbind at destroy
    public void destroy() throws Exception
-   {      
-      // we cleanly shutdown. This should be optimized.
+   {  
+      // now partition can't be resuscitated, so remove local replicants
       if (localReplicants != null)
       {
          synchronized(localReplicants)
@@ -152,11 +176,17 @@
             localReplicants.keySet().toArray(keys);
             for(int n = 0; n < keys.length; n ++)
             {               
-               this.remove (keys[n]);
+               this.removeLocal(keys[n]); // channel is disconnected, so
+                                          // don't try to notify cluster
             }
          }
       }
+
       this.mbeanserver.unregisterMBean (this.jmxName);
+      
+      partition.unregisterRPCHandler(SERVICE_NAME, this);
+      partition.unsubscribeFromStateTransferEvents(SERVICE_NAME, this);
+      partition.unregisterMembershipListener(this);
    }
 
    public String listContent () throws Exception
@@ -334,6 +364,20 @@
       // we don't need to merge members anymore
    }
    
+   // AsynchEventHandler.AsynchEventProcessor implementation -----------------
+   
+   public void processEvent(Object event)
+   {
+      KeyChangeEvent kce = (KeyChangeEvent) event;
+      notifyKeyListeners(kce.key, kce.replicants);
+   }
+   
+   static class KeyChangeEvent
+   {
+      String key;
+      List replicants;
+   }
+   
    // DistributedReplicantManager implementation ----------------------------------------------              
    
    public void add(String key, Serializable replicant) throws Exception
@@ -352,27 +396,31 @@
    }
    
    public void remove(String key) throws Exception
-   {
+   {      
       partitionNameKnown.acquire (); // we don't propagate until our name is known
       
-      if (localReplicants.containsKey (key)) 
-         // optimisation: we don't make a costly network call if there is
-         // nothing to remove
-         //
+      // optimisation: we don't make a costly network call
+      // if there is nothing to remove
+      if (localReplicants.containsKey(key))
       {
          Object[] args = {key, this.nodeName};
          partition.callAsynchMethodOnCluster(SERVICE_NAME, "_remove", args, remove_types, true);
-         synchronized(localReplicants)
-         {
-            localReplicants.remove(key);
-            List result = lookupReplicants(key);
-            if (result == null)
-               result = new ArrayList (); // don't return null but an empty list
-            notifyKeyListeners(key, result);
-         }
+         removeLocal(key);
       }
    }
    
+   protected void removeLocal(String key)
+   {
+      synchronized(localReplicants)
+      {
+         localReplicants.remove(key);
+         List result = lookupReplicants(key);
+         if (result == null)
+            result = new ArrayList (); // don't pass null but an empty list
+         notifyKeyListeners(key, result);
+      }
+   }
+   
    public Serializable lookupLocalReplicant(String key)
    {
       return (Serializable)localReplicants.get(key);
@@ -386,10 +434,33 @@
          return null;
 
       ArrayList rtn = new ArrayList();
-      if (local != null)
-         rtn.add(local);
-      if (replicant != null)
-         rtn.addAll(replicant.values());
+
+      if (replicant == null)
+      {
+         if (local != null)
+            rtn.add(local);
+      }
+      else 
+      {
+         // JBAS-2677. Put the replicants in view order.
+         ClusterNode[] nodes = partition.getClusterNodes();
+         String replNode;
+         Object replVal;
+         for (int i = 0; i < nodes.length; i++)
+         {
+            replNode = nodes[i].getName();
+            if (local != null && nodeName.equals(replNode))
+            {
+               rtn.add(local);
+               continue;
+            }
+            
+            replVal = replicant.get(replNode);
+            if (replVal != null)
+               rtn.add(replVal);            
+         }
+      }
+      
       return rtn;
    }
    
@@ -401,10 +472,32 @@
          return null;
 
       ArrayList rtn = new ArrayList();
-      if (locallyReplicated)
-         rtn.add(this.nodeName);
-      if (replicant != null)
-         rtn.addAll(replicant.keySet ());
+      
+      if (replicant == null)
+      {   
+         if (locallyReplicated)
+            rtn.add(this.nodeName);
+      }
+      else
+      {
+         // JBAS-2677. Put the replicants in view order.
+         Set keys = replicant.keySet();
+         ClusterNode[] nodes = partition.getClusterNodes();
+         String keyOwner;
+         for (int i = 0; i < nodes.length; i++)
+         {
+            keyOwner = nodes[i].getName();
+            if (locallyReplicated && nodeName.equals(keyOwner))
+            {
+               rtn.add(this.nodeName);
+               continue;
+            }
+            
+            if (keys.contains(keyOwner))
+               rtn.add(keyOwner);            
+         }
+      }
+      
       return rtn;
    }
    
@@ -494,7 +587,7 @@
    // DistributedReplicantManager cluster callbacks ----------------------------------------------              
    
    /**
-    * cluster callback called when a new replicant is added on another node
+    * Cluster callback called when a new replicant is added on another node
     * @param key Replicant key
     * @param nodeName Node that add the current replicant
     * @param replicant Serialized representation of the replicant
@@ -507,7 +600,11 @@
       try
       {
          addReplicant(key, nodeName, replicant);
-         notifyKeyListeners(key, lookupReplicants(key));
+         // Notify listeners asynchronously
+         KeyChangeEvent kce = new KeyChangeEvent();
+         kce.key = key;
+         kce.replicants = lookupReplicants(key);
+         asynchHandler.queueEvent(kce);
       }
       catch (Exception ex)
       {
@@ -516,7 +613,7 @@
    }
    
    /**
-    * cluster callback called when a replicant is removed by another node
+    * Cluster callback called when a replicant is removed by another node
     * @param key Name of the replicant key
     * @param nodeName Node that wants to remove its replicant for the give key
     */   
@@ -524,8 +621,13 @@
    {
       try
       {
-         if (removeReplicant (key, nodeName))
-            notifyKeyListeners(key, lookupReplicants(key));
+         if (removeReplicant (key, nodeName)) {
+            // Notify listeners asynchronously
+            KeyChangeEvent kce = new KeyChangeEvent();
+            kce.key = key;
+            kce.replicants = lookupReplicants(key);
+            asynchHandler.queueEvent(kce);
+         }
       }
       catch (Exception ex)
       {
@@ -694,33 +796,40 @@
     * Notifies, through a callback, the listeners for a given replicant that the set of replicants has changed
     * @param key The replicant key name
     * @param newReplicants The new list of replicants
+    * 
     */   
    protected void notifyKeyListeners(String key, List newReplicants)
    {
       if( trace )
          log.trace("notifyKeyListeners");
-      synchronized(keyListeners)
+
+      // we first update the intra-view id for this particular key
+      //
+      int newId = updateReplicantsHashId (key);
+      
+      ArrayList listeners = (ArrayList)keyListeners.get(key);
+      if (listeners == null)
       {
-         // we first update the intra-view id for this particular key
-         //
-         int newId = updateReplicantsHashId (key);
-         
-         ArrayList listeners = (ArrayList)keyListeners.get(key);
-         if (listeners == null)
-         {
-            if( trace )
-               log.trace("listeners is null");
-            return;
-         }
-         
          if( trace )
-           log.trace("notifying " + listeners.size() + " listeners for key change: " + key);
-         for (int i = 0; i < listeners.size(); i++)
-         {
-            DistributedReplicantManager.ReplicantListener listener = (DistributedReplicantManager.ReplicantListener)listeners.get(i);
-            listener.replicantsChanged(key, newReplicants, newId);
-         }
+            log.trace("listeners is null");
+         return;
       }
+      
+      // ArrayList's iterator is not thread safe
+      DistributedReplicantManager.ReplicantListener[] toNotify = null;
+      synchronized(listeners) 
+      {
+         toNotify = new DistributedReplicantManager.ReplicantListener[listeners.size()];
+         toNotify = (DistributedReplicantManager.ReplicantListener[]) listeners.toArray(toNotify);
+      }
+      
+      if( trace )
+         log.trace("notifying " + toNotify.length + " listeners for key change: " + key);
+      for (int i = 0; i < toNotify.length; i++)
+      {
+         if (toNotify[i] != null)
+            toNotify[i].replicantsChanged(key, newReplicants, newId);
+      }
    }
 
    protected void republishLocalReplicants()
@@ -742,14 +851,16 @@
             Map.Entry entry = (Map.Entry) entries.next();
             String key = (String) entry.getKey();
             Object replicant = entry.getValue();
+            if (replicant != null)
+            {
+               if( trace )
+                  log.trace("publishing, key=" + key + ", value=" + replicant);
 
-            if( trace )
-               log.trace("publishing, key=" + key + ", value=" + replicant);
+               Object[] args = {key, this.nodeName, replicant};
 
-            Object[] args = {key, this.nodeName, replicant};
-
-            partition.callAsynchMethodOnCluster(SERVICE_NAME, "_add", args, add_types, true);
-            notifyKeyListeners(key, lookupReplicants(key));
+               partition.callAsynchMethodOnCluster(SERVICE_NAME, "_add", args, add_types, true);
+               notifyKeyListeners(key, lookupReplicants(key));
+            }
          }
          if( trace )
             log.trace("End Re-Publish local replicants");
@@ -781,12 +892,19 @@
          }
          for (int i = 0; i < rsp.size(); i++)
          {
-            Object[] objs = (Object[])rsp.get(i);
-            if (objs == null)
+            Object o = rsp.get(i);
+            if (o == null)
             {
                log.warn("As part of the answers received during the DRM merge process, a NULL message was received!");
                continue;
             }
+            else if (o instanceof Throwable)
+            {
+               log.warn("As part of the answers received during the DRM merge process, a Throwable was received!", (Throwable) o);
+               continue;
+            }
+            
+            Object[] objs = (Object[]) o;
             String node = (String)objs[0];
             Map replicants = (Map)objs[1];
             Iterator keys = replicants.keySet().iterator();

Modified: branches/JBoss_4_0_3_SP1_CP/cluster/src/main/org/jboss/ha/framework/server/HAPartitionImpl.java
===================================================================
--- branches/JBoss_4_0_3_SP1_CP/cluster/src/main/org/jboss/ha/framework/server/HAPartitionImpl.java	2006-11-17 18:12:58 UTC (rev 58500)
+++ branches/JBoss_4_0_3_SP1_CP/cluster/src/main/org/jboss/ha/framework/server/HAPartitionImpl.java	2006-11-17 19:13:31 UTC (rev 58501)
@@ -25,8 +25,6 @@
 import javax.naming.StringRefAddr;
 import javax.management.MBeanServer;
 
-import EDU.oswego.cs.dl.util.concurrent.LinkedQueue;
-
 import org.jgroups.JChannel;
 import org.jgroups.MergeView;
 import org.jgroups.View;
@@ -36,15 +34,12 @@
 import org.jgroups.stack.IpAddress;
 import org.jgroups.util.Rsp;
 import org.jgroups.util.RspList;
-import org.jgroups.util.Util;
 
 import org.jboss.invocation.MarshalledValueInputStream;
 import org.jboss.invocation.MarshalledValueOutputStream;
 import org.jboss.ha.framework.interfaces.DistributedReplicantManager;
 import org.jboss.ha.framework.interfaces.DistributedState;
 import org.jboss.ha.framework.interfaces.HAPartition;
-import org.jboss.ha.framework.interfaces.HAPartition.HAPartitionStateTransfer;
-import org.jboss.ha.framework.interfaces.HAPartition.HAMembershipListener;
 import org.jboss.ha.framework.interfaces.ClusterNode;
 
 import org.jboss.naming.NonSerializableFactory;
@@ -63,7 +58,7 @@
 public class HAPartitionImpl
    extends org.jgroups.blocks.RpcDispatcher
    implements org.jgroups.MessageListener, org.jgroups.MembershipListener,
-      HAPartition
+      HAPartition, AsynchEventHandler.AsynchEventProcessor
 {
    private static class NoHandlerForRPC implements Serializable
    {
@@ -78,14 +73,14 @@
 
    protected HashMap rpcHandlers = new HashMap();
    protected HashMap stateHandlers = new HashMap();
-   /** The HAMembershipListener and HAMembershipExtendedListeners */
-   protected ArrayList listeners = new ArrayList();
+   /** Do we send any membership change notifications synchronously? */
+   protected boolean allowSyncListeners = false;
+   /** The synch HAMembershipListener and HAMembershipExtendedListeners */
+   protected ArrayList synchListeners = new ArrayList();
    /** The asynch HAMembershipListener and HAMembershipExtendedListeners */
    protected ArrayList asynchListeners = new ArrayList();
-   /** The LinkedQueue<ViewChangeEvent> of changes to notify asynch listeners of */
-   protected LinkedQueue asynchViewChanges = new LinkedQueue();
-   /** The Thread used to send membership change notifications asynchronously */
-   protected Thread asynchNotifyThread;
+   /** The handler used to send membership change notifications asynchronously */
+   protected AsynchEventHandler asynchHandler;
    /** The current cluster partition members */
    protected Vector members = null;
    protected Vector jgmembers = null;
@@ -215,6 +210,8 @@
       this.dsManager.init();
       log.debug("bind distributed state service");
 
+      // Create the asynchronous handler for view changes
+      asynchHandler = new AsynchEventHandler(this, "AsynchViewChangeHandler");
 
       // Bind ourself in the public JNDI space
       //
@@ -271,10 +268,8 @@
       this.replicantManager.start();
       this.dsManager.start();
 
-      // Create the asynch listener handler thread
-      AsynchViewChangeHandler asynchHandler = new AsynchViewChangeHandler();
-      asynchNotifyThread = new Thread(asynchHandler, "AsynchHAMembershipListener Thread");
-      asynchNotifyThread.start();
+      // Start the asynch listener handler thread
+      asynchHandler.start();
    }
 
    public void closePartition() throws Exception
@@ -284,11 +279,11 @@
 
       try
       {
-         asynchNotifyThread.interrupt();
+         asynchHandler.stop();
       }
       catch( Exception e)
       {
-         log.warn("Failed to interrupte asynchNotifyThread", e);
+         log.warn("Failed to stop asynchHandler", e);
       }
 
       // Stop the DRM and DS services
@@ -556,11 +551,14 @@
          log.debug("membership changed from " + this.members.size() + " to "
             + event.allMembers.size());
          // Put the view change to the asynch queue
-         this.asynchViewChanges.put(event);
+         this.asynchHandler.queueEvent(event);
 
          // Broadcast the new view to the synchronous view change listeners
-         this.notifyListeners(listeners, event.viewId, event.allMembers,
-            event.deadMembers, event.newMembers, event.originatingGroups);
+         if (this.allowSyncListeners)
+         {
+            this.notifyListeners(synchListeners, event.viewId, event.allMembers,
+                  event.deadMembers, event.newMembers, event.originatingGroups);
+         }
       }
       catch (Exception ex)
       {
@@ -899,20 +897,48 @@
    //   
    public void registerMembershipListener(HAMembershipListener listener)
    {
-      synchronized(this.listeners)
-      {
-         this.listeners.add(listener);
+      boolean isAsynch = (this.allowSyncListeners == false) 
+            || (listener instanceof AsynchHAMembershipListener)
+            || (listener instanceof AsynchHAMembershipExtendedListener);
+      if( isAsynch ) {
+         synchronized(this.asynchListeners) {
+            this.asynchListeners.add(listener);
+         }
       }
+      else  { 
+         synchronized(this.synchListeners) {
+            this.synchListeners.add(listener);
+         }
+      }
    }
    
    public void unregisterMembershipListener(HAMembershipListener listener)
    {
-      synchronized(this.listeners)
-      {
-         this.listeners.remove(listener);
+      boolean isAsynch = (this.allowSyncListeners == false) 
+            || (listener instanceof AsynchHAMembershipListener)
+            || (listener instanceof AsynchHAMembershipExtendedListener);
+      if( isAsynch ) {
+         synchronized(this.asynchListeners) {
+            this.asynchListeners.remove(listener);
+         }
       }
+      else  { 
+         synchronized(this.synchListeners) {
+            this.synchListeners.remove(listener);
+         }
+      }
    }
    
+   public boolean getAllowSynchronousMembershipNotifications()
+   {
+      return allowSyncListeners;
+   }
+
+   public void setAllowSynchronousMembershipNotifications(boolean allowSync)
+   {      
+      this.allowSyncListeners = allowSync;
+   }
+   
    // org.jgroups.RpcDispatcher overrides ---------------------------------------------------
    
    /**
@@ -998,10 +1024,21 @@
             log.trace("rpc call threw exception", t);
          retval = t;
       }
-
+      
       return retval;
    }
+   
+   // AsynchEventHandler.AsynchEventProcessor -----------------------
 
+   public void processEvent(Object event)
+   {
+      ViewChangeEvent vce = (ViewChangeEvent) event;
+      notifyListeners(asynchListeners, vce.viewId, vce.allMembers,
+            vce.deadMembers, vce.newMembers, vce.originatingGroups);
+      
+   }
+   
+   
    // Package protected ---------------------------------------------
    
    // Protected -----------------------------------------------------
@@ -1101,31 +1138,35 @@
       Vector originatingGroups)
    {
       log.debug("Begin notifyListeners, viewID: "+viewID);
+      
       synchronized(theListeners)
       {
-         for (int i = 0; i < theListeners.size(); i++)
+         // JBAS-3619 -- don't hold synch lock while notifying
+         theListeners = (ArrayList) theListeners.clone();
+      }
+      
+      for (int i = 0; i < theListeners.size(); i++)
+      {
+         HAMembershipListener aListener = null;
+         try
          {
-            HAMembershipListener aListener = null;
-            try
+            aListener = (HAMembershipListener) theListeners.get(i);
+            if(originatingGroups != null && (aListener instanceof HAMembershipExtendedListener))
             {
-               aListener = (HAMembershipListener) theListeners.get(i);
-               if(originatingGroups != null && (aListener instanceof HAMembershipExtendedListener))
-               {
-                  HAMembershipExtendedListener exListener = (HAMembershipExtendedListener) aListener;
-                  exListener.membershipChangedDuringMerge (deadMembers, newMembers,
-                     allMembers, originatingGroups);
-               }
-               else
-               {
-                  aListener.membershipChanged(deadMembers, newMembers, allMembers);
-               }
+               HAMembershipExtendedListener exListener = (HAMembershipExtendedListener) aListener;
+               exListener.membershipChangedDuringMerge (deadMembers, newMembers,
+                  allMembers, originatingGroups);
             }
-            catch (Throwable e)
+            else
             {
-               // a problem in a listener should not prevent other members to receive the new view
-               log.warn("HAMembershipListener callback failure: "+aListener, e);
+               aListener.membershipChanged(deadMembers, newMembers, allMembers);
             }
          }
+         catch (Throwable e)
+         {
+            // a problem in a listener should not prevent other members to receive the new view
+            log.warn("HAMembershipListener callback failure: "+aListener, e);
+         }
       }
       log.debug("End notifyListeners, viewID: "+viewID);
    }
@@ -1154,7 +1195,7 @@
       catch (Exception ignored){}
    }
 
-   /** A simply data class containing the view change event needed to
+   /** A simple data class containing the view change event needed to
     * message the HAMembershipListeners
     */
    private static class ViewChangeEvent
@@ -1166,31 +1207,6 @@
       Vector originatingGroups;
    }
 
-   /** The Runnable that handles the asynchronous listener notifications
-    */
-   private class AsynchViewChangeHandler implements Runnable
-   {
-      public void run()
-      {
-         log.debug("Begin AsynchViewChangeHandler");
-         while( true )
-         {
-            try
-            {
-               ViewChangeEvent event = (ViewChangeEvent) asynchViewChanges.take();
-               notifyListeners(asynchListeners, event.viewId, event.allMembers,
-                  event.deadMembers, event.newMembers, event.originatingGroups);
-            }
-            catch(InterruptedException e)
-            {
-               log.debug("AsynchViewChangeHandler interrupted", e);
-               break;
-            }
-         }
-         log.debug("End AsynchViewChangeHandler");
-      }
-   }
-
    // Private -------------------------------------------------------
    
    // Inner classes -------------------------------------------------

Modified: branches/JBoss_4_0_3_SP1_CP/cluster/src/main/org/jboss/ha/jmx/HAServiceMBeanSupport.java
===================================================================
--- branches/JBoss_4_0_3_SP1_CP/cluster/src/main/org/jboss/ha/jmx/HAServiceMBeanSupport.java	2006-11-17 18:12:58 UTC (rev 58500)
+++ branches/JBoss_4_0_3_SP1_CP/cluster/src/main/org/jboss/ha/jmx/HAServiceMBeanSupport.java	2006-11-17 19:13:31 UTC (rev 58501)
@@ -187,6 +187,8 @@
       // register to listen to topology changes, which might cause the election of a new master
       drmListener = new DistributedReplicantManager.ReplicantListener()
       {
+         Object mutex = new Object();
+         
          public void replicantsChanged(
             String key,
             List newReplicants,
@@ -194,8 +196,16 @@
          {
             if (key.equals(getServiceHAName()))
             {
-               // change in the topology callback
-               HAServiceMBeanSupport.this.partitionTopologyChanged(newReplicants, newReplicantsViewId);
+               // This synchronized block was added when the internal behavior of
+               // DistributedReplicantManagerImpl was changed so that concurrent
+               // replicantsChanged notifications are possible. Synchronization
+               // ensures that this change won't break non-thread-safe 
+               // subclasses of HAServiceMBeanSupport.
+               synchronized(mutex)
+               {
+                  // change in the topology callback
+                  HAServiceMBeanSupport.this.partitionTopologyChanged(newReplicants, newReplicantsViewId);
+               }
             }
          }
       };

Modified: branches/JBoss_4_0_3_SP1_CP/cluster/src/main/org/jboss/proxy/ejb/ProxyFactoryHA.java
===================================================================
--- branches/JBoss_4_0_3_SP1_CP/cluster/src/main/org/jboss/proxy/ejb/ProxyFactoryHA.java	2006-11-17 18:12:58 UTC (rev 58500)
+++ branches/JBoss_4_0_3_SP1_CP/cluster/src/main/org/jboss/proxy/ejb/ProxyFactoryHA.java	2006-11-17 19:13:31 UTC (rev 58501)
@@ -180,7 +180,11 @@
       }
    }
 
-   public void replicantsChanged (String key, List newReplicants, int newReplicantsViewId)
+   // synchronized keyword added when it became possible for DRM to issue
+   // concurrent replicantsChanged notifications. JBAS-2169.
+   public synchronized void replicantsChanged (String key, 
+                                               List newReplicants, 
+                                               int newReplicantsViewId)
    {
       try
       {

Modified: branches/JBoss_4_0_3_SP1_CP/cluster/src/main/org/jboss/proxy/generic/ProxyFactoryHA.java
===================================================================
--- branches/JBoss_4_0_3_SP1_CP/cluster/src/main/org/jboss/proxy/generic/ProxyFactoryHA.java	2006-11-17 18:12:58 UTC (rev 58500)
+++ branches/JBoss_4_0_3_SP1_CP/cluster/src/main/org/jboss/proxy/generic/ProxyFactoryHA.java	2006-11-17 19:13:31 UTC (rev 58501)
@@ -158,7 +158,11 @@
       }
    }
 
-   public void replicantsChanged(String key, List newReplicants, int newReplicantsViewId)
+   // synchronized keyword added when it became possible for DRM to issue
+   // concurrent replicantsChanged notifications. JBAS-2169.
+   public synchronized void replicantsChanged(String key, 
+                                              List newReplicants, 
+                                              int newReplicantsViewId)
    {
       try
       {

Copied: branches/JBoss_4_0_3_SP1_CP/testsuite/src/main/org/jboss/test/cluster/drm/MockHAPartition.java (from rev 58500, branches/JBoss_4_0_3_SP1_JBAS-3822/testsuite/src/main/org/jboss/test/cluster/drm/MockHAPartition.java)

Modified: branches/JBoss_4_0_3_SP1_CP/testsuite/src/main/org/jboss/test/cluster/test/DRMTestCase.java
===================================================================
--- branches/JBoss_4_0_3_SP1_CP/testsuite/src/main/org/jboss/test/cluster/test/DRMTestCase.java	2006-11-17 18:12:58 UTC (rev 58500)
+++ branches/JBoss_4_0_3_SP1_CP/testsuite/src/main/org/jboss/test/cluster/test/DRMTestCase.java	2006-11-17 19:13:31 UTC (rev 58501)
@@ -1,16 +1,38 @@
 /*
-* JBoss, the OpenSource J2EE webOS
-*
-* Distributable under LGPL license.
-* See terms of license at gnu.org.
-*/
+ * JBoss, Home of Professional Open Source.
+ * Copyright 2006, Red Hat Middleware LLC, and individual contributors
+ * as indicated by the @author tags. See the copyright.txt file in the
+ * distribution for a full listing of individual contributors.
+ *
+ * This is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU Lesser General Public License as
+ * published by the Free Software Foundation; either version 2.1 of
+ * the License, or (at your option) any later version.
+ *
+ * This software is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this software; if not, write to the Free
+ * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
+ * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
+ */
 package org.jboss.test.cluster.test;
 
 import java.rmi.RemoteException;
 import java.rmi.server.UnicastRemoteObject;
+import java.security.SecureRandom;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.Iterator;
 import java.util.Vector;
 import java.util.List;
 import java.util.HashSet;
+
+import javax.management.MBeanServer;
+import javax.management.MBeanServerFactory;
 import javax.management.MBeanServerInvocationHandler;
 import javax.management.ObjectName;
 import javax.management.Notification;
@@ -19,14 +41,23 @@
 
 import org.jboss.test.JBossClusteredTestCase;
 import org.jboss.test.cluster.drm.IReplicants;
+import org.jboss.test.cluster.drm.MockHAPartition;
+import org.jboss.ha.framework.interfaces.ClusterNode;
+import org.jboss.ha.framework.interfaces.DistributedReplicantManager;
+import org.jboss.ha.framework.interfaces.DistributedReplicantManager.ReplicantListener;
+import org.jboss.ha.framework.server.DistributedReplicantManagerImpl;
 import org.jboss.jmx.adaptor.rmi.RMIAdaptor;
 import org.jboss.jmx.adaptor.rmi.RMIAdaptorExt;
 import org.jboss.jmx.adaptor.rmi.RMINotificationListener;
+import org.jgroups.stack.IpAddress;
 import org.apache.log4j.Logger;
 
-/** Tests of http session replication
+import EDU.oswego.cs.dl.util.concurrent.Semaphore;
+
+/** Tests of the DistributedReplicantManagerImpl
  *
  * @author  Scott.Stark at jboss.org
+ * @author  Brian.Stansberry at jboss.com
  * @version $Revision$
  */
 public class DRMTestCase extends JBossClusteredTestCase
@@ -45,7 +76,419 @@
          log.info("handleNotification, "+notification);
       }
    }
+   
+   /**
+    * Thread that will first register a DRM ReplicantListener that synchronizes
+    * on the test class' lock object, and then calls DRM add or remove,
+    * causing the thread to block if the lock object's monitor is held.
+    */
+   static class BlockingListenerThread extends Thread 
+      implements DistributedReplicantManager.ReplicantListener
+   {
+      private DistributedReplicantManagerImpl drm;
+      private String nodeName; // null: use drm.add/remove; else drm._add/_remove for this node
+      private boolean add;     // true: add a replicant; false: remove it
+      private volatile boolean blocking; // set by the notifying thread, polled by the test thread
+      private volatile Exception ex;     // set in run(), read by the test thread
+      // Registers this object as the DRM listener for key "TEST".
+      BlockingListenerThread(DistributedReplicantManagerImpl drm,
+                             boolean add,
+                             String nodeName)
+      {
+         this.drm = drm;
+         this.add = add;
+         this.nodeName = nodeName;
+         drm.registerListener("TEST", this);         
+      }
 
+      public void replicantsChanged(String key, List newReplicants, int newReplicantsViewId)
+      {
+         blocking = true; // remains true for as long as we wait on the lock monitor
+         synchronized(lock)
+         {
+            blocking = false;
+         }
+      }
+      // Performs the one add/remove chosen at construction; any failure is kept in ex.
+      public void run()
+      {
+         try
+         {
+            if (add)
+            {
+               if (nodeName == null)
+                  drm.add("TEST", "local-replicant");
+               else
+                  drm._add("TEST", nodeName, "remote-replicant");
+            }
+            else 
+            {
+               if (nodeName == null)
+                  drm.remove("TEST");
+               else
+                  drm._remove("TEST", nodeName);
+            }
+         }
+         catch (Exception e)
+         {
+            ex = e;
+         }
+      }
+      // True while replicantsChanged() is waiting to enter synchronized(lock).
+      public boolean isBlocking()
+      {
+         return blocking;
+      }
+      // Exception thrown by the DRM call in run(), or null if none.
+      public Exception getException()
+      {
+         return ex;
+      }
+      
+   }
+   
+   /**
+    * Thread that registers and then unregisters a DRM ReplicantListener.
+    */
+   static class RegistrationThread extends Thread
+   {
+      private DistributedReplicantManager drm;
+      private volatile boolean registered = false;   // polled by the test thread
+      private volatile boolean unregistered = false; // must start false, else the unregistration check is vacuous
+      
+      RegistrationThread(DistributedReplicantManager drm)
+      {
+         this.drm = drm;
+      }
+      // Registers, then unregisters, a NullListener for key "DEADLOCK", flagging each completed step.
+      public void run()
+      {
+         NullListener listener = new NullListener();
+         drm.registerListener("DEADLOCK", listener);
+         registered = true;
+         drm.unregisterListener("DEADLOCK", listener);
+         unregistered = true;
+      }
+      // True once registerListener() has returned.
+      public boolean isRegistered()
+      {
+         return registered;
+      }
+      // True once unregisterListener() has returned.
+      public boolean isUnregistered()
+      {
+         return unregistered;
+      }
+      
+   }
+   
+   /**
+    * A DRM ReplicantListener that does nothing.
+    */
+   static class NullListener
+      implements DistributedReplicantManager.ReplicantListener
+   {
+      public void replicantsChanged(String key, List newReplicants, 
+                                    int newReplicantsViewId)
+      {
+         // no-op
+      }
+   }
+   
+   /**
+    * DRM ReplicantListener that mimics the HASingletonDeployer service
+    * by deploying/undeploying a service when it's notified by the DRM
+    * that it is (or is no longer) the master replica for its key.
+    */
+   static class MockHASingletonDeployer
+      implements DistributedReplicantManager.ReplicantListener
+   {
+      DistributedReplicantManager drm;
+      MockDeployer deployer;
+      String key;
+      boolean master = false;
+      NullListener deploymentListener = new NullListener();
+      Exception ex;
+      Logger log;
+      Object mutex = new Object();
+      
+      MockHASingletonDeployer(MockDeployer deployer, String key, Logger log)
+      {
+         this.drm = deployer.getDRM();
+         this.deployer = deployer;
+         this.key = key;
+         this.log = log;
+      }
+
+      public void replicantsChanged(String key, 
+                                    List newReplicants, 
+                                    int newReplicantsViewId)
+      {
+         if (this.key.equals(key))
+         {
+            synchronized(mutex)
+            {
+               boolean nowMaster = drm.isMasterReplica(key);
+               
+               try
+               {
+                  if (!master && nowMaster) {
+                     log.debug(Thread.currentThread().getName() + 
+                               " Deploying " + key);
+                     deployer.deploy(key + "A", key, deploymentListener);
+                  }
+                  else if (master && !nowMaster) {
+                     log.debug(Thread.currentThread().getName() + 
+                               " undeploying " + key);
+                     deployer.undeploy(key + "A", deploymentListener);
+                  }
+                  else 
+                  {
+                     log.debug(Thread.currentThread().getName() + 
+                               " -- no status change in " + key + 
+                               " -- master = " + master);   
+                  }
+                  master = nowMaster;
+               }
+               catch (Exception e)
+               {
+                  e.printStackTrace();
+                  if (ex == null)
+                     ex = e;
+               }
+            }
+         }         
+      }
+      
+      public Exception getException()
+      {
+         return ex;
+      }
+      
+   }
+   
+   /**
+    * Thread that repeatedly deploys and undeploys a MockHASingletonDeployer.
+    */
+   static class DeployerThread extends Thread
+   {
+      Semaphore semaphore;
+      MockDeployer deployer;
+      DistributedReplicantManager.ReplicantListener listener;
+      String key;
+      Exception ex;
+      int count = -1;
+      Logger log;
+      
+      DeployerThread(MockDeployer deployer, 
+                     String key, 
+                     DistributedReplicantManager.ReplicantListener listener,
+                     Semaphore semaphore,
+                     Logger log)
+      {
+         super("Deployer " + key);
+         this.deployer = deployer;
+         this.listener = listener;
+         this.key = key;
+         this.semaphore = semaphore;
+         this.log = log;
+      }
+      
+      public void run()
+      {
+         boolean acquired = false;
+         try
+         {
+            acquired = semaphore.attempt(60000);
+            if (!acquired)
+               throw new Exception("Cannot acquire semaphore");
+            SecureRandom random = new SecureRandom();
+            for (count = 0; count < LOOP_COUNT; count++)
+            {
+               deployer.deploy(key, "JGroups", listener);
+
+               sleepThread(random.nextInt(50));
+               deployer.undeploy(key, listener);
+            }
+         }
+         catch (Exception e)
+         {
+            e.printStackTrace();
+            ex = e;
+         }
+         finally
+         {
+            if (acquired)
+               semaphore.release();
+         }
+      }
+      
+      public Exception getException()
+      {
+         return ex;
+      }
+      
+      public int getCount()
+      {
+         return count;
+      }
+   }
+   
+   /**
+    * Thread that mimics the JGroups up-handler thread that calls into the DRM.
+    * Repeatedly and randomly adds or removes a replicant for a set
+    * of keys. 
+    */
+   static class JGroupsThread extends Thread
+   {
+      Semaphore semaphore;
+      DistributedReplicantManagerImpl drm;
+      String[] keys;
+      String nodeName;
+      Exception ex;
+      int count = -1;   // -1 until the loop in run() starts
+      int weightFactor; // loop-count multiplier: 2.5 iterations per key per LOOP_COUNT step
+      
+      JGroupsThread(DistributedReplicantManagerImpl drm, 
+                    String[] keys,
+                    String nodeName,
+                    Semaphore semaphore)
+      {
+         super("JGroups");
+         this.drm = drm;
+         this.keys = keys;
+         this.semaphore = semaphore;
+         this.nodeName = nodeName;
+         this.weightFactor = (int) (2.5 * keys.length); // cast the product, not just the literal
+      }
+      // Acquires a semaphore permit (attempt(60000)), then randomly toggles replicants for the keys.
+      public void run()
+      {
+         boolean acquired = false;
+         try
+         {
+            acquired = semaphore.attempt(60000);
+            if (!acquired)
+               throw new Exception("Cannot acquire semaphore");
+            boolean[] added = new boolean[keys.length];
+            SecureRandom random = new SecureRandom();
+            
+            for (count = 0; count < weightFactor * LOOP_COUNT; count++)
+            {
+               int pos = random.nextInt(keys.length);
+               if (added[pos])
+               {
+                  drm._remove(keys[pos], nodeName);
+                  added[pos] = false;
+               }
+               else
+               {
+                  drm._add(keys[pos], nodeName, "");
+                  added[pos] = true;
+               }
+               sleepThread(random.nextInt(30));
+            }
+         }
+         catch (Exception e)
+         {
+            e.printStackTrace();
+            ex = e;
+         }
+         finally
+         {
+            if (acquired)
+               semaphore.release();
+         }
+      }
+      // Exception caught in run(), or null if none.
+      public Exception getException()
+      {
+         return ex;
+      }
+      // Completed iterations scaled back by weightFactor, so the result is comparable to LOOP_COUNT.
+      public int getCount()
+      {
+         return (count / weightFactor);
+      }
+      
+   }
+   
+   /**
+    * Mocks the deployer of a service that registers/unregisters DRM listeners 
+    * and replicants. Only allows a single thread of execution, a la the
+    * org.jboss.system.ServiceController.
+    */
+   static class MockDeployer
+   {
+      DistributedReplicantManager drm;
+      
+      MockDeployer(DistributedReplicantManager drm)
+      {
+         this.drm = drm;
+      }
+      
+      void deploy(String key, String replicant, 
+                  DistributedReplicantManager.ReplicantListener listener)
+            throws Exception 
+      {
+         synchronized(this)
+         {
+            drm.registerListener(key, listener);
+            drm.add(key, replicant);
+            sleepThread(10);
+         }
+      }
+      
+      void undeploy(String key, 
+                    DistributedReplicantManager.ReplicantListener listener)
+         throws Exception 
+      {
+         synchronized(this)
+         {
+            drm.remove(key);
+            drm.unregisterListener(key, listener);
+            sleepThread(10);
+         }
+      }
+      
+      DistributedReplicantManager getDRM()
+      {
+         return drm;
+      }
+   }
+
+   /** ReplicantListener that caches the list of replicants */
+   static class CachingListener implements ReplicantListener
+   {
+      List replicants = null;
+      boolean clean = true;
+      
+      public void replicantsChanged(String key, List newReplicants, 
+                                    int newReplicantsViewId)
+      {
+         this.replicants = newReplicants;
+         if (clean && newReplicants != null)
+         {
+            int last = Integer.MIN_VALUE;
+            for (Iterator iter = newReplicants.iterator(); iter.hasNext(); )
+            {
+               int cur = ((Integer) iter.next()).intValue();
+               if (last >= cur)
+               {
+                  clean = false;
+                  break;
+               }
+               
+               last = cur;
+            }
+         }
+      }
+      
+   }
+
+   private static Object lock = new Object();
+   private static int LOOP_COUNT = 30;
+   
    public static Test suite() throws Exception
    {
       Test t1 = getDeploySetup(DRMTestCase.class, "drm-tests.sar");
@@ -137,5 +580,660 @@
       server0.removeNotificationListener(drmService, listener);
       server1.removeNotificationListener(drmService, listener);
    }
+   
+   /**
+    * Tests the functionality of isMasterReplica(), also testing merge
+    * handling.
+    * 
+    * TODO move this test out of the testsuite and into the cluster module
+    *      itself, since it doesn't rely on the container.
+    * 
+    * @throws Exception
+    */
+   public void testIsMasterReplica() throws Exception
+   {
+      log.debug("+++ testIsMasterReplica()");
+      
+      MBeanServer mbeanServer = 
+         MBeanServerFactory.createMBeanServer("mockPartition");
+      try {
+         ClusterNode localAddress = new ClusterNode(new IpAddress("127.0.0.1", 12345));
+         MockHAPartition partition = new MockHAPartition(localAddress);
+      
+         DistributedReplicantManagerImpl drm = 
+               new DistributedReplicantManagerImpl(partition, mbeanServer);
 
+         drm.init();
+         
+         // Create a fake view for the MockHAPartition
+         
+         Vector remoteAddresses = new Vector();
+         for (int i = 1; i < 5; i++)
+            remoteAddresses.add(new ClusterNode(new IpAddress("127.0.0.1", 12340 + i)));
+         
+         Vector allNodes = new Vector(remoteAddresses);
+         allNodes.add(localAddress);
+         partition.setCurrentViewClusterNodes(allNodes);
+         
+         // Pass fake state to DRMImpl
+      
+         HashMap replicants = new HashMap();
+         ArrayList remoteResponses = new ArrayList();
+         for (int i = 0; i < remoteAddresses.size(); i++)
+         {
+            ClusterNode node = (ClusterNode) remoteAddresses.elementAt(i);
+            Integer replicant = new Integer(i + 1);
+            replicants.put(node.getName(), replicant);
+            HashMap localReplicant = new HashMap();
+            localReplicant.put("Mock", replicant);
+            remoteResponses.add(new Object[] {node.getName(), localReplicant});
+         }
+         HashMap services = new HashMap();
+         services.put("Mock", replicants);
+         
+         int hash = 0;
+         for (int i = 1; i < 5; i++)
+            hash += (new Integer(i)).hashCode();
+         
+         HashMap intraviewIds = new HashMap();
+         intraviewIds.put("Mock", new Integer(hash));
+      
+         partition.setRemoteReplicants(remoteResponses);
+         
+         drm.setCurrentState(new Object[] {services, intraviewIds });
+         
+         drm.start();
+         
+         // add a local replicant
+         
+         drm.add("Mock", new Integer(5));
+         
+         // test that this node is not the master replica
+         
+         assertFalse("Local node is not master after startup", 
+                     drm.isMasterReplica("Mock")); 
+      
+         // simulate a split where this node is the coord
+         
+         Vector localOnly = new Vector();
+         localOnly.add(localAddress);
+         
+         partition.setCurrentViewClusterNodes(localOnly);
+         partition.setRemoteReplicants(new ArrayList());
+         
+         drm.membershipChanged(remoteAddresses, new Vector(), localOnly);
+         
+         // test that this node is the master replica
+         
+         assertTrue("Local node is master after split", drm.isMasterReplica("Mock"));
+         
+         // Remove our local replicant
+         
+         drm.remove("Mock");
+         
+         // test that this node is not the master replica
+         
+         assertFalse("Local node is not master after dropping replicant", 
+                     drm.isMasterReplica("Mock"));
+         
+         // Restore the local replicant 
+         
+         drm.add("Mock", new Integer(5));
+         
+         // simulate a merge
+         
+         Vector mergeGroups = new Vector();
+         mergeGroups.add(remoteAddresses);
+         mergeGroups.add(localOnly);
+         
+         partition.setCurrentViewClusterNodes(allNodes);
+         partition.setRemoteReplicants(remoteResponses);
+         
+         drm.membershipChangedDuringMerge(new Vector(), remoteAddresses, 
+                                          allNodes, mergeGroups);         
+         
+         // Merge processing is done asynchronously, so pause a bit
+         sleepThread(100);
+         
+         // test that this node is not the master replica
+         
+         assertFalse("Local node is not master after merge", 
+                     drm.isMasterReplica("Mock")); 
+      }
+      finally {
+         MBeanServerFactory.releaseMBeanServer(mbeanServer);
+      }
+   }
+   
+   
+   /**
+    * Tests that one thread blocking in DRM.notifyKeyListeners() does not
+    * prevent other threads registering/unregistering listeners. JBAS-2539
+    * 
+    * TODO move this test out of the testsuite and into the cluster module
+    *      itself, since it doesn't rely on the container.
+    * 
+    * @throws Exception
+    */
+   public void testKeyListenerDeadlock() throws Exception
+   {
+      log.debug("+++ testKeyListenerDeadlock()");
+      
+      MBeanServer mbeanServer = 
+         MBeanServerFactory.createMBeanServer("mockPartition");
+      try {
+         ClusterNode localAddress = new ClusterNode(new IpAddress("127.0.0.1", 12345));
+         MockHAPartition partition = new MockHAPartition(localAddress);
+      
+         DistributedReplicantManagerImpl drm = 
+               new DistributedReplicantManagerImpl(partition, mbeanServer);
+
+         drm.init();
+         
+         // Create a fake view for the MockHAPartition
+         
+         Vector remoteAddresses = new Vector();
+         for (int i = 1; i < 5; i++)
+            remoteAddresses.add(new ClusterNode(new IpAddress("127.0.0.1", 12340 + i)));
+         
+         Vector allNodes = new Vector(remoteAddresses);
+         allNodes.add(localAddress);
+         partition.setCurrentViewClusterNodes(allNodes);
+         
+         drm.start();
+         
+         BlockingListenerThread blt = 
+            new BlockingListenerThread(drm, true, null);
+         
+         // Hold the lock monitor so the test thread can't acquire it
+         // This keeps the blocking thread alive.
+         synchronized(lock) {
+            // Spawn a thread that will change a key and then block on the
+            // notification back to itself
+            blt.start();
+
+            sleepThread(50);
+            
+            assertTrue("Test thread is alive", blt.isAlive());            
+            assertTrue("Test thread is blocking", blt.isBlocking());
+            
+            RegistrationThread rt = new RegistrationThread(drm);
+            rt.start();
+
+            sleepThread(50);
+            
+            assertTrue("No deadlock on listener registration", rt.isRegistered());
+            
+            assertTrue("No deadlock on listener unregistration", rt.isUnregistered());
+            
+            assertNull("No exception in deadlock tester", blt.getException());
+            
+            assertTrue("Test thread is still blocking", blt.isBlocking());
+            assertTrue("Test thread is still alive", blt.isAlive());
+         }
+         
+         drm.unregisterListener("TEST", blt);
+         
+         sleepThread(50);
+         
+         // Test going through remove
+         blt = new BlockingListenerThread(drm, false, null);
+         
+         // Hold the lock monitor so the test thread can't acquire it
+         // This keeps the blocking thread alive.
+         synchronized(lock) {
+            // Spawn a thread that will change a key and then block on the
+            // notification back to itself
+            blt.start();
+
+            sleepThread(50);
+            
+            assertTrue("Test thread is alive", blt.isAlive());            
+            assertTrue("Test thread is blocking", blt.isBlocking());
+            
+            RegistrationThread rt = new RegistrationThread(drm);
+            rt.start();
+
+            sleepThread(50);
+            
+            assertTrue("No deadlock on listener registration", rt.isRegistered());
+            
+            assertTrue("No deadlock on listener unregistration", rt.isUnregistered());
+            
+            assertNull("No exception in deadlock tester", blt.getException());
+            
+            assertTrue("Test thread is still blocking", blt.isBlocking());
+            assertTrue("Test thread is still alive", blt.isAlive());
+         }
+      }
+      finally {
+         MBeanServerFactory.releaseMBeanServer(mbeanServer);
+      }
+   }
+   
+   
+   /**
+    * Tests that remotely-originated calls don't block.
+    * 
+    * TODO move this test out of the testsuite and into the cluster module
+    *      itself, since it doesn't rely on the container.
+    * 
+    * @throws Exception
+    */
+   public void testRemoteCallBlocking() throws Exception
+   {
+      log.debug("+++ testRemoteCallBlocking()");
+      
+      MBeanServer mbeanServer = 
+         MBeanServerFactory.createMBeanServer("mockPartition");
+      try {
+         ClusterNode localAddress = new ClusterNode(new IpAddress("127.0.0.1", 12345));
+         MockHAPartition partition = new MockHAPartition(localAddress);
+      
+         DistributedReplicantManagerImpl drm = 
+               new DistributedReplicantManagerImpl(partition, mbeanServer);
+
+         drm.init();
+         
+         // Create a fake view for the MockHAPartition
+         
+         Vector remoteAddresses = new Vector();
+         for (int i = 1; i < 5; i++)
+            remoteAddresses.add(new ClusterNode(new IpAddress("127.0.0.1", 12340 + i)));
+         
+         Vector allNodes = new Vector(remoteAddresses);
+         allNodes.add(localAddress);
+         partition.setCurrentViewClusterNodes(allNodes);
+         
+         drm.start();
+         
+         String sender = ((ClusterNode)remoteAddresses.get(0)).getName();
+         BlockingListenerThread blt = 
+            new BlockingListenerThread(drm, true, sender);
+         
+         // Hold the lock monitor so the test thread can't acquire it
+         // This keeps the blocking thread alive.
+         synchronized(lock) {
+            // Spawn a thread that will change a key and then block on the
+            // notification back to itself
+            blt.start();
+
+            sleepThread(50);
+            
+            assertFalse("JGroups thread is not alive", blt.isAlive());            
+            assertTrue("Async handler thread is blocking", blt.isBlocking());
+            
+            assertNull("No exception in JGroups thread", blt.getException());
+         }
+         
+         drm.unregisterListener("TEST", blt);
+         
+         sleepThread(50);
+         
+         // Test going through remove
+         blt = new BlockingListenerThread(drm, false, sender);
+         
+         // Hold the lock monitor so the test thread can't acquire it
+         // This keeps the blocking thread alive.
+         synchronized(lock) {
+            // Spawn a thread that will change a key and then block on the
+            // notification back to itself
+            blt.start();
+
+            sleepThread(50);
+            
+            assertFalse("JGroups thread is not alive", blt.isAlive());            
+            assertTrue("Async handler thread is blocking", blt.isBlocking());
+            
+            assertNull("No exception in JGroups thread", blt.getException());
+         }
+      }
+      finally {
+         MBeanServerFactory.releaseMBeanServer(mbeanServer);
+      }
+   }
+   
+   /**
+    * Tests that one thread blocking in DRM.notifyKeyListeners() does not
+    * prevent other threads that use different keys adding/removing 
+    * replicants. JBAS-2169
+    * 
+    * TODO move this test out of the testsuite and into the cluster module
+    *      itself, since it doesn't rely on the container.
+    * 
+    * @throws Exception
+    */
+   public void testNonConflictingAddRemoveDeadlock() throws Exception
+   {
+
+      log.debug("+++ testNonConflictingAddRemoveDeadlock()");
+      
+      addRemoveDeadlockTest(false);
+   }
+   
+   /**
+    * Tests that one thread blocking in DRM.notifyKeyListeners() does not
+    * prevent other threads that use the same keys adding/removing 
+    * replicants. JBAS-1151
+    * 
+    * NOTE: This test basically demonstrates a small race condition that can
+    * happen with the way HASingletonSupport's startService() method is
+    * implemented (actually HAServiceMBeanSupport, but relevant in the case
+    * of subclass HASingletonSupport, and in particular in its use in the
+    * HASingletonDeployer service).  However, since the test doesn't actually
+    * use the relevant code, but rather uses mock objects that work the same
+    * way, this test is disabled -- its purpose has been achieved.  JIRA issue 
+    * JBAS-1151 tracks the real problem; when it's resolved we'll create a test 
+    * case against the real code that proves that fact.
+    * 
+    * TODO move this test out of the testsuite and into the cluster module
+    *      itself, since it doesn't rely on the container.
+    * 
+    * @throws Exception
+    */
+   public void badtestConflictingAddRemoveDeadlock() throws Exception
+   {
+      log.debug("+++ testConflictingAddRemoveDeadlock()");
+      
+      addRemoveDeadlockTest(true);
+   }  
+   
+   /**
+    * Shared driver for the add/remove deadlock tests.
+    * 
+    * Builds a DistributedReplicantManagerImpl on a MockHAPartition whose
+    * view holds the local node and one remote node, starts one
+    * DeployerThread per key plus a single JGroupsThread, releases them all
+    * at once through a shared semaphore and then asserts that every thread
+    * starts, that all permits can be reacquired within the time limit, and
+    * that each thread reports LOOP_COUNT iterations and no exception.
+    * 
+    * @param conflicting if true, every DeployerThread is given a
+    *        MockHASingletonDeployer listener for its own key and the
+    *        JGroupsThread works on those same keys; if false, the
+    *        DeployerThreads get a NullListener, a MockHASingletonDeployer
+    *        is registered under "HASingleton", and only the JGroupsThread
+    *        is handed that extra key
+    * @throws Exception on any failure while setting up or running the fixture
+    */
+   private void addRemoveDeadlockTest(boolean conflicting) throws Exception
+   {  
+      String[] keys = { "A", "B", "C", "D", "E" };
+      int count = keys.length;
+      
+      MBeanServer mbeanServer = 
+         MBeanServerFactory.createMBeanServer("mockPartition");
+      try {
+         ClusterNode localAddress = new ClusterNode(new IpAddress("127.0.0.1", 12345));
+         MockHAPartition partition = new MockHAPartition(localAddress);
+      
+         DistributedReplicantManagerImpl drm = 
+               new DistributedReplicantManagerImpl(partition, mbeanServer);
+
+         drm.init();
+         
+         // Create a fake view for the MockHAPartition
+         
+         Vector remoteAddresses = new Vector();
+         ClusterNode remote = new ClusterNode(new IpAddress("127.0.0.1", 12341));
+         remoteAddresses.add(remote);
+         
+         Vector allNodes = new Vector(remoteAddresses);
+         allNodes.add(localAddress);
+         partition.setCurrentViewClusterNodes(allNodes);
+         
+         drm.start();
+         
+         MockDeployer deployer = new MockDeployer(drm);
+         
+         if (!conflicting)
+         {
+            // Register a MockHASingletonDeployer, but since we're in
+            // non-conflicting mode, the DeployerThreads won't deal with it
+            MockHASingletonDeployer listener = 
+                  new MockHASingletonDeployer(deployer, "HASingleton", log);
+            
+            drm.registerListener("HASingleton", listener);
+            drm.add("HASingleton", "HASingleton");            
+         }
+         
+         // Create a semaphore to gate the threads and acquire all its permits
+         // (count + 1 permits: presumably one per DeployerThread plus one for
+         // the JGroupsThread -- the thread classes are outside this excerpt)
+         Semaphore semaphore = new Semaphore(count + 1);
+         for (int i = 0; i <= count; i++)
+            semaphore.acquire();
+         
+         DeployerThread[] deployers = new DeployerThread[keys.length];
+         for (int i = 0; i < count; i++)
+         {
+            DistributedReplicantManager.ReplicantListener listener = null;
+            if (conflicting)
+            {
+               listener = new MockHASingletonDeployer(deployer, keys[i], log);
+            }
+            else
+            {
+               listener = new NullListener();
+            }
+            deployers[i] = new DeployerThread(deployer, keys[i], listener, semaphore, log);
+            deployers[i].start();
+         }
+         
+         String[] jgKeys = keys;
+         if (!conflicting)
+         {
+            // The JGroups thread also deals with the MockHASingletonDeployer
+            // key that the DeployerThreads don't
+            jgKeys = new String[keys.length + 1];
+            System.arraycopy(keys, 0, jgKeys, 0, keys.length);
+            jgKeys[keys.length] = "HASingleton";            
+         }
+         JGroupsThread jgThread = new JGroupsThread(drm, jgKeys, remote.getName(), semaphore);
+         jgThread.start();
+         
+         // Launch the threads
+         semaphore.release(count + 1);
+         
+         // Stays false unless every permit is taken back below; the finally
+         // block uses it to decide whether to dump diagnostics
+         boolean reacquired = false;
+         try
+         {
+            // Give the threads 5 secs to acquire the semaphore
+            // (maxElapsed is an absolute wall-clock deadline, not a duration)
+            long maxElapsed = System.currentTimeMillis() + 5000;
+            for (int i = 0; i < keys.length; i++)
+            {
+               // A negative count is taken to mean "not started yet"; poll
+               // the same thread again every 10 ms until the deadline passes
+               if (deployers[i].getCount() < 0)
+               {
+                  assertTrue("Thread " + keys[i] + " started in time",
+                              maxElapsed - System.currentTimeMillis() > 0);
+                  sleepThread(10);
+                  i--; // try again
+               }   
+            }
+            
+            while (jgThread.getCount() < 0)
+            {
+               assertTrue("jgThread started in time",
+                           maxElapsed - System.currentTimeMillis() > 0);
+               sleepThread(10);               
+            }
+            
+            // Reacquire all the permits, thus showing the threads didn't deadlock
+            
+            // Give them 500 ms per loop
+            maxElapsed = System.currentTimeMillis() + (500 * LOOP_COUNT);
+            for (int i = 0; i <= count; i++)
+            {
+               // All acquisitions share the one deadline, so the time allowed
+               // shrinks with every pass
+               long waitTime = maxElapsed - System.currentTimeMillis();
+               assertTrue("Acquired thread " + i, semaphore.attempt(waitTime));
+            }
+            
+            reacquired = true;
+            
+            // Ensure there were no exceptions
+            for (int i = 0; i < keys.length; i++)
+            {
+               assertEquals("Thread " + keys[i] + " finished", LOOP_COUNT, deployers[i].getCount());
+               assertNull("Thread " + keys[i] + " saw no exceptions", deployers[i].getException());
+            }
+            assertEquals("JGroups Thread finished", LOOP_COUNT, jgThread.getCount());
+            assertNull("JGroups Thread saw no exceptions", jgThread.getException());
+         }
+         finally
+         {
+
+            // Only when the permits were not all reacquired: dump each
+            // thread's exception, or else its progress count, to stdout
+            if (!reacquired)
+            {
+               for (int i = 0; i < keys.length; i++)
+               {
+                  if (deployers[i].getException() != null)
+                  {
+                     System.out.println("Exception in deployer " + i);
+                     deployers[i].getException().printStackTrace(System.out);
+                  }
+                  else
+                  {
+                     System.out.println("Thread " + i + " completed " + deployers[i].getCount());
+                  }
+               }
+               if (jgThread.getException() != null)
+               {
+                  System.out.println("Exception in jgThread");
+                  jgThread.getException().printStackTrace(System.out);
+               }
+               else
+               {
+                  System.out.println("jgThread completed " + jgThread.getCount());
+               }
+            }
+            
+            // Be sure the threads are dead
+            // NOTE(review): isAlive() is not checked again after the pause, so
+            // this interrupts the threads but does not confirm they have died
+            if (jgThread.isAlive())
+            {
+               jgThread.interrupt();
+               sleepThread(5);   
+               printStackTrace(jgThread.getName(), jgThread.getException());
+            }
+            for (int i = 0; i < keys.length; i++)
+            {
+               if (deployers[i].isAlive())
+               {
+                  deployers[i].interrupt();
+                  sleepThread(5);
+                  printStackTrace(deployers[i].getName(), deployers[i].getException());
+               }
+            }
+               
+         }
+      }
+      finally {
+         MBeanServerFactory.releaseMBeanServer(mbeanServer);
+      }
+   }
+
+   /**
+    * Randomly adds and removes replicants for five mock nodes under the key
+    * "TEST" and checks after every change that both the node-name lookup and
+    * the replicant lookup list the current members in node order.  Node 2
+    * plays the local node and goes through add()/remove(); the others go
+    * through _add()/_remove().  Finally checks that the registered
+    * CachingListener still reports itself clean.
+    * 
+    * @throws Exception
+    */
+   public void testReplicantOrder() throws Exception
+   {
+      final int nodeCount = 5;
+      final int localIndex = 2;
+      
+      MBeanServer mbeanServer =
+         MBeanServerFactory.createMBeanServer("mockPartitionA");
+      try
+      {
+         // Create a fake view for the MockHAPartition
+         ClusterNode[] nodes = new ClusterNode[nodeCount];
+         String[] names = new String[nodes.length];
+         Integer[] replicants = new Integer[nodes.length];
+         Vector allNodes = new Vector();
+         for (int n = 0; n < nodes.length; n++)
+         {
+            ClusterNode member = new ClusterNode(new IpAddress("127.0.0.1", 12340 + n));
+            nodes[n] = member;
+            allNodes.add(member);
+            names[n] = member.getName();
+            replicants[n] = new Integer(n);
+         }
+         
+         MockHAPartition partition = new MockHAPartition(nodes[localIndex]);
+         partition.setCurrentViewClusterNodes(allNodes);
+         
+         DistributedReplicantManagerImpl drm = 
+               new DistributedReplicantManagerImpl(partition, mbeanServer);
+         drm.init();
+         drm.start();
+         
+         CachingListener listener = new CachingListener();
+         drm.registerListener("TEST", listener);
+         
+         SecureRandom random = new SecureRandom();
+         boolean[] added = new boolean[nodes.length];
+         for (int round = 0; round < 10; round++)
+         {
+            // Pick a node and flip its membership
+            int node = random.nextInt(nodes.length);
+            boolean present = added[node];
+            if (node == localIndex)
+            {
+               if (present)
+               {
+                  drm.remove("TEST");
+               }
+               else
+               {
+                  drm.add("TEST", replicants[node]);
+               }
+            }
+            else
+            {
+               String nodeName = nodes[node].getName();
+               if (present)
+               {
+                  drm._remove("TEST", nodeName);
+               }
+               else
+               {
+                  drm._add("TEST", nodeName, replicants[node]);
+               }
+            }
+            added[node] = !present;
+            
+            // Confirm the proper order of the replicant node names
+            List nodeNames = maskListClass(drm.lookupReplicantsNodeNames("TEST"));
+            confirmReplicantList(nodeNames, names, added);
+            
+            // Confirm the proper order of the replicants via lookupReplicants
+            List found = maskListClass(drm.lookupReplicants("TEST"));
+            confirmReplicantList(found, replicants, added);
+            
+            // Confirm the listener got the same list
+//            assertEquals("Listener received a correct list", lookup, 
+//                         maskListClass(listener.replicants));
+         }
+         
+         // Let the asynchronous notification thread catch up
+         sleep(25);
+         
+         // Confirm all lists presented to the listener were properly ordered
+         assertTrue("Listener saw no misordered lists", listener.clean);
+      }
+      finally
+      {
+         MBeanServerFactory.releaseMBeanServer(mbeanServer);
+      }
+   }
+   
+   /**
+    * Asserts that <code>current</code> holds exactly the entries of
+    * <code>all</code> whose index is flagged in <code>added</code>, in
+    * ascending index order and with nothing left over.
+    * 
+    * @param current the list under test
+    * @param all     the expected value for every node, indexed by node
+    * @param added   which nodes are currently expected to be present
+    */
+   private void confirmReplicantList(List current, Object[] all, boolean[] added)
+   {
+      Iterator remaining = current.iterator();
+      for (int node = 0; node < added.length; node++)
+      {
+         if (!added[node])
+         {
+            continue;
+         }
+         
+         assertTrue("List has more replicants", remaining.hasNext());
+         Object expected = all[node];
+         Object actual = remaining.next();
+         assertEquals("Replicant for node " + node + " is next", 
+                      expected, actual);
+      }
+      assertFalse("List has no extra replicants", remaining.hasNext());
+   }
+   
+   /**
+    * Returns the given list as an ArrayList: a new empty one for null, the
+    * same instance if it already is an ArrayList, otherwise a copy.
+    */
+   private List maskListClass(List toMask)
+   {
+      if (toMask == null)
+      {
+         return new ArrayList();
+      }
+      if (toMask instanceof ArrayList)
+      {
+         return toMask;
+      }
+      return new ArrayList(toMask);
+   }
+
+   /**
+    * Pauses the calling thread for roughly the given number of milliseconds.
+    * An interrupt ends the pause early: the stack trace is printed and the
+    * thread's interrupt status is set again so callers can still observe it.
+    * 
+    * @param millis how long to sleep, in milliseconds
+    */
+   private static void sleepThread(long millis)
+   {
+      try
+      {
+         Thread.sleep(millis);
+      }
+      catch (InterruptedException e) {
+         e.printStackTrace();
+         // Thread.sleep() cleared the interrupt status when it threw; restore
+         // it instead of silently swallowing the interruption
+         Thread.currentThread().interrupt();
+      }
+   }
+   
+   /**
+    * Prints the given exception's stack trace to System.out under a heading
+    * naming the thread, but only if it is an InterruptedException; any other
+    * exception, or null, produces no output.
+    * 
+    * @param threadName name used in the heading line
+    * @param e          the exception recorded by the thread, may be null
+    */
+   private static void printStackTrace(String threadName, Exception e)
+   {
+      if (!(e instanceof InterruptedException))
+      {
+         return;
+      }
+      
+      System.out.println("Stack trace for " + threadName);
+      e.printStackTrace(System.out);
+      System.out.println();
+   }
+
 }

Modified: branches/JBoss_4_0_3_SP1_CP/testsuite/src/main/org/jboss/test/testbeancluster/test/BeanUnitTestCase.java
===================================================================
--- branches/JBoss_4_0_3_SP1_CP/testsuite/src/main/org/jboss/test/testbeancluster/test/BeanUnitTestCase.java	2006-11-17 18:12:58 UTC (rev 58500)
+++ branches/JBoss_4_0_3_SP1_CP/testsuite/src/main/org/jboss/test/testbeancluster/test/BeanUnitTestCase.java	2006-11-17 19:13:31 UTC (rev 58501)
@@ -350,7 +350,10 @@
       RMIAdaptor[] adaptors = getAdaptors();      
       Long cacheCount = (Long) adaptors[0].getAttribute(oName, "CacheSize");
       assertEquals("CacheSize is zero", 0, cacheCount.longValue());
-      cacheCount = (Long) adaptors[0].getAttribute(oName, "PassivatedCount");
-      assertEquals("PassivatedCount is zero", 0, cacheCount.longValue());
+      // Checking the passivated count is invalid, as it doesn't get reduced
+      // when remove() is called on a bean -- only when the passivation cleanup
+      // thread runs
+      //cacheCount = (Long) adaptors[0].getAttribute(oName, "PassivatedCount");
+      //assertEquals("PassivatedCount is zero", 0, cacheCount.longValue());
    }
 }




More information about the jboss-cvs-commits mailing list