[jboss-cvs] JBossAS SVN: r105442 - projects/cluster/ha-server-core/trunk/src/main/java/org/jboss/ha/core/jgroups/blocks/mux.

jboss-cvs-commits at lists.jboss.org jboss-cvs-commits at lists.jboss.org
Mon May 31 22:57:00 EDT 2010


Author: bstansberry at jboss.com
Date: 2010-05-31 22:56:59 -0400 (Mon, 31 May 2010)
New Revision: 105442

Modified:
   projects/cluster/ha-server-core/trunk/src/main/java/org/jboss/ha/core/jgroups/blocks/mux/MuxUpHandler.java
Log:
Improve state transfer handling

Modified: projects/cluster/ha-server-core/trunk/src/main/java/org/jboss/ha/core/jgroups/blocks/mux/MuxUpHandler.java
===================================================================
--- projects/cluster/ha-server-core/trunk/src/main/java/org/jboss/ha/core/jgroups/blocks/mux/MuxUpHandler.java	2010-06-01 01:24:53 UTC (rev 105441)
+++ projects/cluster/ha-server-core/trunk/src/main/java/org/jboss/ha/core/jgroups/blocks/mux/MuxUpHandler.java	2010-06-01 02:56:59 UTC (rev 105442)
@@ -1,18 +1,13 @@
 package org.jboss.ha.core.jgroups.blocks.mux;
 
-import java.util.List;
 import java.util.Map;
-import java.util.Set;
 import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.CopyOnWriteArrayList;
-import java.util.concurrent.CopyOnWriteArraySet;
 
 import org.jgroups.Event;
 import org.jgroups.Message;
 import org.jgroups.UpHandler;
 import org.jgroups.blocks.mux.MuxHeader;
 import org.jgroups.blocks.mux.MuxRequestCorrelator;
-import org.jgroups.blocks.mux.Muxer;
 import org.jgroups.blocks.mux.NoMuxHandler;
 import org.jgroups.conf.ClassConfigurator;
 import org.jgroups.logging.Log;
@@ -28,14 +23,16 @@
  * 
  * @version $Id: MuxUpHandler.java,v 1.2 2010/04/15 20:05:22 ferraro Exp $
  */
-public class MuxUpHandler implements UpHandler, Muxer<UpHandler> {
+public class MuxUpHandler 
+      extends org.jgroups.blocks.mux.MuxUpHandler {
+    //implements UpHandler, Muxer<UpHandler> {
 
     protected final Log log=LogFactory.getLog(getClass());
     protected final static short MUX_ID = ClassConfigurator.getProtocolId(MuxRequestCorrelator.class);
     private final Map<Short, UpHandler> handlers = new ConcurrentHashMap<Short, UpHandler>();
-    private final Set<StateTransferFilter> stateTransferHandlers = new CopyOnWriteArraySet<StateTransferFilter>();
-    private final List<UpHandler> basicHandlers = new CopyOnWriteArrayList<UpHandler>();
     private volatile UpHandler defaultHandler;
+    private volatile Event lastFlushEvent;
+    private final Object flushMutex = new Object();
     
     /**
      * Creates a multiplexing up handler, with no default handler.
@@ -58,15 +55,14 @@
      */
     @Override
     public void add(short id, UpHandler handler) {
-        handlers.put(id, handler);
-        if (handler instanceof StateTransferFilter)
-        {
-           stateTransferHandlers.add((StateTransferFilter) handler);
-        }
-        else
-        {
-           basicHandlers.add(handler);
-        }
+       synchronized (flushMutex)
+       {          
+          if (lastFlushEvent != null)
+          {
+             handler.up(lastFlushEvent);
+          }
+          handlers.put(id, handler);
+       }
     }
 
     /**
@@ -75,15 +71,7 @@
      */
     @Override
     public void remove(short id) {
-        UpHandler handler = handlers.remove(id);
-        if (handler instanceof StateTransferFilter)
-        {
-           stateTransferHandlers.remove(handler);
-        }
-        else
-        {
-           basicHandlers.remove(handler);
-        }
+        handlers.remove(id);
     }
 
     /**
@@ -92,7 +80,8 @@
      */
     @Override
     public Object up(Event evt) {
-        switch (evt.getType()) {
+        
+       switch (evt.getType()) {
             case Event.MSG: {
                 Message msg = (Message) evt.getArg();
                 MuxHeader hdr = (MuxHeader) msg.getHeader(MUX_ID);
@@ -109,49 +98,70 @@
             case Event.STATE_TRANSFER_INPUTSTREAM: {
                 StateTransferInfo info=(StateTransferInfo)evt.getArg();
                 String state_id=info.state_id;
-                for (StateTransferFilter stup : stateTransferHandlers)
+                UpHandler basicHandler = null;
+                boolean multipleBasic = false;
+                for (UpHandler uh: handlers.values())
                 {
-                   if (stup.accepts(state_id))
+                   if (uh instanceof StateTransferFilter)
                    {
-                      return ((UpHandler) stup).up(evt);
+                      if (((StateTransferFilter) uh).accepts(state_id))
+                      {
+                         return (uh.up(evt));
+                      }
                    }
+                   else if (basicHandler == null)
+                   {
+                      basicHandler = uh;
+                   }
+                   else
+                   {
+                      multipleBasic = true;
+                   }
                 }
                 
-                int numBasic = basicHandlers.size();
-
-                if (numBasic > 0)
+                if (basicHandler != null)
                 {
-                   if (numBasic > 1)
+                   if (multipleBasic)
                    {
+                      // TODO throw exception
                       log.warn("Received state transfer related event with more " +
                       		"than one basic UpHandler registered. Arbitrarily " +
-                      		"using first handler registered to handle request");
+                      		"picking a handler to handle request");
                    }
                    
-                   try
-                   {
-                      return basicHandlers.get(0).up(evt);
-                   }
-                   catch (IndexOutOfBoundsException ignored)
-                   {
-                      // must have been removed
-                   }
+                   return basicHandler.up(evt);
                 }
+                // else let default handler handle it below
                 break;
+            } 
+            case Event.BLOCK: 
+            case Event.UNBLOCK: {
+               synchronized (flushMutex)
+               {
+                  this.lastFlushEvent = evt;
+                  passToAllHandlers(evt);
+                  break;
+               }
             }
             case Event.VIEW_CHANGE:
             case Event.SET_LOCAL_ADDRESS: 
-            case Event.SUSPECT: 
-            case Event.BLOCK: 
-            case Event.UNBLOCK:
-            default: {
-                for (UpHandler handler: handlers.values()) {
-                    handler.up(evt);
-                }
-                return null;
-            }
+            case Event.SUSPECT:  {
+               passToAllHandlers(evt);
+               break;
+           }
+           default: {
+                passToAllHandlers(evt);
+                break;
+           }
         }
         
         return (defaultHandler != null) ? defaultHandler.up(evt) : null;
     }
+
+   private void passToAllHandlers(Event evt)
+   {
+      for (UpHandler handler: handlers.values()) {
+           handler.up(evt);
+       }
+   }
 }




More information about the jboss-cvs-commits mailing list