[jboss-cvs] JBossAS SVN: r105442 - projects/cluster/ha-server-core/trunk/src/main/java/org/jboss/ha/core/jgroups/blocks/mux.
jboss-cvs-commits at lists.jboss.org
jboss-cvs-commits at lists.jboss.org
Mon May 31 22:57:00 EDT 2010
Author: bstansberry at jboss.com
Date: 2010-05-31 22:56:59 -0400 (Mon, 31 May 2010)
New Revision: 105442
Modified:
projects/cluster/ha-server-core/trunk/src/main/java/org/jboss/ha/core/jgroups/blocks/mux/MuxUpHandler.java
Log:
Improve state transfer handling
Modified: projects/cluster/ha-server-core/trunk/src/main/java/org/jboss/ha/core/jgroups/blocks/mux/MuxUpHandler.java
===================================================================
--- projects/cluster/ha-server-core/trunk/src/main/java/org/jboss/ha/core/jgroups/blocks/mux/MuxUpHandler.java 2010-06-01 01:24:53 UTC (rev 105441)
+++ projects/cluster/ha-server-core/trunk/src/main/java/org/jboss/ha/core/jgroups/blocks/mux/MuxUpHandler.java 2010-06-01 02:56:59 UTC (rev 105442)
@@ -1,18 +1,13 @@
package org.jboss.ha.core.jgroups.blocks.mux;
-import java.util.List;
import java.util.Map;
-import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.CopyOnWriteArrayList;
-import java.util.concurrent.CopyOnWriteArraySet;
import org.jgroups.Event;
import org.jgroups.Message;
import org.jgroups.UpHandler;
import org.jgroups.blocks.mux.MuxHeader;
import org.jgroups.blocks.mux.MuxRequestCorrelator;
-import org.jgroups.blocks.mux.Muxer;
import org.jgroups.blocks.mux.NoMuxHandler;
import org.jgroups.conf.ClassConfigurator;
import org.jgroups.logging.Log;
@@ -28,14 +23,16 @@
*
* @version $Id: MuxUpHandler.java,v 1.2 2010/04/15 20:05:22 ferraro Exp $
*/
-public class MuxUpHandler implements UpHandler, Muxer<UpHandler> {
+public class MuxUpHandler
+ extends org.jgroups.blocks.mux.MuxUpHandler {
+ //implements UpHandler, Muxer<UpHandler> {
protected final Log log=LogFactory.getLog(getClass());
protected final static short MUX_ID = ClassConfigurator.getProtocolId(MuxRequestCorrelator.class);
private final Map<Short, UpHandler> handlers = new ConcurrentHashMap<Short, UpHandler>();
- private final Set<StateTransferFilter> stateTransferHandlers = new CopyOnWriteArraySet<StateTransferFilter>();
- private final List<UpHandler> basicHandlers = new CopyOnWriteArrayList<UpHandler>();
private volatile UpHandler defaultHandler;
+ private volatile Event lastFlushEvent;
+ private final Object flushMutex = new Object();
/**
* Creates a multiplexing up handler, with no default handler.
@@ -58,15 +55,14 @@
*/
@Override
public void add(short id, UpHandler handler) {
- handlers.put(id, handler);
- if (handler instanceof StateTransferFilter)
- {
- stateTransferHandlers.add((StateTransferFilter) handler);
- }
- else
- {
- basicHandlers.add(handler);
- }
+ synchronized (flushMutex)
+ {
+ if (lastFlushEvent != null)
+ {
+ handler.up(lastFlushEvent);
+ }
+ handlers.put(id, handler);
+ }
}
/**
@@ -75,15 +71,7 @@
*/
@Override
public void remove(short id) {
- UpHandler handler = handlers.remove(id);
- if (handler instanceof StateTransferFilter)
- {
- stateTransferHandlers.remove(handler);
- }
- else
- {
- basicHandlers.remove(handler);
- }
+ handlers.remove(id);
}
/**
@@ -92,7 +80,8 @@
*/
@Override
public Object up(Event evt) {
- switch (evt.getType()) {
+
+ switch (evt.getType()) {
case Event.MSG: {
Message msg = (Message) evt.getArg();
MuxHeader hdr = (MuxHeader) msg.getHeader(MUX_ID);
@@ -109,49 +98,70 @@
case Event.STATE_TRANSFER_INPUTSTREAM: {
StateTransferInfo info=(StateTransferInfo)evt.getArg();
String state_id=info.state_id;
- for (StateTransferFilter stup : stateTransferHandlers)
+ UpHandler basicHandler = null;
+ boolean multipleBasic = false;
+ for (UpHandler uh: handlers.values())
{
- if (stup.accepts(state_id))
+ if (uh instanceof StateTransferFilter)
{
- return ((UpHandler) stup).up(evt);
+ if (((StateTransferFilter) uh).accepts(state_id))
+ {
+ return (uh.up(evt));
+ }
}
+ else if (basicHandler == null)
+ {
+ basicHandler = uh;
+ }
+ else
+ {
+ multipleBasic = true;
+ }
}
- int numBasic = basicHandlers.size();
-
- if (numBasic > 0)
+ if (basicHandler != null)
{
- if (numBasic > 1)
+ if (multipleBasic)
{
+ // TODO throw exception
log.warn("Received state transfer related event with more " +
"than one basic UpHandler registered. Arbitrarily " +
- "using first handler registered to handle request");
+ "picking a handler to handle request");
}
- try
- {
- return basicHandlers.get(0).up(evt);
- }
- catch (IndexOutOfBoundsException ignored)
- {
- // must have been removed
- }
+ return basicHandler.up(evt);
}
+ // else let default handler handle it below
break;
+ }
+ case Event.BLOCK:
+ case Event.UNBLOCK: {
+ synchronized (flushMutex)
+ {
+ this.lastFlushEvent = evt;
+ passToAllHandlers(evt);
+ break;
+ }
}
case Event.VIEW_CHANGE:
case Event.SET_LOCAL_ADDRESS:
- case Event.SUSPECT:
- case Event.BLOCK:
- case Event.UNBLOCK:
- default: {
- for (UpHandler handler: handlers.values()) {
- handler.up(evt);
- }
- return null;
- }
+ case Event.SUSPECT: {
+ passToAllHandlers(evt);
+ break;
+ }
+ default: {
+ passToAllHandlers(evt);
+ break;
+ }
}
return (defaultHandler != null) ? defaultHandler.up(evt) : null;
}
+
+ private void passToAllHandlers(Event evt)
+ {
+ for (UpHandler handler: handlers.values()) {
+ handler.up(evt);
+ }
+ }
}
More information about the jboss-cvs-commits
mailing list