[jboss-cvs] JBossAS SVN: r58104 - in branches/JBoss_4_0_3_SP1_JBAS-3822/cluster/src/main/org/jboss: cache/invalidation/bridges ha/framework/interfaces ha/framework/server ha/jmx proxy/ejb proxy/generic
jboss-cvs-commits at lists.jboss.org
jboss-cvs-commits at lists.jboss.org
Fri Nov 3 22:55:56 EST 2006
Author: bstansberry at jboss.com
Date: 2006-11-03 22:55:53 -0500 (Fri, 03 Nov 2006)
New Revision: 58104
Modified:
branches/JBoss_4_0_3_SP1_JBAS-3822/cluster/src/main/org/jboss/cache/invalidation/bridges/JGCacheInvalidationBridge.java
branches/JBoss_4_0_3_SP1_JBAS-3822/cluster/src/main/org/jboss/ha/framework/interfaces/HAPartition.java
branches/JBoss_4_0_3_SP1_JBAS-3822/cluster/src/main/org/jboss/ha/framework/server/DistributedReplicantManagerImpl.java
branches/JBoss_4_0_3_SP1_JBAS-3822/cluster/src/main/org/jboss/ha/framework/server/HAPartitionImpl.java
branches/JBoss_4_0_3_SP1_JBAS-3822/cluster/src/main/org/jboss/ha/jmx/HAServiceMBeanSupport.java
branches/JBoss_4_0_3_SP1_JBAS-3822/cluster/src/main/org/jboss/proxy/ejb/ProxyFactoryHA.java
branches/JBoss_4_0_3_SP1_JBAS-3822/cluster/src/main/org/jboss/proxy/generic/ProxyFactoryHA.java
Log:
[JBAS-3822] Port HAPartition concurrency fixes to 4.0.3.SP1
Modified: branches/JBoss_4_0_3_SP1_JBAS-3822/cluster/src/main/org/jboss/cache/invalidation/bridges/JGCacheInvalidationBridge.java
===================================================================
--- branches/JBoss_4_0_3_SP1_JBAS-3822/cluster/src/main/org/jboss/cache/invalidation/bridges/JGCacheInvalidationBridge.java 2006-11-04 03:55:08 UTC (rev 58103)
+++ branches/JBoss_4_0_3_SP1_JBAS-3822/cluster/src/main/org/jboss/cache/invalidation/bridges/JGCacheInvalidationBridge.java 2006-11-04 03:55:53 UTC (rev 58104)
@@ -117,7 +117,14 @@
// DistributedReplicantManager.ReplicantListener implementation ---------------------------
- public void replicantsChanged (String key, java.util.List newReplicants, int newReplicantsViewId)
+ /**
+ * @todo examine thread safety. synchronized keyword was added to method
+ * signature when internal behavior of DistributedReplicantManagerImpl was
+ * changed so that multiple threads could concurrently send replicantsChanged
+ * notifications. Need to examine in detail how this method interacts with
+ * DistributedState to see if we can remove/narrow the synchronization.
+ */
+ public synchronized void replicantsChanged (String key, java.util.List newReplicants, int newReplicantsViewId)
{
if (key.equals (this.RPC_HANLE_NAME) && this.drm.isMasterReplica (this.RPC_HANLE_NAME))
{
Modified: branches/JBoss_4_0_3_SP1_JBAS-3822/cluster/src/main/org/jboss/ha/framework/interfaces/HAPartition.java
===================================================================
--- branches/JBoss_4_0_3_SP1_JBAS-3822/cluster/src/main/org/jboss/ha/framework/interfaces/HAPartition.java 2006-11-04 03:55:08 UTC (rev 58103)
+++ branches/JBoss_4_0_3_SP1_JBAS-3822/cluster/src/main/org/jboss/ha/framework/interfaces/HAPartition.java 2006-11-04 03:55:53 UTC (rev 58104)
@@ -11,8 +11,6 @@
import java.util.Vector;
import java.util.ArrayList;
-import org.jboss.ha.framework.interfaces.HAPartition.HAMembershipListener;
-
/**
*
* @author <a href="mailto:bill at burkecentral.com">Bill Burke</a>.
@@ -35,9 +33,13 @@
// *******************************
//
/**
- * Return the name of the current name in the current partition. The name is
- * dynamically determined by the partition.
- * @return The partition name
+ * Return the name of this node in the current partition. The name is
+ * dynamically determined by the partition. The name will be the String
+ * returned by <code>getClusterNode().getName()</code>.
+ *
+ * @return The node name
+ *
+ * @see #getClusterNode()
*/
public String getNodeName();
/**
@@ -268,8 +270,33 @@
* @param listener The listener wishing to unsubscribe
*/
public void unregisterMembershipListener(HAMembershipListener listener);
-
/**
+ * Returns whether this partition will synchronously notify any
+ * HAMembershipListeners of membership changes using the calling thread
+ * from the underlying <code>ClusterPartition</code>.
+ *
+ * @return <code>true</code> if registered listeners that don't implement
+ * <code>AsynchHAMembershipExtendedListener</code> or
+ * <code>AsynchHAMembershipListener</code> will be notified
+ * synchronously of membership changes; <code>false</code> if
+ * those listeners will be notified asynchronously. Default
+ * is <code>false</code>.
+ */
+ public boolean getAllowSynchronousMembershipNotifications();
+ /**
+ * Sets whether this partition will synchronously notify any
+ * HAMembershipListeners of membership changes using the calling thread
+ * from the underlying <code>ClusterPartition</code>.
+ *
+ * @param allowSync <code>true</code> if registered listeners that don't
+ * implement <code>AsynchHAMembershipExtendedListener</code> or
+ * <code>AsynchHAMembershipListener</code> should be notified
+ * synchronously of membership changes; <code>false</code> if
+ * those listeners can be notified asynchronously. Default
+ * is <code>false</code>.
+ */
+ public void setAllowSynchronousMembershipNotifications(boolean allowSync);
+ /**
* Each time the partition topology changes, a new view is computed. A view is a list of members,
* the first member being the coordinator of the view. Each view also has a distinct identifier.
* @return The identifier of the current view
@@ -283,7 +310,9 @@
/**
* Return the member nodes that built the current view i.e. the current partition.
- * @return An array of ClusterNode containing the node names
+ * @return An array of ClusterNode listing the current members of the partitionn.
+ * This array will be in the same order in all nodes in the cluster that
+ * have received the current view.
*/
public ClusterNode[] getClusterNodes ();
Modified: branches/JBoss_4_0_3_SP1_JBAS-3822/cluster/src/main/org/jboss/ha/framework/server/DistributedReplicantManagerImpl.java
===================================================================
--- branches/JBoss_4_0_3_SP1_JBAS-3822/cluster/src/main/org/jboss/ha/framework/server/DistributedReplicantManagerImpl.java 2006-11-04 03:55:08 UTC (rev 58103)
+++ branches/JBoss_4_0_3_SP1_JBAS-3822/cluster/src/main/org/jboss/ha/framework/server/DistributedReplicantManagerImpl.java 2006-11-04 03:55:53 UTC (rev 58104)
@@ -7,6 +7,7 @@
package org.jboss.ha.framework.server;
+import java.util.Set;
import java.util.Vector;
import java.util.ArrayList;
import java.util.HashMap;
@@ -26,14 +27,13 @@
import org.jboss.logging.Logger;
+import org.jboss.ha.framework.interfaces.ClusterNode;
import org.jboss.ha.framework.interfaces.DistributedReplicantManager;
import org.jboss.ha.framework.interfaces.HAPartition;
/**
- * This class manages replicated objects.
- *
- * @todo there is still too much synrhonization on keyListeners
+ * This class manages replicated objects.
*
* @author <a href="mailto:bill at burkecentral.com">Bill Burke</a>.
* @author <a href="mailto:sacha.labourey at cogito-info.ch">Sacha Labourey</a>.
@@ -43,7 +43,8 @@
public class DistributedReplicantManagerImpl
implements DistributedReplicantManagerImplMBean,
HAPartition.HAMembershipExtendedListener,
- HAPartition.HAPartitionStateTransfer
+ HAPartition.HAPartitionStateTransfer,
+ AsynchEventHandler.AsynchEventProcessor
{
// Constants -----------------------------------------------------
@@ -56,7 +57,9 @@
protected ConcurrentReaderHashMap replicants = new ConcurrentReaderHashMap();
protected ConcurrentReaderHashMap keyListeners = new ConcurrentReaderHashMap();
protected HashMap intraviewIdCache = new HashMap();
- protected HAPartition partition;
+ protected HAPartition partition;
+ /** The handler used to send replicant change notifications asynchronously */
+ protected AsynchEventHandler asynchHandler;
protected Logger log;
@@ -113,6 +116,10 @@
public void start() throws Exception
{
this.nodeName = this.partition.getNodeName ();
+
+ // Create the asynch listener handler thread
+ asynchHandler = new AsynchEventHandler(this, "AsynchKeyChangeHandler");
+ asynchHandler.start();
partitionNameKnown.release (); // partition name is now known!
@@ -122,8 +129,15 @@
public void stop() throws Exception
{
- // we cleanly shutdown. This should be optimized.
- //
+ // BES 200604 -- implication of NR's JBLCUSTER-38 change. Moving to
+ // destroy allows restart of HAPartition while local registrations
+ // survive -- stopping partition does not stop all registered services
+ // e.g. ejbs; if we maintain their registrations we can pass them to
+ // the cluster when we restart. However, we are leaving all the remote
+ // replicants we have registered around, so they will still be included
+ // as targets if anyone contacts our EJB while partition is stopped.
+ // Probably OK; if they aren't valid the client will find this out.
+
// NR 200505 : [JBCLUSTER-38] move to destroy
// if (localReplicants != null)
// {
@@ -136,14 +150,24 @@
// }
// }
+ // Stop the asynch handler thread
+ try
+ {
+ asynchHandler.stop();
+ }
+ catch( Exception e)
+ {
+ log.warn("Failed to stop asynchHandler", e);
+ }
+
// NR 200505 : [JBCLUSTER-38] move to destroy
// this.mbeanserver.unregisterMBean (this.jmxName);
}
// NR 200505 : [JBCLUSTER-38] unbind at destroy
public void destroy() throws Exception
- {
- // we cleanly shutdown. This should be optimized.
+ {
+ // now partition can't be resuscitated, so remove local replicants
if (localReplicants != null)
{
synchronized(localReplicants)
@@ -152,11 +176,17 @@
localReplicants.keySet().toArray(keys);
for(int n = 0; n < keys.length; n ++)
{
- this.remove (keys[n]);
+ this.removeLocal(keys[n]); // channel is disconnected, so
+ // don't try to notify cluster
}
}
}
+
this.mbeanserver.unregisterMBean (this.jmxName);
+
+ partition.unregisterRPCHandler(SERVICE_NAME, this);
+ partition.unsubscribeFromStateTransferEvents(SERVICE_NAME, this);
+ partition.unregisterMembershipListener(this);
}
public String listContent () throws Exception
@@ -334,6 +364,20 @@
// we don't need to merge members anymore
}
+ // AsynchEventHandler.AsynchEventProcessor implementation -----------------
+
+ public void processEvent(Object event)
+ {
+ KeyChangeEvent kce = (KeyChangeEvent) event;
+ notifyKeyListeners(kce.key, kce.replicants);
+ }
+
+ static class KeyChangeEvent
+ {
+ String key;
+ List replicants;
+ }
+
// DistributedReplicantManager implementation ----------------------------------------------
public void add(String key, Serializable replicant) throws Exception
@@ -352,27 +396,31 @@
}
public void remove(String key) throws Exception
- {
+ {
partitionNameKnown.acquire (); // we don't propagate until our name is known
- if (localReplicants.containsKey (key))
- // optimisation: we don't make a costly network call if there is
- // nothing to remove
- //
+ // optimisation: we don't make a costly network call
+ // if there is nothing to remove
+ if (localReplicants.containsKey(key))
{
Object[] args = {key, this.nodeName};
partition.callAsynchMethodOnCluster(SERVICE_NAME, "_remove", args, remove_types, true);
- synchronized(localReplicants)
- {
- localReplicants.remove(key);
- List result = lookupReplicants(key);
- if (result == null)
- result = new ArrayList (); // don't return null but an empty list
- notifyKeyListeners(key, result);
- }
+ removeLocal(key);
}
}
+ protected void removeLocal(String key)
+ {
+ synchronized(localReplicants)
+ {
+ localReplicants.remove(key);
+ List result = lookupReplicants(key);
+ if (result == null)
+ result = new ArrayList (); // don't pass null but an empty list
+ notifyKeyListeners(key, result);
+ }
+ }
+
public Serializable lookupLocalReplicant(String key)
{
return (Serializable)localReplicants.get(key);
@@ -386,10 +434,33 @@
return null;
ArrayList rtn = new ArrayList();
- if (local != null)
- rtn.add(local);
- if (replicant != null)
- rtn.addAll(replicant.values());
+
+ if (replicant == null)
+ {
+ if (local != null)
+ rtn.add(local);
+ }
+ else
+ {
+ // JBAS-2677. Put the replicants in view order.
+ ClusterNode[] nodes = partition.getClusterNodes();
+ String replNode;
+ Object replVal;
+ for (int i = 0; i < nodes.length; i++)
+ {
+ replNode = nodes[i].getName();
+ if (local != null && nodeName.equals(replNode))
+ {
+ rtn.add(local);
+ continue;
+ }
+
+ replVal = replicant.get(replNode);
+ if (replVal != null)
+ rtn.add(replVal);
+ }
+ }
+
return rtn;
}
@@ -401,10 +472,32 @@
return null;
ArrayList rtn = new ArrayList();
- if (locallyReplicated)
- rtn.add(this.nodeName);
- if (replicant != null)
- rtn.addAll(replicant.keySet ());
+
+ if (replicant == null)
+ {
+ if (locallyReplicated)
+ rtn.add(this.nodeName);
+ }
+ else
+ {
+ // JBAS-2677. Put the replicants in view order.
+ Set keys = replicant.keySet();
+ ClusterNode[] nodes = partition.getClusterNodes();
+ String keyOwner;
+ for (int i = 0; i < nodes.length; i++)
+ {
+ keyOwner = nodes[i].getName();
+ if (locallyReplicated && nodeName.equals(keyOwner))
+ {
+ rtn.add(this.nodeName);
+ continue;
+ }
+
+ if (keys.contains(keyOwner))
+ rtn.add(keyOwner);
+ }
+ }
+
return rtn;
}
@@ -494,7 +587,7 @@
// DistributedReplicantManager cluster callbacks ----------------------------------------------
/**
- * cluster callback called when a new replicant is added on another node
+ * Cluster callback called when a new replicant is added on another node
* @param key Replicant key
* @param nodeName Node that add the current replicant
* @param replicant Serialized representation of the replicant
@@ -507,7 +600,11 @@
try
{
addReplicant(key, nodeName, replicant);
- notifyKeyListeners(key, lookupReplicants(key));
+ // Notify listeners asynchronously
+ KeyChangeEvent kce = new KeyChangeEvent();
+ kce.key = key;
+ kce.replicants = lookupReplicants(key);
+ asynchHandler.queueEvent(kce);
}
catch (Exception ex)
{
@@ -516,7 +613,7 @@
}
/**
- * cluster callback called when a replicant is removed by another node
+ * Cluster callback called when a replicant is removed by another node
* @param key Name of the replicant key
* @param nodeName Node that wants to remove its replicant for the give key
*/
@@ -524,8 +621,13 @@
{
try
{
- if (removeReplicant (key, nodeName))
- notifyKeyListeners(key, lookupReplicants(key));
+ if (removeReplicant (key, nodeName)) {
+ // Notify listeners asynchronously
+ KeyChangeEvent kce = new KeyChangeEvent();
+ kce.key = key;
+ kce.replicants = lookupReplicants(key);
+ asynchHandler.queueEvent(kce);
+ }
}
catch (Exception ex)
{
@@ -694,33 +796,40 @@
* Notifies, through a callback, the listeners for a given replicant that the set of replicants has changed
* @param key The replicant key name
* @param newReplicants The new list of replicants
+ *
*/
protected void notifyKeyListeners(String key, List newReplicants)
{
if( trace )
log.trace("notifyKeyListeners");
- synchronized(keyListeners)
+
+ // we first update the intra-view id for this particular key
+ //
+ int newId = updateReplicantsHashId (key);
+
+ ArrayList listeners = (ArrayList)keyListeners.get(key);
+ if (listeners == null)
{
- // we first update the intra-view id for this particular key
- //
- int newId = updateReplicantsHashId (key);
-
- ArrayList listeners = (ArrayList)keyListeners.get(key);
- if (listeners == null)
- {
- if( trace )
- log.trace("listeners is null");
- return;
- }
-
if( trace )
- log.trace("notifying " + listeners.size() + " listeners for key change: " + key);
- for (int i = 0; i < listeners.size(); i++)
- {
- DistributedReplicantManager.ReplicantListener listener = (DistributedReplicantManager.ReplicantListener)listeners.get(i);
- listener.replicantsChanged(key, newReplicants, newId);
- }
+ log.trace("listeners is null");
+ return;
}
+
+ // ArrayList's iterator is not thread safe
+ DistributedReplicantManager.ReplicantListener[] toNotify = null;
+ synchronized(listeners)
+ {
+ toNotify = new DistributedReplicantManager.ReplicantListener[listeners.size()];
+ toNotify = (DistributedReplicantManager.ReplicantListener[]) listeners.toArray(toNotify);
+ }
+
+ if( trace )
+ log.trace("notifying " + toNotify.length + " listeners for key change: " + key);
+ for (int i = 0; i < toNotify.length; i++)
+ {
+ if (toNotify[i] != null)
+ toNotify[i].replicantsChanged(key, newReplicants, newId);
+ }
}
protected void republishLocalReplicants()
@@ -742,14 +851,16 @@
Map.Entry entry = (Map.Entry) entries.next();
String key = (String) entry.getKey();
Object replicant = entry.getValue();
+ if (replicant != null)
+ {
+ if( trace )
+ log.trace("publishing, key=" + key + ", value=" + replicant);
- if( trace )
- log.trace("publishing, key=" + key + ", value=" + replicant);
+ Object[] args = {key, this.nodeName, replicant};
- Object[] args = {key, this.nodeName, replicant};
-
- partition.callAsynchMethodOnCluster(SERVICE_NAME, "_add", args, add_types, true);
- notifyKeyListeners(key, lookupReplicants(key));
+ partition.callAsynchMethodOnCluster(SERVICE_NAME, "_add", args, add_types, true);
+ notifyKeyListeners(key, lookupReplicants(key));
+ }
}
if( trace )
log.trace("End Re-Publish local replicants");
@@ -781,14 +892,21 @@
}
for (int i = 0; i < rsp.size(); i++)
{
- Object[] objs = (Object[])rsp.get(i);
- if (objs == null)
+ Object o = rsp.get(i);
+ if (o == null)
{
log.warn("As part of the answers received during the DRM merge process, a NULL message was received!");
continue;
}
+ else if (o instanceof Throwable)
+ {
+ log.warn("As part of the answers received during the DRM merge process, a Throwable was received!", (Throwable) o);
+ continue;
+ }
+
+ Object[] objs = (Object[]) o;
String node = (String)objs[0];
- HashMap replicants = (HashMap)objs[1];
+ Map replicants = (Map)objs[1];
Iterator keys = replicants.keySet().iterator();
//FIXME: We don't remove keys in the merge process but only add new keys!
Modified: branches/JBoss_4_0_3_SP1_JBAS-3822/cluster/src/main/org/jboss/ha/framework/server/HAPartitionImpl.java
===================================================================
--- branches/JBoss_4_0_3_SP1_JBAS-3822/cluster/src/main/org/jboss/ha/framework/server/HAPartitionImpl.java 2006-11-04 03:55:08 UTC (rev 58103)
+++ branches/JBoss_4_0_3_SP1_JBAS-3822/cluster/src/main/org/jboss/ha/framework/server/HAPartitionImpl.java 2006-11-04 03:55:53 UTC (rev 58104)
@@ -25,8 +25,6 @@
import javax.naming.StringRefAddr;
import javax.management.MBeanServer;
-import EDU.oswego.cs.dl.util.concurrent.LinkedQueue;
-
import org.jgroups.JChannel;
import org.jgroups.MergeView;
import org.jgroups.View;
@@ -36,15 +34,12 @@
import org.jgroups.stack.IpAddress;
import org.jgroups.util.Rsp;
import org.jgroups.util.RspList;
-import org.jgroups.util.Util;
import org.jboss.invocation.MarshalledValueInputStream;
import org.jboss.invocation.MarshalledValueOutputStream;
import org.jboss.ha.framework.interfaces.DistributedReplicantManager;
import org.jboss.ha.framework.interfaces.DistributedState;
import org.jboss.ha.framework.interfaces.HAPartition;
-import org.jboss.ha.framework.interfaces.HAPartition.HAPartitionStateTransfer;
-import org.jboss.ha.framework.interfaces.HAPartition.HAMembershipListener;
import org.jboss.ha.framework.interfaces.ClusterNode;
import org.jboss.naming.NonSerializableFactory;
@@ -63,7 +58,7 @@
public class HAPartitionImpl
extends org.jgroups.blocks.RpcDispatcher
implements org.jgroups.MessageListener, org.jgroups.MembershipListener,
- HAPartition
+ HAPartition, AsynchEventHandler.AsynchEventProcessor
{
private static class NoHandlerForRPC implements Serializable
{
@@ -78,14 +73,14 @@
protected HashMap rpcHandlers = new HashMap();
protected HashMap stateHandlers = new HashMap();
- /** The HAMembershipListener and HAMembershipExtendedListeners */
- protected ArrayList listeners = new ArrayList();
+ /** Do we send any membership change notifications synchronously? */
+ protected boolean allowSyncListeners = false;
+ /** The synch HAMembershipListener and HAMembershipExtendedListeners */
+ protected ArrayList synchListeners = new ArrayList();
/** The asynch HAMembershipListener and HAMembershipExtendedListeners */
protected ArrayList asynchListeners = new ArrayList();
- /** The LinkedQueue<ViewChangeEvent> of changes to notify asynch listeners of */
- protected LinkedQueue asynchViewChanges = new LinkedQueue();
- /** The Thread used to send membership change notifications asynchronously */
- protected Thread asynchNotifyThread;
+ /** The handler used to send membership change notifications asynchronously */
+ protected AsynchEventHandler asynchHandler;
/** The current cluster partition members */
protected Vector members = null;
protected Vector jgmembers = null;
@@ -215,6 +210,8 @@
this.dsManager.init();
log.debug("bind distributed state service");
+ // Create the asynchronous handler for view changes
+ asynchHandler = new AsynchEventHandler(this, "AsynchViewChangeHandler");
// Bind ourself in the public JNDI space
//
@@ -271,10 +268,8 @@
this.replicantManager.start();
this.dsManager.start();
- // Create the asynch listener handler thread
- AsynchViewChangeHandler asynchHandler = new AsynchViewChangeHandler();
- asynchNotifyThread = new Thread(asynchHandler, "AsynchHAMembershipListener Thread");
- asynchNotifyThread.start();
+ // Start the asynch listener handler thread
+ asynchHandler.start();
}
public void closePartition() throws Exception
@@ -284,11 +279,11 @@
try
{
- asynchNotifyThread.interrupt();
+ asynchHandler.stop();
}
catch( Exception e)
{
- log.warn("Failed to interrupte asynchNotifyThread", e);
+ log.warn("Failed to stop asynchHandler", e);
}
// Stop the DRM and DS services
@@ -556,11 +551,14 @@
log.debug("membership changed from " + this.members.size() + " to "
+ event.allMembers.size());
// Put the view change to the asynch queue
- this.asynchViewChanges.put(event);
+ this.asynchHandler.queueEvent(event);
// Broadcast the new view to the synchronous view change listeners
- this.notifyListeners(listeners, event.viewId, event.allMembers,
- event.deadMembers, event.newMembers, event.originatingGroups);
+ if (this.allowSyncListeners)
+ {
+ this.notifyListeners(synchListeners, event.viewId, event.allMembers,
+ event.deadMembers, event.newMembers, event.originatingGroups);
+ }
}
catch (Exception ex)
{
@@ -899,20 +897,48 @@
//
public void registerMembershipListener(HAMembershipListener listener)
{
- synchronized(this.listeners)
- {
- this.listeners.add(listener);
+ boolean isAsynch = (this.allowSyncListeners == false)
+ || (listener instanceof AsynchHAMembershipListener)
+ || (listener instanceof AsynchHAMembershipExtendedListener);
+ if( isAsynch ) {
+ synchronized(this.asynchListeners) {
+ this.asynchListeners.add(listener);
+ }
}
+ else {
+ synchronized(this.synchListeners) {
+ this.synchListeners.add(listener);
+ }
+ }
}
public void unregisterMembershipListener(HAMembershipListener listener)
{
- synchronized(this.listeners)
- {
- this.listeners.remove(listener);
+ boolean isAsynch = (this.allowSyncListeners == false)
+ || (listener instanceof AsynchHAMembershipListener)
+ || (listener instanceof AsynchHAMembershipExtendedListener);
+ if( isAsynch ) {
+ synchronized(this.asynchListeners) {
+ this.asynchListeners.add(listener);
+ }
}
+ else {
+ synchronized(this.synchListeners) {
+ this.synchListeners.add(listener);
+ }
+ }
}
+ public boolean getAllowSynchronousMembershipNotifications()
+ {
+ return allowSyncListeners;
+ }
+
+ public void setAllowSynchronousMembershipNotifications(boolean allowSync)
+ {
+ this.allowSyncListeners = allowSync;
+ }
+
// org.jgroups.RpcDispatcher overrides ---------------------------------------------------
/**
@@ -998,10 +1024,21 @@
log.trace("rpc call threw exception", t);
retval = t;
}
-
+
return retval;
}
+
+ // AsynchEventHandler.AsynchEventProcessor -----------------------
+ public void processEvent(Object event)
+ {
+ ViewChangeEvent vce = (ViewChangeEvent) event;
+ notifyListeners(asynchListeners, vce.viewId, vce.allMembers,
+ vce.deadMembers, vce.newMembers, vce.originatingGroups);
+
+ }
+
+
// Package protected ---------------------------------------------
// Protected -----------------------------------------------------
@@ -1101,31 +1138,35 @@
Vector originatingGroups)
{
log.debug("Begin notifyListeners, viewID: "+viewID);
+
synchronized(theListeners)
{
- for (int i = 0; i < theListeners.size(); i++)
+ // JBAS-3619 -- don't hold synch lock while notifying
+ theListeners = (ArrayList) theListeners.clone();
+ }
+
+ for (int i = 0; i < theListeners.size(); i++)
+ {
+ HAMembershipListener aListener = null;
+ try
{
- HAMembershipListener aListener = null;
- try
+ aListener = (HAMembershipListener) theListeners.get(i);
+ if(originatingGroups != null && (aListener instanceof HAMembershipExtendedListener))
{
- aListener = (HAMembershipListener) theListeners.get(i);
- if(originatingGroups != null && (aListener instanceof HAMembershipExtendedListener))
- {
- HAMembershipExtendedListener exListener = (HAMembershipExtendedListener) aListener;
- exListener.membershipChangedDuringMerge (deadMembers, newMembers,
- allMembers, originatingGroups);
- }
- else
- {
- aListener.membershipChanged(deadMembers, newMembers, allMembers);
- }
+ HAMembershipExtendedListener exListener = (HAMembershipExtendedListener) aListener;
+ exListener.membershipChangedDuringMerge (deadMembers, newMembers,
+ allMembers, originatingGroups);
}
- catch (Throwable e)
+ else
{
- // a problem in a listener should not prevent other members to receive the new view
- log.warn("HAMembershipListener callback failure: "+aListener, e);
+ aListener.membershipChanged(deadMembers, newMembers, allMembers);
}
}
+ catch (Throwable e)
+ {
+ // a problem in a listener should not prevent other members to receive the new view
+ log.warn("HAMembershipListener callback failure: "+aListener, e);
+ }
}
log.debug("End notifyListeners, viewID: "+viewID);
}
@@ -1154,7 +1195,7 @@
catch (Exception ignored){}
}
- /** A simply data class containing the view change event needed to
+ /** A simple data class containing the view change event needed to
* message the HAMembershipListeners
*/
private static class ViewChangeEvent
@@ -1166,31 +1207,6 @@
Vector originatingGroups;
}
- /** The Runnable that handles the asynchronous listener notifications
- */
- private class AsynchViewChangeHandler implements Runnable
- {
- public void run()
- {
- log.debug("Begin AsynchViewChangeHandler");
- while( true )
- {
- try
- {
- ViewChangeEvent event = (ViewChangeEvent) asynchViewChanges.take();
- notifyListeners(asynchListeners, event.viewId, event.allMembers,
- event.deadMembers, event.newMembers, event.originatingGroups);
- }
- catch(InterruptedException e)
- {
- log.debug("AsynchViewChangeHandler interrupted", e);
- break;
- }
- }
- log.debug("End AsynchViewChangeHandler");
- }
- }
-
// Private -------------------------------------------------------
// Inner classes -------------------------------------------------
Modified: branches/JBoss_4_0_3_SP1_JBAS-3822/cluster/src/main/org/jboss/ha/jmx/HAServiceMBeanSupport.java
===================================================================
--- branches/JBoss_4_0_3_SP1_JBAS-3822/cluster/src/main/org/jboss/ha/jmx/HAServiceMBeanSupport.java 2006-11-04 03:55:08 UTC (rev 58103)
+++ branches/JBoss_4_0_3_SP1_JBAS-3822/cluster/src/main/org/jboss/ha/jmx/HAServiceMBeanSupport.java 2006-11-04 03:55:53 UTC (rev 58104)
@@ -187,6 +187,8 @@
// register to listen to topology changes, which might cause the election of a new master
drmListener = new DistributedReplicantManager.ReplicantListener()
{
+ Object mutex = new Object();
+
public void replicantsChanged(
String key,
List newReplicants,
@@ -194,8 +196,16 @@
{
if (key.equals(getServiceHAName()))
{
- // change in the topology callback
- HAServiceMBeanSupport.this.partitionTopologyChanged(newReplicants, newReplicantsViewId);
+ // This synchronized block was added when the internal behavior of
+ // DistributedReplicantManagerImpl was changed so that concurrent
+ // replicantsChanged notifications are possible. Synchronization
+ // ensures that this change won't break non-thread-safe
+ // subclasses of HAServiceMBeanSupport.
+ synchronized(mutex)
+ {
+ // change in the topology callback
+ HAServiceMBeanSupport.this.partitionTopologyChanged(newReplicants, newReplicantsViewId);
+ }
}
}
};
Modified: branches/JBoss_4_0_3_SP1_JBAS-3822/cluster/src/main/org/jboss/proxy/ejb/ProxyFactoryHA.java
===================================================================
--- branches/JBoss_4_0_3_SP1_JBAS-3822/cluster/src/main/org/jboss/proxy/ejb/ProxyFactoryHA.java 2006-11-04 03:55:08 UTC (rev 58103)
+++ branches/JBoss_4_0_3_SP1_JBAS-3822/cluster/src/main/org/jboss/proxy/ejb/ProxyFactoryHA.java 2006-11-04 03:55:53 UTC (rev 58104)
@@ -180,7 +180,11 @@
}
}
- public void replicantsChanged (String key, List newReplicants, int newReplicantsViewId)
+ // synchronized keyword added when it became possible for DRM to issue
+ // concurrent replicantsChanged notifications. JBAS-2169.
+ public synchronized void replicantsChanged (String key,
+ List newReplicants,
+ int newReplicantsViewId)
{
try
{
Modified: branches/JBoss_4_0_3_SP1_JBAS-3822/cluster/src/main/org/jboss/proxy/generic/ProxyFactoryHA.java
===================================================================
--- branches/JBoss_4_0_3_SP1_JBAS-3822/cluster/src/main/org/jboss/proxy/generic/ProxyFactoryHA.java 2006-11-04 03:55:08 UTC (rev 58103)
+++ branches/JBoss_4_0_3_SP1_JBAS-3822/cluster/src/main/org/jboss/proxy/generic/ProxyFactoryHA.java 2006-11-04 03:55:53 UTC (rev 58104)
@@ -158,7 +158,11 @@
}
}
- public void replicantsChanged(String key, List newReplicants, int newReplicantsViewId)
+ // synchronized keyword added when it became possible for DRM to issue
+ // concurrent replicantsChanged notifications. JBAS-2169.
+ public synchronized void replicantsChanged(String key,
+ List newReplicants,
+ int newReplicantsViewId)
{
try
{
More information about the jboss-cvs-commits
mailing list