[jboss-cvs] JBossAS SVN: r58501 - in branches/JBoss_4_0_3_SP1_CP: cluster/src/main/org/jboss/cache/invalidation/bridges cluster/src/main/org/jboss/ha/framework/interfaces cluster/src/main/org/jboss/ha/framework/server cluster/src/main/org/jboss/ha/jmx cluster/src/main/org/jboss/proxy/ejb cluster/src/main/org/jboss/proxy/generic testsuite/src/main/org/jboss/test/cluster/drm testsuite/src/main/org/jboss/test/cluster/test testsuite/src/main/org/jboss/test/testbeancluster/test
jboss-cvs-commits at lists.jboss.org
jboss-cvs-commits at lists.jboss.org
Fri Nov 17 14:13:36 EST 2006
Author: vivekl at redhat.com
Date: 2006-11-17 14:13:31 -0500 (Fri, 17 Nov 2006)
New Revision: 58501
Added:
branches/JBoss_4_0_3_SP1_CP/cluster/src/main/org/jboss/ha/framework/server/AsynchEventHandler.java
branches/JBoss_4_0_3_SP1_CP/testsuite/src/main/org/jboss/test/cluster/drm/MockHAPartition.java
Modified:
branches/JBoss_4_0_3_SP1_CP/cluster/src/main/org/jboss/cache/invalidation/bridges/JGCacheInvalidationBridge.java
branches/JBoss_4_0_3_SP1_CP/cluster/src/main/org/jboss/ha/framework/interfaces/HAPartition.java
branches/JBoss_4_0_3_SP1_CP/cluster/src/main/org/jboss/ha/framework/server/DistributedReplicantManagerImpl.java
branches/JBoss_4_0_3_SP1_CP/cluster/src/main/org/jboss/ha/framework/server/HAPartitionImpl.java
branches/JBoss_4_0_3_SP1_CP/cluster/src/main/org/jboss/ha/jmx/HAServiceMBeanSupport.java
branches/JBoss_4_0_3_SP1_CP/cluster/src/main/org/jboss/proxy/ejb/ProxyFactoryHA.java
branches/JBoss_4_0_3_SP1_CP/cluster/src/main/org/jboss/proxy/generic/ProxyFactoryHA.java
branches/JBoss_4_0_3_SP1_CP/testsuite/src/main/org/jboss/test/cluster/test/DRMTestCase.java
branches/JBoss_4_0_3_SP1_CP/testsuite/src/main/org/jboss/test/testbeancluster/test/BeanUnitTestCase.java
Log:
ASPATCH-115: JBAS-3822: Port HAPartition concurrency fixes to 4.0.3.SP1
Modified: branches/JBoss_4_0_3_SP1_CP/cluster/src/main/org/jboss/cache/invalidation/bridges/JGCacheInvalidationBridge.java
===================================================================
--- branches/JBoss_4_0_3_SP1_CP/cluster/src/main/org/jboss/cache/invalidation/bridges/JGCacheInvalidationBridge.java 2006-11-17 18:12:58 UTC (rev 58500)
+++ branches/JBoss_4_0_3_SP1_CP/cluster/src/main/org/jboss/cache/invalidation/bridges/JGCacheInvalidationBridge.java 2006-11-17 19:13:31 UTC (rev 58501)
@@ -117,7 +117,14 @@
// DistributedReplicantManager.ReplicantListener implementation ---------------------------
- public void replicantsChanged (String key, java.util.List newReplicants, int newReplicantsViewId)
+ /**
+ * @todo examine thread safety. synchronized keyword was added to method
+ * signature when internal behavior of DistributedReplicantManagerImpl was
+ * changed so that multiple threads could concurrently send replicantsChanged
+ * notifications. Need to examine in detail how this method interacts with
+ * DistributedState to see if we can remove/narrow the synchronization.
+ */
+ public synchronized void replicantsChanged (String key, java.util.List newReplicants, int newReplicantsViewId)
{
if (key.equals (this.RPC_HANLE_NAME) && this.drm.isMasterReplica (this.RPC_HANLE_NAME))
{
Modified: branches/JBoss_4_0_3_SP1_CP/cluster/src/main/org/jboss/ha/framework/interfaces/HAPartition.java
===================================================================
--- branches/JBoss_4_0_3_SP1_CP/cluster/src/main/org/jboss/ha/framework/interfaces/HAPartition.java 2006-11-17 18:12:58 UTC (rev 58500)
+++ branches/JBoss_4_0_3_SP1_CP/cluster/src/main/org/jboss/ha/framework/interfaces/HAPartition.java 2006-11-17 19:13:31 UTC (rev 58501)
@@ -11,8 +11,6 @@
import java.util.Vector;
import java.util.ArrayList;
-import org.jboss.ha.framework.interfaces.HAPartition.HAMembershipListener;
-
/**
*
* @author <a href="mailto:bill at burkecentral.com">Bill Burke</a>.
@@ -35,9 +33,13 @@
// *******************************
//
/**
- * Return the name of the current name in the current partition. The name is
- * dynamically determined by the partition.
- * @return The partition name
+ * Return the name of this node in the current partition. The name is
+ * dynamically determined by the partition. The name will be the String
+ * returned by <code>getClusterNode().getName()</code>.
+ *
+ * @return The node name
+ *
+ * @see #getClusterNode()
*/
public String getNodeName();
/**
@@ -268,8 +270,33 @@
* @param listener The listener wishing to unsubscribe
*/
public void unregisterMembershipListener(HAMembershipListener listener);
-
/**
+ * Returns whether this partition will synchronously notify any
+ * HAMembershipListeners of membership changes using the calling thread
+ * from the underlying <code>ClusterPartition</code>.
+ *
+ * @return <code>true</code> if registered listeners that don't implement
+ * <code>AsynchHAMembershipExtendedListener</code> or
+ * <code>AsynchHAMembershipListener</code> will be notified
+ * synchronously of membership changes; <code>false</code> if
+ * those listeners will be notified asynchronously. Default
+ * is <code>false</code>.
+ */
+ public boolean getAllowSynchronousMembershipNotifications();
+ /**
+ * Sets whether this partition will synchronously notify any
+ * HAMembershipListeners of membership changes using the calling thread
+ * from the underlying <code>ClusterPartition</code>.
+ *
+ * @param allowSync <code>true</code> if registered listeners that don't
+ * implement <code>AsynchHAMembershipExtendedListener</code> or
+ * <code>AsynchHAMembershipListener</code> should be notified
+ * synchronously of membership changes; <code>false</code> if
+ * those listeners can be notified asynchronously. Default
+ * is <code>false</code>.
+ */
+ public void setAllowSynchronousMembershipNotifications(boolean allowSync);
+ /**
* Each time the partition topology changes, a new view is computed. A view is a list of members,
* the first member being the coordinator of the view. Each view also has a distinct identifier.
* @return The identifier of the current view
@@ -283,7 +310,9 @@
/**
* Return the member nodes that built the current view i.e. the current partition.
- * @return An array of ClusterNode containing the node names
+ * @return An array of ClusterNode listing the current members of the partitionn.
+ * This array will be in the same order in all nodes in the cluster that
+ * have received the current view.
*/
public ClusterNode[] getClusterNodes ();
Copied: branches/JBoss_4_0_3_SP1_CP/cluster/src/main/org/jboss/ha/framework/server/AsynchEventHandler.java (from rev 58500, branches/JBoss_4_0_3_SP1_JBAS-3822/cluster/src/main/org/jboss/ha/framework/server/AsynchEventHandler.java)
Modified: branches/JBoss_4_0_3_SP1_CP/cluster/src/main/org/jboss/ha/framework/server/DistributedReplicantManagerImpl.java
===================================================================
--- branches/JBoss_4_0_3_SP1_CP/cluster/src/main/org/jboss/ha/framework/server/DistributedReplicantManagerImpl.java 2006-11-17 18:12:58 UTC (rev 58500)
+++ branches/JBoss_4_0_3_SP1_CP/cluster/src/main/org/jboss/ha/framework/server/DistributedReplicantManagerImpl.java 2006-11-17 19:13:31 UTC (rev 58501)
@@ -7,6 +7,7 @@
package org.jboss.ha.framework.server;
+import java.util.Set;
import java.util.Vector;
import java.util.ArrayList;
import java.util.HashMap;
@@ -26,14 +27,13 @@
import org.jboss.logging.Logger;
+import org.jboss.ha.framework.interfaces.ClusterNode;
import org.jboss.ha.framework.interfaces.DistributedReplicantManager;
import org.jboss.ha.framework.interfaces.HAPartition;
/**
- * This class manages replicated objects.
- *
- * @todo there is still too much synrhonization on keyListeners
+ * This class manages replicated objects.
*
* @author <a href="mailto:bill at burkecentral.com">Bill Burke</a>.
* @author <a href="mailto:sacha.labourey at cogito-info.ch">Sacha Labourey</a>.
@@ -43,7 +43,8 @@
public class DistributedReplicantManagerImpl
implements DistributedReplicantManagerImplMBean,
HAPartition.HAMembershipExtendedListener,
- HAPartition.HAPartitionStateTransfer
+ HAPartition.HAPartitionStateTransfer,
+ AsynchEventHandler.AsynchEventProcessor
{
// Constants -----------------------------------------------------
@@ -56,7 +57,9 @@
protected ConcurrentReaderHashMap replicants = new ConcurrentReaderHashMap();
protected ConcurrentReaderHashMap keyListeners = new ConcurrentReaderHashMap();
protected HashMap intraviewIdCache = new HashMap();
- protected HAPartition partition;
+ protected HAPartition partition;
+ /** The handler used to send replicant change notifications asynchronously */
+ protected AsynchEventHandler asynchHandler;
protected Logger log;
@@ -113,6 +116,10 @@
public void start() throws Exception
{
this.nodeName = this.partition.getNodeName ();
+
+ // Create the asynch listener handler thread
+ asynchHandler = new AsynchEventHandler(this, "AsynchKeyChangeHandler");
+ asynchHandler.start();
partitionNameKnown.release (); // partition name is now known!
@@ -122,8 +129,15 @@
public void stop() throws Exception
{
- // we cleanly shutdown. This should be optimized.
- //
+ // BES 200604 -- implication of NR's JBLCUSTER-38 change. Moving to
+ // destroy allows restart of HAPartition while local registrations
+ // survive -- stopping partition does not stop all registered services
+ // e.g. ejbs; if we maintain their registrations we can pass them to
+ // the cluster when we restart. However, we are leaving all the remote
+ // replicants we have registered around, so they will still be included
+ // as targets if anyone contacts our EJB while partition is stopped.
+ // Probably OK; if they aren't valid the client will find this out.
+
// NR 200505 : [JBCLUSTER-38] move to destroy
// if (localReplicants != null)
// {
@@ -136,14 +150,24 @@
// }
// }
+ // Stop the asynch handler thread
+ try
+ {
+ asynchHandler.stop();
+ }
+ catch( Exception e)
+ {
+ log.warn("Failed to stop asynchHandler", e);
+ }
+
// NR 200505 : [JBCLUSTER-38] move to destroy
// this.mbeanserver.unregisterMBean (this.jmxName);
}
// NR 200505 : [JBCLUSTER-38] unbind at destroy
public void destroy() throws Exception
- {
- // we cleanly shutdown. This should be optimized.
+ {
+ // now partition can't be resuscitated, so remove local replicants
if (localReplicants != null)
{
synchronized(localReplicants)
@@ -152,11 +176,17 @@
localReplicants.keySet().toArray(keys);
for(int n = 0; n < keys.length; n ++)
{
- this.remove (keys[n]);
+ this.removeLocal(keys[n]); // channel is disconnected, so
+ // don't try to notify cluster
}
}
}
+
this.mbeanserver.unregisterMBean (this.jmxName);
+
+ partition.unregisterRPCHandler(SERVICE_NAME, this);
+ partition.unsubscribeFromStateTransferEvents(SERVICE_NAME, this);
+ partition.unregisterMembershipListener(this);
}
public String listContent () throws Exception
@@ -334,6 +364,20 @@
// we don't need to merge members anymore
}
+ // AsynchEventHandler.AsynchEventProcessor implementation -----------------
+
+ public void processEvent(Object event)
+ {
+ KeyChangeEvent kce = (KeyChangeEvent) event;
+ notifyKeyListeners(kce.key, kce.replicants);
+ }
+
+ static class KeyChangeEvent
+ {
+ String key;
+ List replicants;
+ }
+
// DistributedReplicantManager implementation ----------------------------------------------
public void add(String key, Serializable replicant) throws Exception
@@ -352,27 +396,31 @@
}
public void remove(String key) throws Exception
- {
+ {
partitionNameKnown.acquire (); // we don't propagate until our name is known
- if (localReplicants.containsKey (key))
- // optimisation: we don't make a costly network call if there is
- // nothing to remove
- //
+ // optimisation: we don't make a costly network call
+ // if there is nothing to remove
+ if (localReplicants.containsKey(key))
{
Object[] args = {key, this.nodeName};
partition.callAsynchMethodOnCluster(SERVICE_NAME, "_remove", args, remove_types, true);
- synchronized(localReplicants)
- {
- localReplicants.remove(key);
- List result = lookupReplicants(key);
- if (result == null)
- result = new ArrayList (); // don't return null but an empty list
- notifyKeyListeners(key, result);
- }
+ removeLocal(key);
}
}
+ protected void removeLocal(String key)
+ {
+ synchronized(localReplicants)
+ {
+ localReplicants.remove(key);
+ List result = lookupReplicants(key);
+ if (result == null)
+ result = new ArrayList (); // don't pass null but an empty list
+ notifyKeyListeners(key, result);
+ }
+ }
+
public Serializable lookupLocalReplicant(String key)
{
return (Serializable)localReplicants.get(key);
@@ -386,10 +434,33 @@
return null;
ArrayList rtn = new ArrayList();
- if (local != null)
- rtn.add(local);
- if (replicant != null)
- rtn.addAll(replicant.values());
+
+ if (replicant == null)
+ {
+ if (local != null)
+ rtn.add(local);
+ }
+ else
+ {
+ // JBAS-2677. Put the replicants in view order.
+ ClusterNode[] nodes = partition.getClusterNodes();
+ String replNode;
+ Object replVal;
+ for (int i = 0; i < nodes.length; i++)
+ {
+ replNode = nodes[i].getName();
+ if (local != null && nodeName.equals(replNode))
+ {
+ rtn.add(local);
+ continue;
+ }
+
+ replVal = replicant.get(replNode);
+ if (replVal != null)
+ rtn.add(replVal);
+ }
+ }
+
return rtn;
}
@@ -401,10 +472,32 @@
return null;
ArrayList rtn = new ArrayList();
- if (locallyReplicated)
- rtn.add(this.nodeName);
- if (replicant != null)
- rtn.addAll(replicant.keySet ());
+
+ if (replicant == null)
+ {
+ if (locallyReplicated)
+ rtn.add(this.nodeName);
+ }
+ else
+ {
+ // JBAS-2677. Put the replicants in view order.
+ Set keys = replicant.keySet();
+ ClusterNode[] nodes = partition.getClusterNodes();
+ String keyOwner;
+ for (int i = 0; i < nodes.length; i++)
+ {
+ keyOwner = nodes[i].getName();
+ if (locallyReplicated && nodeName.equals(keyOwner))
+ {
+ rtn.add(this.nodeName);
+ continue;
+ }
+
+ if (keys.contains(keyOwner))
+ rtn.add(keyOwner);
+ }
+ }
+
return rtn;
}
@@ -494,7 +587,7 @@
// DistributedReplicantManager cluster callbacks ----------------------------------------------
/**
- * cluster callback called when a new replicant is added on another node
+ * Cluster callback called when a new replicant is added on another node
* @param key Replicant key
* @param nodeName Node that add the current replicant
* @param replicant Serialized representation of the replicant
@@ -507,7 +600,11 @@
try
{
addReplicant(key, nodeName, replicant);
- notifyKeyListeners(key, lookupReplicants(key));
+ // Notify listeners asynchronously
+ KeyChangeEvent kce = new KeyChangeEvent();
+ kce.key = key;
+ kce.replicants = lookupReplicants(key);
+ asynchHandler.queueEvent(kce);
}
catch (Exception ex)
{
@@ -516,7 +613,7 @@
}
/**
- * cluster callback called when a replicant is removed by another node
+ * Cluster callback called when a replicant is removed by another node
* @param key Name of the replicant key
* @param nodeName Node that wants to remove its replicant for the give key
*/
@@ -524,8 +621,13 @@
{
try
{
- if (removeReplicant (key, nodeName))
- notifyKeyListeners(key, lookupReplicants(key));
+ if (removeReplicant (key, nodeName)) {
+ // Notify listeners asynchronously
+ KeyChangeEvent kce = new KeyChangeEvent();
+ kce.key = key;
+ kce.replicants = lookupReplicants(key);
+ asynchHandler.queueEvent(kce);
+ }
}
catch (Exception ex)
{
@@ -694,33 +796,40 @@
* Notifies, through a callback, the listeners for a given replicant that the set of replicants has changed
* @param key The replicant key name
* @param newReplicants The new list of replicants
+ *
*/
protected void notifyKeyListeners(String key, List newReplicants)
{
if( trace )
log.trace("notifyKeyListeners");
- synchronized(keyListeners)
+
+ // we first update the intra-view id for this particular key
+ //
+ int newId = updateReplicantsHashId (key);
+
+ ArrayList listeners = (ArrayList)keyListeners.get(key);
+ if (listeners == null)
{
- // we first update the intra-view id for this particular key
- //
- int newId = updateReplicantsHashId (key);
-
- ArrayList listeners = (ArrayList)keyListeners.get(key);
- if (listeners == null)
- {
- if( trace )
- log.trace("listeners is null");
- return;
- }
-
if( trace )
- log.trace("notifying " + listeners.size() + " listeners for key change: " + key);
- for (int i = 0; i < listeners.size(); i++)
- {
- DistributedReplicantManager.ReplicantListener listener = (DistributedReplicantManager.ReplicantListener)listeners.get(i);
- listener.replicantsChanged(key, newReplicants, newId);
- }
+ log.trace("listeners is null");
+ return;
}
+
+ // ArrayList's iterator is not thread safe
+ DistributedReplicantManager.ReplicantListener[] toNotify = null;
+ synchronized(listeners)
+ {
+ toNotify = new DistributedReplicantManager.ReplicantListener[listeners.size()];
+ toNotify = (DistributedReplicantManager.ReplicantListener[]) listeners.toArray(toNotify);
+ }
+
+ if( trace )
+ log.trace("notifying " + toNotify.length + " listeners for key change: " + key);
+ for (int i = 0; i < toNotify.length; i++)
+ {
+ if (toNotify[i] != null)
+ toNotify[i].replicantsChanged(key, newReplicants, newId);
+ }
}
protected void republishLocalReplicants()
@@ -742,14 +851,16 @@
Map.Entry entry = (Map.Entry) entries.next();
String key = (String) entry.getKey();
Object replicant = entry.getValue();
+ if (replicant != null)
+ {
+ if( trace )
+ log.trace("publishing, key=" + key + ", value=" + replicant);
- if( trace )
- log.trace("publishing, key=" + key + ", value=" + replicant);
+ Object[] args = {key, this.nodeName, replicant};
- Object[] args = {key, this.nodeName, replicant};
-
- partition.callAsynchMethodOnCluster(SERVICE_NAME, "_add", args, add_types, true);
- notifyKeyListeners(key, lookupReplicants(key));
+ partition.callAsynchMethodOnCluster(SERVICE_NAME, "_add", args, add_types, true);
+ notifyKeyListeners(key, lookupReplicants(key));
+ }
}
if( trace )
log.trace("End Re-Publish local replicants");
@@ -781,12 +892,19 @@
}
for (int i = 0; i < rsp.size(); i++)
{
- Object[] objs = (Object[])rsp.get(i);
- if (objs == null)
+ Object o = rsp.get(i);
+ if (o == null)
{
log.warn("As part of the answers received during the DRM merge process, a NULL message was received!");
continue;
}
+ else if (o instanceof Throwable)
+ {
+ log.warn("As part of the answers received during the DRM merge process, a Throwable was received!", (Throwable) o);
+ continue;
+ }
+
+ Object[] objs = (Object[]) o;
String node = (String)objs[0];
Map replicants = (Map)objs[1];
Iterator keys = replicants.keySet().iterator();
Modified: branches/JBoss_4_0_3_SP1_CP/cluster/src/main/org/jboss/ha/framework/server/HAPartitionImpl.java
===================================================================
--- branches/JBoss_4_0_3_SP1_CP/cluster/src/main/org/jboss/ha/framework/server/HAPartitionImpl.java 2006-11-17 18:12:58 UTC (rev 58500)
+++ branches/JBoss_4_0_3_SP1_CP/cluster/src/main/org/jboss/ha/framework/server/HAPartitionImpl.java 2006-11-17 19:13:31 UTC (rev 58501)
@@ -25,8 +25,6 @@
import javax.naming.StringRefAddr;
import javax.management.MBeanServer;
-import EDU.oswego.cs.dl.util.concurrent.LinkedQueue;
-
import org.jgroups.JChannel;
import org.jgroups.MergeView;
import org.jgroups.View;
@@ -36,15 +34,12 @@
import org.jgroups.stack.IpAddress;
import org.jgroups.util.Rsp;
import org.jgroups.util.RspList;
-import org.jgroups.util.Util;
import org.jboss.invocation.MarshalledValueInputStream;
import org.jboss.invocation.MarshalledValueOutputStream;
import org.jboss.ha.framework.interfaces.DistributedReplicantManager;
import org.jboss.ha.framework.interfaces.DistributedState;
import org.jboss.ha.framework.interfaces.HAPartition;
-import org.jboss.ha.framework.interfaces.HAPartition.HAPartitionStateTransfer;
-import org.jboss.ha.framework.interfaces.HAPartition.HAMembershipListener;
import org.jboss.ha.framework.interfaces.ClusterNode;
import org.jboss.naming.NonSerializableFactory;
@@ -63,7 +58,7 @@
public class HAPartitionImpl
extends org.jgroups.blocks.RpcDispatcher
implements org.jgroups.MessageListener, org.jgroups.MembershipListener,
- HAPartition
+ HAPartition, AsynchEventHandler.AsynchEventProcessor
{
private static class NoHandlerForRPC implements Serializable
{
@@ -78,14 +73,14 @@
protected HashMap rpcHandlers = new HashMap();
protected HashMap stateHandlers = new HashMap();
- /** The HAMembershipListener and HAMembershipExtendedListeners */
- protected ArrayList listeners = new ArrayList();
+ /** Do we send any membership change notifications synchronously? */
+ protected boolean allowSyncListeners = false;
+ /** The synch HAMembershipListener and HAMembershipExtendedListeners */
+ protected ArrayList synchListeners = new ArrayList();
/** The asynch HAMembershipListener and HAMembershipExtendedListeners */
protected ArrayList asynchListeners = new ArrayList();
- /** The LinkedQueue<ViewChangeEvent> of changes to notify asynch listeners of */
- protected LinkedQueue asynchViewChanges = new LinkedQueue();
- /** The Thread used to send membership change notifications asynchronously */
- protected Thread asynchNotifyThread;
+ /** The handler used to send membership change notifications asynchronously */
+ protected AsynchEventHandler asynchHandler;
/** The current cluster partition members */
protected Vector members = null;
protected Vector jgmembers = null;
@@ -215,6 +210,8 @@
this.dsManager.init();
log.debug("bind distributed state service");
+ // Create the asynchronous handler for view changes
+ asynchHandler = new AsynchEventHandler(this, "AsynchViewChangeHandler");
// Bind ourself in the public JNDI space
//
@@ -271,10 +268,8 @@
this.replicantManager.start();
this.dsManager.start();
- // Create the asynch listener handler thread
- AsynchViewChangeHandler asynchHandler = new AsynchViewChangeHandler();
- asynchNotifyThread = new Thread(asynchHandler, "AsynchHAMembershipListener Thread");
- asynchNotifyThread.start();
+ // Start the asynch listener handler thread
+ asynchHandler.start();
}
public void closePartition() throws Exception
@@ -284,11 +279,11 @@
try
{
- asynchNotifyThread.interrupt();
+ asynchHandler.stop();
}
catch( Exception e)
{
- log.warn("Failed to interrupte asynchNotifyThread", e);
+ log.warn("Failed to stop asynchHandler", e);
}
// Stop the DRM and DS services
@@ -556,11 +551,14 @@
log.debug("membership changed from " + this.members.size() + " to "
+ event.allMembers.size());
// Put the view change to the asynch queue
- this.asynchViewChanges.put(event);
+ this.asynchHandler.queueEvent(event);
// Broadcast the new view to the synchronous view change listeners
- this.notifyListeners(listeners, event.viewId, event.allMembers,
- event.deadMembers, event.newMembers, event.originatingGroups);
+ if (this.allowSyncListeners)
+ {
+ this.notifyListeners(synchListeners, event.viewId, event.allMembers,
+ event.deadMembers, event.newMembers, event.originatingGroups);
+ }
}
catch (Exception ex)
{
@@ -899,20 +897,48 @@
//
public void registerMembershipListener(HAMembershipListener listener)
{
- synchronized(this.listeners)
- {
- this.listeners.add(listener);
+ boolean isAsynch = (this.allowSyncListeners == false)
+ || (listener instanceof AsynchHAMembershipListener)
+ || (listener instanceof AsynchHAMembershipExtendedListener);
+ if( isAsynch ) {
+ synchronized(this.asynchListeners) {
+ this.asynchListeners.add(listener);
+ }
}
+ else {
+ synchronized(this.synchListeners) {
+ this.synchListeners.add(listener);
+ }
+ }
}
public void unregisterMembershipListener(HAMembershipListener listener)
{
- synchronized(this.listeners)
- {
- this.listeners.remove(listener);
+ boolean isAsynch = (this.allowSyncListeners == false)
+ || (listener instanceof AsynchHAMembershipListener)
+ || (listener instanceof AsynchHAMembershipExtendedListener);
+ if( isAsynch ) {
+ synchronized(this.asynchListeners) {
+ this.asynchListeners.remove(listener);
+ }
}
+ else {
+ synchronized(this.synchListeners) {
+ this.synchListeners.remove(listener);
+ }
+ }
}
+ public boolean getAllowSynchronousMembershipNotifications()
+ {
+ return allowSyncListeners;
+ }
+
+ public void setAllowSynchronousMembershipNotifications(boolean allowSync)
+ {
+ this.allowSyncListeners = allowSync;
+ }
+
// org.jgroups.RpcDispatcher overrides ---------------------------------------------------
/**
@@ -998,10 +1024,21 @@
log.trace("rpc call threw exception", t);
retval = t;
}
-
+
return retval;
}
+
+ // AsynchEventHandler.AsynchEventProcessor -----------------------
+ public void processEvent(Object event)
+ {
+ ViewChangeEvent vce = (ViewChangeEvent) event;
+ notifyListeners(asynchListeners, vce.viewId, vce.allMembers,
+ vce.deadMembers, vce.newMembers, vce.originatingGroups);
+
+ }
+
+
// Package protected ---------------------------------------------
// Protected -----------------------------------------------------
@@ -1101,31 +1138,35 @@
Vector originatingGroups)
{
log.debug("Begin notifyListeners, viewID: "+viewID);
+
synchronized(theListeners)
{
- for (int i = 0; i < theListeners.size(); i++)
+ // JBAS-3619 -- don't hold synch lock while notifying
+ theListeners = (ArrayList) theListeners.clone();
+ }
+
+ for (int i = 0; i < theListeners.size(); i++)
+ {
+ HAMembershipListener aListener = null;
+ try
{
- HAMembershipListener aListener = null;
- try
+ aListener = (HAMembershipListener) theListeners.get(i);
+ if(originatingGroups != null && (aListener instanceof HAMembershipExtendedListener))
{
- aListener = (HAMembershipListener) theListeners.get(i);
- if(originatingGroups != null && (aListener instanceof HAMembershipExtendedListener))
- {
- HAMembershipExtendedListener exListener = (HAMembershipExtendedListener) aListener;
- exListener.membershipChangedDuringMerge (deadMembers, newMembers,
- allMembers, originatingGroups);
- }
- else
- {
- aListener.membershipChanged(deadMembers, newMembers, allMembers);
- }
+ HAMembershipExtendedListener exListener = (HAMembershipExtendedListener) aListener;
+ exListener.membershipChangedDuringMerge (deadMembers, newMembers,
+ allMembers, originatingGroups);
}
- catch (Throwable e)
+ else
{
- // a problem in a listener should not prevent other members to receive the new view
- log.warn("HAMembershipListener callback failure: "+aListener, e);
+ aListener.membershipChanged(deadMembers, newMembers, allMembers);
}
}
+ catch (Throwable e)
+ {
+ // a problem in a listener should not prevent other members to receive the new view
+ log.warn("HAMembershipListener callback failure: "+aListener, e);
+ }
}
log.debug("End notifyListeners, viewID: "+viewID);
}
@@ -1154,7 +1195,7 @@
catch (Exception ignored){}
}
- /** A simply data class containing the view change event needed to
+ /** A simple data class containing the view change event needed to
* message the HAMembershipListeners
*/
private static class ViewChangeEvent
@@ -1166,31 +1207,6 @@
Vector originatingGroups;
}
- /** The Runnable that handles the asynchronous listener notifications
- */
- private class AsynchViewChangeHandler implements Runnable
- {
- public void run()
- {
- log.debug("Begin AsynchViewChangeHandler");
- while( true )
- {
- try
- {
- ViewChangeEvent event = (ViewChangeEvent) asynchViewChanges.take();
- notifyListeners(asynchListeners, event.viewId, event.allMembers,
- event.deadMembers, event.newMembers, event.originatingGroups);
- }
- catch(InterruptedException e)
- {
- log.debug("AsynchViewChangeHandler interrupted", e);
- break;
- }
- }
- log.debug("End AsynchViewChangeHandler");
- }
- }
-
// Private -------------------------------------------------------
// Inner classes -------------------------------------------------
Modified: branches/JBoss_4_0_3_SP1_CP/cluster/src/main/org/jboss/ha/jmx/HAServiceMBeanSupport.java
===================================================================
--- branches/JBoss_4_0_3_SP1_CP/cluster/src/main/org/jboss/ha/jmx/HAServiceMBeanSupport.java 2006-11-17 18:12:58 UTC (rev 58500)
+++ branches/JBoss_4_0_3_SP1_CP/cluster/src/main/org/jboss/ha/jmx/HAServiceMBeanSupport.java 2006-11-17 19:13:31 UTC (rev 58501)
@@ -187,6 +187,8 @@
// register to listen to topology changes, which might cause the election of a new master
drmListener = new DistributedReplicantManager.ReplicantListener()
{
+ Object mutex = new Object();
+
public void replicantsChanged(
String key,
List newReplicants,
@@ -194,8 +196,16 @@
{
if (key.equals(getServiceHAName()))
{
- // change in the topology callback
- HAServiceMBeanSupport.this.partitionTopologyChanged(newReplicants, newReplicantsViewId);
+ // This synchronized block was added when the internal behavior of
+ // DistributedReplicantManagerImpl was changed so that concurrent
+ // replicantsChanged notifications are possible. Synchronization
+ // ensures that this change won't break non-thread-safe
+ // subclasses of HAServiceMBeanSupport.
+ synchronized(mutex)
+ {
+ // change in the topology callback
+ HAServiceMBeanSupport.this.partitionTopologyChanged(newReplicants, newReplicantsViewId);
+ }
}
}
};
Modified: branches/JBoss_4_0_3_SP1_CP/cluster/src/main/org/jboss/proxy/ejb/ProxyFactoryHA.java
===================================================================
--- branches/JBoss_4_0_3_SP1_CP/cluster/src/main/org/jboss/proxy/ejb/ProxyFactoryHA.java 2006-11-17 18:12:58 UTC (rev 58500)
+++ branches/JBoss_4_0_3_SP1_CP/cluster/src/main/org/jboss/proxy/ejb/ProxyFactoryHA.java 2006-11-17 19:13:31 UTC (rev 58501)
@@ -180,7 +180,11 @@
}
}
- public void replicantsChanged (String key, List newReplicants, int newReplicantsViewId)
+ // synchronized keyword added when it became possible for DRM to issue
+ // concurrent replicantsChanged notifications. JBAS-2169.
+ public synchronized void replicantsChanged (String key,
+ List newReplicants,
+ int newReplicantsViewId)
{
try
{
Modified: branches/JBoss_4_0_3_SP1_CP/cluster/src/main/org/jboss/proxy/generic/ProxyFactoryHA.java
===================================================================
--- branches/JBoss_4_0_3_SP1_CP/cluster/src/main/org/jboss/proxy/generic/ProxyFactoryHA.java 2006-11-17 18:12:58 UTC (rev 58500)
+++ branches/JBoss_4_0_3_SP1_CP/cluster/src/main/org/jboss/proxy/generic/ProxyFactoryHA.java 2006-11-17 19:13:31 UTC (rev 58501)
@@ -158,7 +158,11 @@
}
}
- public void replicantsChanged(String key, List newReplicants, int newReplicantsViewId)
+ // synchronized keyword added when it became possible for DRM to issue
+ // concurrent replicantsChanged notifications. JBAS-2169.
+ public synchronized void replicantsChanged(String key,
+ List newReplicants,
+ int newReplicantsViewId)
{
try
{
Copied: branches/JBoss_4_0_3_SP1_CP/testsuite/src/main/org/jboss/test/cluster/drm/MockHAPartition.java (from rev 58500, branches/JBoss_4_0_3_SP1_JBAS-3822/testsuite/src/main/org/jboss/test/cluster/drm/MockHAPartition.java)
Modified: branches/JBoss_4_0_3_SP1_CP/testsuite/src/main/org/jboss/test/cluster/test/DRMTestCase.java
===================================================================
--- branches/JBoss_4_0_3_SP1_CP/testsuite/src/main/org/jboss/test/cluster/test/DRMTestCase.java 2006-11-17 18:12:58 UTC (rev 58500)
+++ branches/JBoss_4_0_3_SP1_CP/testsuite/src/main/org/jboss/test/cluster/test/DRMTestCase.java 2006-11-17 19:13:31 UTC (rev 58501)
@@ -1,16 +1,38 @@
/*
-* JBoss, the OpenSource J2EE webOS
-*
-* Distributable under LGPL license.
-* See terms of license at gnu.org.
-*/
+ * JBoss, Home of Professional Open Source.
+ * Copyright 2006, Red Hat Middleware LLC, and individual contributors
+ * as indicated by the @author tags. See the copyright.txt file in the
+ * distribution for a full listing of individual contributors.
+ *
+ * This is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU Lesser General Public License as
+ * published by the Free Software Foundation; either version 2.1 of
+ * the License, or (at your option) any later version.
+ *
+ * This software is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this software; if not, write to the Free
+ * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
+ * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
+ */
package org.jboss.test.cluster.test;
import java.rmi.RemoteException;
import java.rmi.server.UnicastRemoteObject;
+import java.security.SecureRandom;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.Iterator;
import java.util.Vector;
import java.util.List;
import java.util.HashSet;
+
+import javax.management.MBeanServer;
+import javax.management.MBeanServerFactory;
import javax.management.MBeanServerInvocationHandler;
import javax.management.ObjectName;
import javax.management.Notification;
@@ -19,14 +41,23 @@
import org.jboss.test.JBossClusteredTestCase;
import org.jboss.test.cluster.drm.IReplicants;
+import org.jboss.test.cluster.drm.MockHAPartition;
+import org.jboss.ha.framework.interfaces.ClusterNode;
+import org.jboss.ha.framework.interfaces.DistributedReplicantManager;
+import org.jboss.ha.framework.interfaces.DistributedReplicantManager.ReplicantListener;
+import org.jboss.ha.framework.server.DistributedReplicantManagerImpl;
import org.jboss.jmx.adaptor.rmi.RMIAdaptor;
import org.jboss.jmx.adaptor.rmi.RMIAdaptorExt;
import org.jboss.jmx.adaptor.rmi.RMINotificationListener;
+import org.jgroups.stack.IpAddress;
import org.apache.log4j.Logger;
-/** Tests of http session replication
+import EDU.oswego.cs.dl.util.concurrent.Semaphore;
+
+/** Tests of the DistributedReplicantManagerImpl
*
* @author Scott.Stark at jboss.org
+ * @author Brian.Stansberry at jboss.com
* @version $Revision$
*/
public class DRMTestCase extends JBossClusteredTestCase
@@ -45,7 +76,419 @@
log.info("handleNotification, "+notification);
}
}
+
+ /**
+ * Thread that will first register a DRM ReplicantLister that synchronizes
+ * on the test class' lock object, and then calls DRM add or remove,
+ * causing the thread to block if the lock object's monitor is held.
+ */
+ static class BlockingListenerThread extends Thread
+ implements DistributedReplicantManager.ReplicantListener
+ {
+ private DistributedReplicantManagerImpl drm;
+ private String nodeName;
+ private boolean add;
+ private boolean blocking;
+ private Exception ex;
+
+ BlockingListenerThread(DistributedReplicantManagerImpl drm,
+ boolean add,
+ String nodeName)
+ {
+ this.drm = drm;
+ this.add =add;
+ this.nodeName = nodeName;
+ drm.registerListener("TEST", this);
+ }
+ public void replicantsChanged(String key, List newReplicants, int newReplicantsViewId)
+ {
+ blocking = true;
+ synchronized(lock)
+ {
+ blocking = false;
+ }
+ }
+
+ public void run()
+ {
+ try
+ {
+ if (add)
+ {
+ if (nodeName == null)
+ drm.add("TEST", "local-replicant");
+ else
+ drm._add("TEST", nodeName, "remote-replicant");
+ }
+ else
+ {
+ if (nodeName == null)
+ drm.remove("TEST");
+ else
+ drm._remove("TEST", nodeName);
+ }
+ }
+ catch (Exception e)
+ {
+ ex = e;
+ }
+ }
+
+ public boolean isBlocking()
+ {
+ return blocking;
+ }
+
+ public Exception getException()
+ {
+ return ex;
+ }
+
+ }
+
+ /**
+ * Thread that registers and then unregisters a DRM ReplicantListener.
+ */
+ static class RegistrationThread extends Thread
+ {
+ private DistributedReplicantManager drm;
+ private boolean registered = false;
+ private boolean unregistered = true;
+
+ RegistrationThread(DistributedReplicantManager drm)
+ {
+ this.drm = drm;
+ }
+
+ public void run()
+ {
+ NullListener listener = new NullListener();
+ drm.registerListener("DEADLOCK", listener);
+ registered = true;
+ drm.unregisterListener("DEADLOCK", listener);
+ unregistered = true;
+ }
+
+ public boolean isRegistered()
+ {
+ return registered;
+ }
+
+ public boolean isUnregistered()
+ {
+ return unregistered;
+ }
+
+ }
+
+ /**
+ * A DRM ReplicantListener that does nothing.
+ */
+ static class NullListener
+ implements DistributedReplicantManager.ReplicantListener
+ {
+ public void replicantsChanged(String key, List newReplicants,
+ int newReplicantsViewId)
+ {
+ // no-op
+ }
+ }
+
+ /**
+ * DRM ReplicantListener that mimics the HASingletonDeployer service
+ * by deploying/undeploying a service if it's notified that by that DRM
+ * that it is the master replica for its key.
+ */
+ static class MockHASingletonDeployer
+ implements DistributedReplicantManager.ReplicantListener
+ {
+ DistributedReplicantManager drm;
+ MockDeployer deployer;
+ String key;
+ boolean master = false;
+ NullListener deploymentListener = new NullListener();
+ Exception ex;
+ Logger log;
+ Object mutex = new Object();
+
+ MockHASingletonDeployer(MockDeployer deployer, String key, Logger log)
+ {
+ this.drm = deployer.getDRM();
+ this.deployer = deployer;
+ this.key = key;
+ this.log = log;
+ }
+
+ public void replicantsChanged(String key,
+ List newReplicants,
+ int newReplicantsViewId)
+ {
+ if (this.key.equals(key))
+ {
+ synchronized(mutex)
+ {
+ boolean nowMaster = drm.isMasterReplica(key);
+
+ try
+ {
+ if (!master && nowMaster) {
+ log.debug(Thread.currentThread().getName() +
+ " Deploying " + key);
+ deployer.deploy(key + "A", key, deploymentListener);
+ }
+ else if (master && !nowMaster) {
+ log.debug(Thread.currentThread().getName() +
+ " undeploying " + key);
+ deployer.undeploy(key + "A", deploymentListener);
+ }
+ else
+ {
+ log.debug(Thread.currentThread().getName() +
+ " -- no status change in " + key +
+ " -- master = " + master);
+ }
+ master = nowMaster;
+ }
+ catch (Exception e)
+ {
+ e.printStackTrace();
+ if (ex == null)
+ ex = e;
+ }
+ }
+ }
+ }
+
+ public Exception getException()
+ {
+ return ex;
+ }
+
+ }
+
+ /**
+ * Thread the repeatedly deploys and undeploys a MockHASingletonDeployer.
+ */
+ static class DeployerThread extends Thread
+ {
+ Semaphore semaphore;
+ MockDeployer deployer;
+ DistributedReplicantManager.ReplicantListener listener;
+ String key;
+ Exception ex;
+ int count = -1;
+ Logger log;
+
+ DeployerThread(MockDeployer deployer,
+ String key,
+ DistributedReplicantManager.ReplicantListener listener,
+ Semaphore semaphore,
+ Logger log)
+ {
+ super("Deployer " + key);
+ this.deployer = deployer;
+ this.listener = listener;
+ this.key = key;
+ this.semaphore = semaphore;
+ this.log = log;
+ }
+
+ public void run()
+ {
+ boolean acquired = false;
+ try
+ {
+ acquired = semaphore.attempt(60000);
+ if (!acquired)
+ throw new Exception("Cannot acquire semaphore");
+ SecureRandom random = new SecureRandom();
+ for (count = 0; count < LOOP_COUNT; count++)
+ {
+ deployer.deploy(key, "JGroups", listener);
+
+ sleepThread(random.nextInt(50));
+ deployer.undeploy(key, listener);
+ }
+ }
+ catch (Exception e)
+ {
+ e.printStackTrace();
+ ex = e;
+ }
+ finally
+ {
+ if (acquired)
+ semaphore.release();
+ }
+ }
+
+ public Exception getException()
+ {
+ return ex;
+ }
+
+ public int getCount()
+ {
+ return count;
+ }
+ }
+
+ /**
+ * Thread that mimics the JGroups up-handler thread that calls into the DRM.
+ * Repeatedly and randomly calls adds or removes a replicant for a set
+ * of keys.
+ */
+ static class JGroupsThread extends Thread
+ {
+ Semaphore semaphore;
+ DistributedReplicantManagerImpl drm;
+ String[] keys;
+ String nodeName;
+ Exception ex;
+ int count = -1;
+ int weightFactor;
+
+ JGroupsThread(DistributedReplicantManagerImpl drm,
+ String[] keys,
+ String nodeName,
+ Semaphore semaphore)
+ {
+ super("JGroups");
+ this.drm = drm;
+ this.keys = keys;
+ this.semaphore = semaphore;
+ this.nodeName = nodeName;
+ this.weightFactor = (int) 2.5 * keys.length;
+ }
+
+ public void run()
+ {
+ boolean acquired = false;
+ try
+ {
+ acquired = semaphore.attempt(60000);
+ if (!acquired)
+ throw new Exception("Cannot acquire semaphore");
+ boolean[] added = new boolean[keys.length];
+ SecureRandom random = new SecureRandom();
+
+ for (count = 0; count < weightFactor * LOOP_COUNT; count++)
+ {
+ int pos = random.nextInt(keys.length);
+ if (added[pos])
+ {
+ drm._remove(keys[pos], nodeName);
+ added[pos] = false;
+ }
+ else
+ {
+ drm._add(keys[pos], nodeName, "");
+ added[pos] = true;
+ }
+ sleepThread(random.nextInt(30));
+ }
+ }
+ catch (Exception e)
+ {
+ e.printStackTrace();
+ ex = e;
+ }
+ finally
+ {
+ if (acquired)
+ semaphore.release();
+ }
+ }
+
+ public Exception getException()
+ {
+ return ex;
+ }
+
+ public int getCount()
+ {
+ return (count / weightFactor);
+ }
+
+ }
+
+ /**
+ * Mocks the deployer of a service that registers/unregisters DRM listeners
+ * and replicants. Only allows a single thread of execution, a la the
+ * org.jboss.system.ServiceController.
+ */
+ static class MockDeployer
+ {
+ DistributedReplicantManager drm;
+
+ MockDeployer(DistributedReplicantManager drm)
+ {
+ this.drm = drm;
+ }
+
+ void deploy(String key, String replicant,
+ DistributedReplicantManager.ReplicantListener listener)
+ throws Exception
+ {
+ synchronized(this)
+ {
+ drm.registerListener(key, listener);
+ drm.add(key, replicant);
+ sleepThread(10);
+ }
+ }
+
+ void undeploy(String key,
+ DistributedReplicantManager.ReplicantListener listener)
+ throws Exception
+ {
+ synchronized(this)
+ {
+ drm.remove(key);
+ drm.unregisterListener(key, listener);
+ sleepThread(10);
+ }
+ }
+
+ DistributedReplicantManager getDRM()
+ {
+ return drm;
+ }
+ }
+
+ /** ReplicantListener that caches the list of replicants */
+ static class CachingListener implements ReplicantListener
+ {
+ List replicants = null;
+ boolean clean = true;
+
+ public void replicantsChanged(String key, List newReplicants,
+ int newReplicantsViewId)
+ {
+ this.replicants = newReplicants;
+ if (clean && newReplicants != null)
+ {
+ int last = Integer.MIN_VALUE;
+ for (Iterator iter = newReplicants.iterator(); iter.hasNext(); )
+ {
+ int cur = ((Integer) iter.next()).intValue();
+ if (last >= cur)
+ {
+ clean = false;
+ break;
+ }
+
+ last = cur;
+ }
+ }
+ }
+
+ }
+
+ private static Object lock = new Object();
+ private static int LOOP_COUNT = 30;
+
public static Test suite() throws Exception
{
Test t1 = getDeploySetup(DRMTestCase.class, "drm-tests.sar");
@@ -137,5 +580,660 @@
server0.removeNotificationListener(drmService, listener);
server1.removeNotificationListener(drmService, listener);
}
+
+ /**
+ * Tests the functionality of isMasterReplica(), also testing merge
+ * handling.
+ *
+ * TODO move this test out of the testsuite and into the cluster module
+ * itself, since it doesn't rely on the container.
+ *
+ * @throws Exception
+ */
+ public void testIsMasterReplica() throws Exception
+ {
+ log.debug("+++ testIsMasterReplica()");
+
+ MBeanServer mbeanServer =
+ MBeanServerFactory.createMBeanServer("mockPartition");
+ try {
+ ClusterNode localAddress = new ClusterNode(new IpAddress("127.0.0.1", 12345));
+ MockHAPartition partition = new MockHAPartition(localAddress);
+
+ DistributedReplicantManagerImpl drm =
+ new DistributedReplicantManagerImpl(partition, mbeanServer);
+ drm.init();
+
+ // Create a fake view for the MockHAPartition
+
+ Vector remoteAddresses = new Vector();
+ for (int i = 1; i < 5; i++)
+ remoteAddresses.add(new ClusterNode(new IpAddress("127.0.0.1", 12340 + i)));
+
+ Vector allNodes = new Vector(remoteAddresses);
+ allNodes.add(localAddress);
+ partition.setCurrentViewClusterNodes(allNodes);
+
+ // Pass fake state to DRMImpl
+
+ HashMap replicants = new HashMap();
+ ArrayList remoteResponses = new ArrayList();
+ for (int i = 0; i < remoteAddresses.size(); i++)
+ {
+ ClusterNode node = (ClusterNode) remoteAddresses.elementAt(i);
+ Integer replicant = new Integer(i + 1);
+ replicants.put(node.getName(), replicant);
+ HashMap localReplicant = new HashMap();
+ localReplicant.put("Mock", replicant);
+ remoteResponses.add(new Object[] {node.getName(), localReplicant});
+ }
+ HashMap services = new HashMap();
+ services.put("Mock", replicants);
+
+ int hash = 0;
+ for (int i = 1; i < 5; i++)
+ hash += (new Integer(i)).hashCode();
+
+ HashMap intraviewIds = new HashMap();
+ intraviewIds.put("Mock", new Integer(hash));
+
+ partition.setRemoteReplicants(remoteResponses);
+
+ drm.setCurrentState(new Object[] {services, intraviewIds });
+
+ drm.start();
+
+ // add a local replicant
+
+ drm.add("Mock", new Integer(5));
+
+ // test that this node is not the master replica
+
+ assertFalse("Local node is not master after startup",
+ drm.isMasterReplica("Mock"));
+
+ // simulate a split where this node is the coord
+
+ Vector localOnly = new Vector();
+ localOnly.add(localAddress);
+
+ partition.setCurrentViewClusterNodes(localOnly);
+ partition.setRemoteReplicants(new ArrayList());
+
+ drm.membershipChanged(remoteAddresses, new Vector(), localOnly);
+
+ // test that this node is the master replica
+
+ assertTrue("Local node is master after split", drm.isMasterReplica("Mock"));
+
+ // Remove our local replicant
+
+ drm.remove("Mock");
+
+ // test that this node is not the master replica
+
+ assertFalse("Local node is not master after dropping replicant",
+ drm.isMasterReplica("Mock"));
+
+ // Restore the local replicant
+
+ drm.add("Mock", new Integer(5));
+
+ // simulate a merge
+
+ Vector mergeGroups = new Vector();
+ mergeGroups.add(remoteAddresses);
+ mergeGroups.add(localOnly);
+
+ partition.setCurrentViewClusterNodes(allNodes);
+ partition.setRemoteReplicants(remoteResponses);
+
+ drm.membershipChangedDuringMerge(new Vector(), remoteAddresses,
+ allNodes, mergeGroups);
+
+ // Merge processing is done asynchronously, so pause a bit
+ sleepThread(100);
+
+ // test that this node is not the master replica
+
+ assertFalse("Local node is not master after merge",
+ drm.isMasterReplica("Mock"));
+ }
+ finally {
+ MBeanServerFactory.releaseMBeanServer(mbeanServer);
+ }
+ }
+
+
+ /**
+ * Tests that one thread blocking in DRM.notifyKeyListeners() does not
+ * prevent other threads registering/unregistering listeners. JBAS-2539
+ *
+ * TODO move this test out of the testsuite and into the cluster module
+ * itself, since it doesn't rely on the container.
+ *
+ * @throws Exception
+ */
+ public void testKeyListenerDeadlock() throws Exception
+ {
+ log.debug("+++ testKeyListenerDeadlock()");
+
+ MBeanServer mbeanServer =
+ MBeanServerFactory.createMBeanServer("mockPartition");
+ try {
+ ClusterNode localAddress = new ClusterNode(new IpAddress("127.0.0.1", 12345));
+ MockHAPartition partition = new MockHAPartition(localAddress);
+
+ DistributedReplicantManagerImpl drm =
+ new DistributedReplicantManagerImpl(partition, mbeanServer);
+
+ drm.init();
+
+ // Create a fake view for the MockHAPartition
+
+ Vector remoteAddresses = new Vector();
+ for (int i = 1; i < 5; i++)
+ remoteAddresses.add(new ClusterNode(new IpAddress("127.0.0.1", 12340 + i)));
+
+ Vector allNodes = new Vector(remoteAddresses);
+ allNodes.add(localAddress);
+ partition.setCurrentViewClusterNodes(allNodes);
+
+ drm.start();
+
+ BlockingListenerThread blt =
+ new BlockingListenerThread(drm, true, null);
+
+ // Hold the lock monitor so the test thread can't acquire it
+ // This keeps the blocking thread alive.
+ synchronized(lock) {
+ // Spawn a thread that will change a key and then block on the
+ // notification back to itself
+ blt.start();
+
+ sleepThread(50);
+
+ assertTrue("Test thread is alive", blt.isAlive());
+ assertTrue("Test thread is blocking", blt.isBlocking());
+
+ RegistrationThread rt = new RegistrationThread(drm);
+ rt.start();
+
+ sleepThread(50);
+
+ assertTrue("No deadlock on listener registration", rt.isRegistered());
+
+ assertTrue("No deadlock on listener unregistration", rt.isUnregistered());
+
+ assertNull("No exception in deadlock tester", blt.getException());
+
+ assertTrue("Test thread is still blocking", blt.isBlocking());
+ assertTrue("Test thread is still alive", blt.isAlive());
+ }
+
+ drm.unregisterListener("TEST", blt);
+
+ sleepThread(50);
+
+ // Test going through remove
+ blt = new BlockingListenerThread(drm, false, null);
+
+ // Hold the lock monitor so the test thread can't acquire it
+ // This keeps the blocking thread alive.
+ synchronized(lock) {
+ // Spawn a thread that will change a key and then block on the
+ // notification back to itself
+ blt.start();
+
+ sleepThread(50);
+
+ assertTrue("Test thread is alive", blt.isAlive());
+ assertTrue("Test thread is blocking", blt.isBlocking());
+
+ RegistrationThread rt = new RegistrationThread(drm);
+ rt.start();
+
+ sleepThread(50);
+
+ assertTrue("No deadlock on listener registration", rt.isRegistered());
+
+ assertTrue("No deadlock on listener unregistration", rt.isUnregistered());
+
+ assertNull("No exception in deadlock tester", blt.getException());
+
+ assertTrue("Test thread is still blocking", blt.isBlocking());
+ assertTrue("Test thread is still alive", blt.isAlive());
+ }
+ }
+ finally {
+ MBeanServerFactory.releaseMBeanServer(mbeanServer);
+ }
+ }
+
+
+ /**
+ * Tests that remotely-originated calls don't block.
+ *
+ * TODO move this test out of the testsuite and into the cluster module
+ * itself, since it doesn't rely on the container.
+ *
+ * @throws Exception
+ */
+ public void testRemoteCallBlocking() throws Exception
+ {
+ log.debug("+++ testRemoteCallBlocking()");
+
+ MBeanServer mbeanServer =
+ MBeanServerFactory.createMBeanServer("mockPartition");
+ try {
+ ClusterNode localAddress = new ClusterNode(new IpAddress("127.0.0.1", 12345));
+ MockHAPartition partition = new MockHAPartition(localAddress);
+
+ DistributedReplicantManagerImpl drm =
+ new DistributedReplicantManagerImpl(partition, mbeanServer);
+
+ drm.init();
+
+ // Create a fake view for the MockHAPartition
+
+ Vector remoteAddresses = new Vector();
+ for (int i = 1; i < 5; i++)
+ remoteAddresses.add(new ClusterNode(new IpAddress("127.0.0.1", 12340 + i)));
+
+ Vector allNodes = new Vector(remoteAddresses);
+ allNodes.add(localAddress);
+ partition.setCurrentViewClusterNodes(allNodes);
+
+ drm.start();
+
+ String sender = ((ClusterNode)remoteAddresses.get(0)).getName();
+ BlockingListenerThread blt =
+ new BlockingListenerThread(drm, true, sender);
+
+ // Hold the lock monitor so the test thread can't acquire it
+ // This keeps the blocking thread alive.
+ synchronized(lock) {
+ // Spawn a thread that will change a key and then block on the
+ // notification back to itself
+ blt.start();
+
+ sleepThread(50);
+
+ assertFalse("JGroups thread is not alive", blt.isAlive());
+ assertTrue("Async handler thread is blocking", blt.isBlocking());
+
+ assertNull("No exception in JGroups thread", blt.getException());
+ }
+
+ drm.unregisterListener("TEST", blt);
+
+ sleepThread(50);
+
+ // Test going through remove
+ blt = new BlockingListenerThread(drm, false, sender);
+
+ // Hold the lock monitor so the test thread can't acquire it
+ // This keeps the blocking thread alive.
+ synchronized(lock) {
+ // Spawn a thread that will change a key and then block on the
+ // notification back to itself
+ blt.start();
+
+ sleepThread(50);
+
+ assertFalse("JGroups thread is not alive", blt.isAlive());
+ assertTrue("Async handler thread is blocking", blt.isBlocking());
+
+ assertNull("No exception in JGroups thread", blt.getException());
+ }
+ }
+ finally {
+ MBeanServerFactory.releaseMBeanServer(mbeanServer);
+ }
+ }
+
+ /**
+ * Tests that one thread blocking in DRM.notifyKeyListeners() does not
+ * prevent other threads that use different keys adding/removing
+ * replicants. JBAS-2169
+ *
+ * TODO move this test out of the testsuite and into the cluster module
+ * itself, since it doesn't rely on the container.
+ *
+ * @throws Exception
+ */
+ public void testNonConflictingAddRemoveDeadlock() throws Exception
+ {
+
+ log.debug("+++ testNonConflictingAddRemoveDeadlock()");
+
+ addRemoveDeadlockTest(false);
+ }
+
+ /**
+ * Tests that one thread blocking in DRM.notifyKeyListeners() does not
+ * prevent other threads that use the same keys adding/removing
+ * replicants. JBAS-1151
+ *
+ * NOTE: This test basically demonstrates a small race condition that can
+ * happen with the way HASingletonSupport's startService() method is
+ * implemented (actually HAServiceMBeanSupport, but relevant in the case
+ * of subclass HASingletonSupport, and in particular in its use in the
+ * HASingletonDeployer service). However, since the test doesn't actually
+ * use the relevant code, but rather uses mock objects that work the same
+ * way, this test is disabled -- its purpose has been achieved. JIRA issue
+ * JBAS-1151 tracks the real problem; when it's resolved we'll create a test
+ * case against the real code that proves that fact.
+ *
+ * TODO move this test out of the testsuite and into the cluster module
+ * itself, since it doesn't rely on the container.
+ *
+ * @throws Exception
+ */
+ public void badtestConflictingAddRemoveDeadlock() throws Exception
+ {
+ log.debug("+++ testConflictingAddRemoveDeadlock()");
+
+ addRemoveDeadlockTest(true);
+ }
+
+ private void addRemoveDeadlockTest(boolean conflicting) throws Exception
+ {
+ String[] keys = { "A", "B", "C", "D", "E" };
+ int count = keys.length;
+
+ MBeanServer mbeanServer =
+ MBeanServerFactory.createMBeanServer("mockPartition");
+ try {
+ ClusterNode localAddress = new ClusterNode(new IpAddress("127.0.0.1", 12345));
+ MockHAPartition partition = new MockHAPartition(localAddress);
+
+ DistributedReplicantManagerImpl drm =
+ new DistributedReplicantManagerImpl(partition, mbeanServer);
+
+ drm.init();
+
+ // Create a fake view for the MockHAPartition
+
+ Vector remoteAddresses = new Vector();
+ ClusterNode remote = new ClusterNode(new IpAddress("127.0.0.1", 12341));
+ remoteAddresses.add(remote);
+
+ Vector allNodes = new Vector(remoteAddresses);
+ allNodes.add(localAddress);
+ partition.setCurrentViewClusterNodes(allNodes);
+
+ drm.start();
+
+ MockDeployer deployer = new MockDeployer(drm);
+
+ if (!conflicting)
+ {
+ // Register a MockHASingletonDeployer, but since we're in
+ // non-conflicting mode, the DeployerThreads won't deal with it
+ MockHASingletonDeployer listener =
+ new MockHASingletonDeployer(deployer, "HASingleton", log);
+
+ drm.registerListener("HASingleton", listener);
+ drm.add("HASingleton", "HASingleton");
+ }
+
+ // Create a semaphore to gate the threads and acquire all its permits
+ Semaphore semaphore = new Semaphore(count + 1);
+ for (int i = 0; i <= count; i++)
+ semaphore.acquire();
+
+ DeployerThread[] deployers = new DeployerThread[keys.length];
+ for (int i = 0; i < count; i++)
+ {
+ DistributedReplicantManager.ReplicantListener listener = null;
+ if (conflicting)
+ {
+ listener = new MockHASingletonDeployer(deployer, keys[i], log);
+ }
+ else
+ {
+ listener = new NullListener();
+ }
+ deployers[i] = new DeployerThread(deployer, keys[i], listener, semaphore, log);
+ deployers[i].start();
+ }
+
+ String[] jgKeys = keys;
+ if (!conflicting)
+ {
+ // The JGroups thread also deals with the MockHASingletonDeployer
+ // key that the DeployerThreads don't
+ jgKeys = new String[keys.length + 1];
+ System.arraycopy(keys, 0, jgKeys, 0, keys.length);
+ jgKeys[keys.length] = "HASingleton";
+ }
+ JGroupsThread jgThread = new JGroupsThread(drm, jgKeys, remote.getName(), semaphore);
+ jgThread.start();
+
+ // Launch the threads
+ semaphore.release(count + 1);
+
+ boolean reacquired = false;
+ try
+ {
+ // Give the threads 5 secs to acquire the semaphore
+ long maxElapsed = System.currentTimeMillis() + 5000;
+ for (int i = 0; i < keys.length; i++)
+ {
+ if (deployers[i].getCount() < 0)
+ {
+ assertTrue("Thread " + keys[i] + " started in time",
+ maxElapsed - System.currentTimeMillis() > 0);
+ sleepThread(10);
+ i--; // try again
+ }
+ }
+
+ while (jgThread.getCount() < 0)
+ {
+ assertTrue("jgThread started in time",
+ maxElapsed - System.currentTimeMillis() > 0);
+ sleepThread(10);
+ }
+
+ // Reaquire all the permits, thus showing the threads didn't deadlock
+
+ // Give them 500 ms per loop
+ maxElapsed = System.currentTimeMillis() + (500 * LOOP_COUNT);
+ for (int i = 0; i <= count; i++)
+ {
+ long waitTime = maxElapsed - System.currentTimeMillis();
+ assertTrue("Acquired thread " + i, semaphore.attempt(waitTime));
+ }
+
+ reacquired = true;
+
+ // Ensure there were no exceptions
+ for (int i = 0; i < keys.length; i++)
+ {
+ assertEquals("Thread " + keys[i] + " finished", LOOP_COUNT, deployers[i].getCount());
+ assertNull("Thread " + keys[i] + " saw no exceptions", deployers[i].getException());
+ }
+ assertEquals("JGroups Thread finished", LOOP_COUNT, jgThread.getCount());
+ assertNull("JGroups Thread saw no exceptions", jgThread.getException());
+ }
+ finally
+ {
+
+ if (!reacquired)
+ {
+ for (int i = 0; i < keys.length; i++)
+ {
+ if (deployers[i].getException() != null)
+ {
+ System.out.println("Exception in deployer " + i);
+ deployers[i].getException().printStackTrace(System.out);
+ }
+ else
+ {
+ System.out.println("Thread " + i + " completed " + deployers[i].getCount());
+ }
+ }
+ if (jgThread.getException() != null)
+ {
+ System.out.println("Exception in jgThread");
+ jgThread.getException().printStackTrace(System.out);
+ }
+ else
+ {
+ System.out.println("jgThread completed " + jgThread.getCount());
+ }
+ }
+
+ // Be sure the threads are dead
+ if (jgThread.isAlive())
+ {
+ jgThread.interrupt();
+ sleepThread(5);
+ printStackTrace(jgThread.getName(), jgThread.getException());
+ }
+ for (int i = 0; i < keys.length; i++)
+ {
+ if (deployers[i].isAlive())
+ {
+ deployers[i].interrupt();
+ sleepThread(5);
+ printStackTrace(deployers[i].getName(), deployers[i].getException());
+ }
+ }
+
+ }
+ }
+ finally {
+ MBeanServerFactory.releaseMBeanServer(mbeanServer);
+ }
+ }
+
+ public void testReplicantOrder() throws Exception
+ {
+ MBeanServer mbeanServer =
+ MBeanServerFactory.createMBeanServer("mockPartitionA");
+ try {
+
+ // Create a fake view for the MockHAPartition
+ ClusterNode[] nodes = new ClusterNode[5];
+ String[] names = new String[nodes.length];
+ Integer[] replicants = new Integer[nodes.length];
+ Vector allNodes = new Vector();
+ for (int i = 0; i < nodes.length; i++)
+ {
+ nodes[i] = new ClusterNode(new IpAddress("127.0.0.1", 12340 + i));
+ allNodes.add(nodes[i]);
+ names[i] = nodes[i].getName();
+ replicants[i] = new Integer(i);
+ }
+
+ MockHAPartition partition = new MockHAPartition(nodes[2]);
+ partition.setCurrentViewClusterNodes(allNodes);
+
+ DistributedReplicantManagerImpl drm =
+ new DistributedReplicantManagerImpl(partition, mbeanServer);
+ drm.init();
+ drm.start();
+
+ CachingListener listener = new CachingListener();
+ drm.registerListener("TEST", listener);
+
+ SecureRandom random = new SecureRandom();
+ boolean[] added = new boolean[nodes.length];
+ List lookup = null;
+ for (int i = 0; i < 10; i++)
+ {
+ int node = random.nextInt(nodes.length);
+ if (added[node])
+ {
+ if (node == 2)
+ drm.remove("TEST");
+ else
+ drm._remove("TEST", nodes[node].getName());
+ added[node] = false;
+ }
+ else
+ {
+ if (node == 2)
+ drm.add("TEST", replicants[node]);
+ else
+ drm._add("TEST", nodes[node].getName(), replicants[node]);
+ added[node] = true;
+ }
+
+ // Confirm the proper order of the replicant node names
+ lookup = maskListClass(drm.lookupReplicantsNodeNames("TEST"));
+ confirmReplicantList(lookup, names, added);
+
+ // Confirm the proper order of the replicants via lookupReplicants
+ lookup = maskListClass(drm.lookupReplicants("TEST"));
+ confirmReplicantList(lookup, replicants, added);
+
+ // Confirm the listener got the same list
+// assertEquals("Listener received a correct list", lookup,
+// maskListClass(listener.replicants));
+ }
+
+ // Let the asynchronous notification thread catch up
+ sleep(25);
+
+ // Confirm all lists presented to the listener were properly ordered
+ assertTrue("Listener saw no misordered lists", listener.clean);
+
+ }
+ finally {
+ MBeanServerFactory.releaseMBeanServer(mbeanServer);
+ }
+ }
+
+ private void confirmReplicantList(List current, Object[] all, boolean[] added)
+ {
+ Iterator iter = current.iterator();
+ for (int i = 0; i < added.length; i++)
+ {
+ if (added[i])
+ {
+ assertTrue("List has more replicants", iter.hasNext());
+ assertEquals("Replicant for node " + i + " is next",
+ all[i], iter.next());
+ }
+ }
+ assertFalse("List has no extra replicants", iter.hasNext());
+ }
+
+ /** Converts the given list to an ArrayList, if it isn't already */
+ private List maskListClass(List toMask)
+ {
+ if (toMask instanceof ArrayList)
+ return toMask;
+ else if (toMask == null)
+ return new ArrayList();
+ else
+ return new ArrayList(toMask);
+ }
+
+ private static void sleepThread(long millis)
+ {
+ try
+ {
+ Thread.sleep(millis);
+ }
+ catch (InterruptedException e) {
+ e.printStackTrace();
+ }
+ }
+
+ private static void printStackTrace(String threadName, Exception e)
+ {
+ if (e instanceof InterruptedException)
+ {
+ System.out.println("Stack trace for " + threadName);
+ e.printStackTrace(System.out);
+ System.out.println();
+ }
+ }
+
}
Modified: branches/JBoss_4_0_3_SP1_CP/testsuite/src/main/org/jboss/test/testbeancluster/test/BeanUnitTestCase.java
===================================================================
--- branches/JBoss_4_0_3_SP1_CP/testsuite/src/main/org/jboss/test/testbeancluster/test/BeanUnitTestCase.java 2006-11-17 18:12:58 UTC (rev 58500)
+++ branches/JBoss_4_0_3_SP1_CP/testsuite/src/main/org/jboss/test/testbeancluster/test/BeanUnitTestCase.java 2006-11-17 19:13:31 UTC (rev 58501)
@@ -350,7 +350,10 @@
RMIAdaptor[] adaptors = getAdaptors();
Long cacheCount = (Long) adaptors[0].getAttribute(oName, "CacheSize");
assertEquals("CacheSize is zero", 0, cacheCount.longValue());
- cacheCount = (Long) adaptors[0].getAttribute(oName, "PassivatedCount");
- assertEquals("PassivatedCount is zero", 0, cacheCount.longValue());
+ // Checking the passivated count is invalid, as it doesn't get reduced
+ // when remove() is called on a bean -- only when the passivation cleanup
+ // thread runs
+ //cacheCount = (Long) adaptors[0].getAttribute(oName, "PassivatedCount");
+ //assertEquals("PassivatedCount is zero", 0, cacheCount.longValue());
}
}
More information about the jboss-cvs-commits
mailing list