[jboss-cvs] JBossAS SVN: r106438 - in projects/cluster/ha-server-core/trunk: src/main/java/org/jboss/ha/core/framework/server and 1 other directories.

jboss-cvs-commits at lists.jboss.org jboss-cvs-commits at lists.jboss.org
Mon Jul 5 23:08:31 EDT 2010


Author: bstansberry at jboss.com
Date: 2010-07-05 23:08:31 -0400 (Mon, 05 Jul 2010)
New Revision: 106438

Added:
   projects/cluster/ha-server-core/trunk/src/main/java/org/jboss/ha/core/jgroups/blocks/mux/DelegatingStateTransferUpHandler.java
Removed:
   projects/cluster/ha-server-core/trunk/src/main/java/org/jboss/ha/core/framework/server/DelegatingStateTransferUpHandler.java
   projects/cluster/ha-server-core/trunk/src/main/java/org/jboss/ha/core/jgroups/blocks/mux/MuxRequestCorrelator.java
Modified:
   projects/cluster/ha-server-core/trunk/pom.xml
   projects/cluster/ha-server-core/trunk/src/main/java/org/jboss/ha/core/framework/server/CoreGroupCommunicationService.java
   projects/cluster/ha-server-core/trunk/src/main/java/org/jboss/ha/core/jgroups/blocks/mux/MuxUpHandler.java
Log:
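Update to JGroups 2.10.0.CR1; remove code that is now provided by JGroups itself (MuxRequestCorrelator and the generic multiplexing logic in MuxUpHandler), and move DelegatingStateTransferUpHandler into the org.jboss.ha.core.jgroups.blocks.mux package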

Modified: projects/cluster/ha-server-core/trunk/pom.xml
===================================================================
--- projects/cluster/ha-server-core/trunk/pom.xml	2010-07-05 23:06:01 UTC (rev 106437)
+++ projects/cluster/ha-server-core/trunk/pom.xml	2010-07-06 03:08:31 UTC (rev 106438)
@@ -35,7 +35,7 @@
     <version.jboss.common.core>2.2.17.GA</version.jboss.common.core>
     <version.jboss.logging>3.0.0.Beta2</version.jboss.logging>
     <version.infinispan>4.0.0.GA</version.infinispan>
-    <version.jgroups>2.10.0.Beta2</version.jgroups>
+    <version.jgroups>2.10.0.CR1</version.jgroups>
     <version.junit>3.8.1</version.junit>
   </properties>
   
@@ -98,6 +98,12 @@
       <groupId>org.jboss.cluster</groupId>
       <artifactId>jboss-ha-server-cache-spi</artifactId>
       <version>${version.jboss.ha.server.cache.spi}</version>
+      <exclusions>
+        <exclusion>
+          <groupId>jgroups</groupId>
+          <artifactId>jgroups</artifactId>
+        </exclusion>
+      </exclusions>
     </dependency>
     
     <dependency>
@@ -113,16 +119,10 @@
     </dependency>
 
     <dependency>
-      <groupId>jgroups</groupId>
+      <groupId>org.jgroups</groupId>
       <artifactId>jgroups</artifactId>
       <version>${version.jgroups}</version>      
       <optional>true</optional>
-      <exclusions>
-        <exclusion>
-          <groupId>commons-logging</groupId>
-          <artifactId>commons-logging</artifactId>
-        </exclusion>
-      </exclusions>
     </dependency> 
     
     <!-- Test dependencies -->

Modified: projects/cluster/ha-server-core/trunk/src/main/java/org/jboss/ha/core/framework/server/CoreGroupCommunicationService.java
===================================================================
--- projects/cluster/ha-server-core/trunk/src/main/java/org/jboss/ha/core/framework/server/CoreGroupCommunicationService.java	2010-07-05 23:06:01 UTC (rev 106437)
+++ projects/cluster/ha-server-core/trunk/src/main/java/org/jboss/ha/core/framework/server/CoreGroupCommunicationService.java	2010-07-06 03:08:31 UTC (rev 106438)
@@ -48,7 +48,8 @@
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.locks.AbstractQueuedSynchronizer;
 
-import org.jboss.ha.core.jgroups.blocks.mux.MuxRequestCorrelator;
+import org.jboss.ha.core.jgroups.blocks.mux.DelegatingStateTransferUpHandler;
+import org.jboss.ha.core.jgroups.blocks.mux.MuxUpHandler;
 import org.jboss.ha.core.jgroups.blocks.mux.StateTransferFilter;
 import org.jboss.ha.framework.interfaces.ClusterNode;
 import org.jboss.ha.framework.interfaces.GroupCommunicationService;
@@ -81,11 +82,9 @@
 import org.jgroups.View;
 import org.jgroups.blocks.GroupRequest;
 import org.jgroups.blocks.MethodCall;
-import org.jgroups.blocks.RequestCorrelator;
-import org.jgroups.blocks.RequestHandler;
 import org.jgroups.blocks.RequestOptions;
 import org.jgroups.blocks.RpcDispatcher;
-import org.jgroups.blocks.mux.MuxUpHandler;
+import org.jgroups.blocks.mux.MuxRpcDispatcher;
 import org.jgroups.blocks.mux.Muxer;
 import org.jgroups.blocks.mux.NoMuxHandler;
 import org.jgroups.stack.IpAddress;
@@ -1130,7 +1129,7 @@
       // Subscribe to events generated by the channel
       MembershipListener meml = new MembershipListenerImpl();
       MessageListener msgl = this.stateIdPrefix == null ? null : new MessageListenerImpl();
-      this.dispatcher = new RpcHandler(this.scopeId.shortValue(), this.channel, msgl, meml, new Object(), new RequestMarshallerImpl(), new ResponseMarshallerImpl());
+      this.dispatcher = new RpcHandler(this.scopeId.shortValue(), this.channel, msgl, meml, new RequestMarshallerImpl(), new ResponseMarshallerImpl());
       
       if (!this.channel.isConnected())
       {
@@ -1790,18 +1789,16 @@
     * Overrides RpcDispatcher.Handle so that we can dispatch to many
     * different objects.
     */
-   private class RpcHandler extends RpcDispatcher implements StateTransferFilter
+   private class RpcHandler extends MuxRpcDispatcher implements StateTransferFilter
    {
-      private final short scopeId;
       private RpcHandler(short scopeId, Channel channel, MessageListener messageListener, 
-            MembershipListener membershipListener, Object serverObject,
+            MembershipListener membershipListener,
             Marshaller reqMarshaller, Marshaller rspMarshaller)
       {
-         this.scopeId = scopeId;
+         super(scopeId);
          
          setMessageListener(messageListener);
          setMembershipListener(membershipListener);
-         setServerObject(serverObject);
          setRequestMarshaller(reqMarshaller);
          setResponseMarshaller(rspMarshaller);
          setChannel(channel);
@@ -1963,11 +1960,17 @@
       @Override
       public void start() {
           super.start();
-          // Replace the handler again! FIXME get this in superclass
+          // Replace the handler again! TODO get this in superclass
           Muxer<UpHandler> muxer = this.getMuxer();
           if (muxer != null) {
               muxer.add(scopeId, new DelegatingStateTransferUpHandler(this.getProtocolAdapter(), this));
           }
+          else
+          {
+             muxer = new MuxUpHandler(this.channel.getUpHandler());
+             muxer.add(scopeId, new DelegatingStateTransferUpHandler(this.getProtocolAdapter(), this));
+             this.channel.setUpHandler((UpHandler) muxer);
+          }
       }
 
       @Override
@@ -1984,17 +1987,10 @@
          return stateId != null && stateId.startsWith(CoreGroupCommunicationService.this.stateIdPrefix );
       }
 
-      @Override
-      protected RequestCorrelator createRequestCorrelator(Object transport, RequestHandler handler, Address localAddr) {
-          // We can't set the scope of the request correlator here
-          // since this method is called from start() triggered in the
-          // MessageDispatcher constructor, when this.scope is not yet defined
-          return new MuxRequestCorrelator(scopeId, transport, handler, localAddr);
-      }
-
+      @SuppressWarnings("unchecked")
       private Muxer<UpHandler> getMuxer() {
           UpHandler handler = channel.getUpHandler();
-          return ((handler != null) && (handler instanceof MuxUpHandler)) ? (MuxUpHandler) handler : null;
+          return ((handler != null) && (handler instanceof Muxer<?>)) ? (Muxer<UpHandler>) handler : null;
       }
       
    }

Deleted: projects/cluster/ha-server-core/trunk/src/main/java/org/jboss/ha/core/framework/server/DelegatingStateTransferUpHandler.java
===================================================================
--- projects/cluster/ha-server-core/trunk/src/main/java/org/jboss/ha/core/framework/server/DelegatingStateTransferUpHandler.java	2010-07-05 23:06:01 UTC (rev 106437)
+++ projects/cluster/ha-server-core/trunk/src/main/java/org/jboss/ha/core/framework/server/DelegatingStateTransferUpHandler.java	2010-07-06 03:08:31 UTC (rev 106438)
@@ -1,57 +0,0 @@
-/*
- * JBoss, Home of Professional Open Source.
- * Copyright 2010, Red Hat, Inc. and individual contributors
- * as indicated by the @author tags. See the copyright.txt file in the
- * distribution for a full listing of individual contributors.
- *
- * This is free software; you can redistribute it and/or modify it
- * under the terms of the GNU Lesser General Public License as
- * published by the Free Software Foundation; either version 2.1 of
- * the License, or (at your option) any later version.
- *
- * This software is distributed in the hope that it will be useful,
- * but WITHOUT ANY WARRANTY; without even the implied warranty of
- * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
- * Lesser General Public License for more details.
- *
- * You should have received a copy of the GNU Lesser General Public
- * License along with this software; if not, write to the Free
- * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
- * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
- */
-
-package org.jboss.ha.core.framework.server;
-
-import org.jboss.ha.core.jgroups.blocks.mux.StateTransferFilter;
-import org.jgroups.Event;
-import org.jgroups.UpHandler;
-
-/**
- *
- *
- * @author Brian Stansberry
- * 
- * @version $Revision$
- */
-public class DelegatingStateTransferUpHandler implements StateTransferFilter, UpHandler
-{
-   private final UpHandler delegate;
-   private final StateTransferFilter filter;
-   
-   public DelegatingStateTransferUpHandler(UpHandler delegate, StateTransferFilter filter)
-   {
-      this.delegate = delegate;
-      this.filter = filter;
-   }
-
-   public Object up(Event evt)
-   {
-      return delegate.up(evt);
-   }
-
-   public boolean accepts(String stateId)
-   {
-      return filter.accepts(stateId);
-   }
-
-}

Copied: projects/cluster/ha-server-core/trunk/src/main/java/org/jboss/ha/core/jgroups/blocks/mux/DelegatingStateTransferUpHandler.java (from rev 106190, projects/cluster/ha-server-core/trunk/src/main/java/org/jboss/ha/core/framework/server/DelegatingStateTransferUpHandler.java)
===================================================================
--- projects/cluster/ha-server-core/trunk/src/main/java/org/jboss/ha/core/jgroups/blocks/mux/DelegatingStateTransferUpHandler.java	                        (rev 0)
+++ projects/cluster/ha-server-core/trunk/src/main/java/org/jboss/ha/core/jgroups/blocks/mux/DelegatingStateTransferUpHandler.java	2010-07-06 03:08:31 UTC (rev 106438)
@@ -0,0 +1,75 @@
+/*
+ * JBoss, Home of Professional Open Source.
+ * Copyright 2010, Red Hat, Inc. and individual contributors
+ * as indicated by the @author tags. See the copyright.txt file in the
+ * distribution for a full listing of individual contributors.
+ *
+ * This is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU Lesser General Public License as
+ * published by the Free Software Foundation; either version 2.1 of
+ * the License, or (at your option) any later version.
+ *
+ * This software is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this software; if not, write to the Free
+ * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
+ * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
+ */
+
+package org.jboss.ha.core.jgroups.blocks.mux;
+
+import org.jgroups.Event;
+import org.jgroups.UpHandler;
+
+/**
+ * An UpHandler that integrates another UpHandler and a StateTransferFilter.
+ *
+ * @author Brian Stansberry
+ * 
+ * @version $Revision$
+ */
+public class DelegatingStateTransferUpHandler implements StateTransferFilter, UpHandler
+{
+   private final UpHandler delegate;
+   private final StateTransferFilter filter;
+   
+   /**
+    * Create a new DelegatingStateTransferUpHandler.
+    * 
+    * @param delegate the UpHandler to delegate to
+    * @param filter the StateTransferFilter to delegate to
+    */
+   public DelegatingStateTransferUpHandler(UpHandler delegate, StateTransferFilter filter)
+   {
+      assert delegate != null : "delegate is null";
+      assert filter != null : "filter is null";
+      
+      this.delegate = delegate;
+      this.filter = filter;
+   }
+
+   /**
+    * Passes the event to the delegate UpHandler.
+    * 
+    * {@inheritDoc}
+    */
+   public Object up(Event evt)
+   {
+      return delegate.up(evt);
+   }
+
+   /**
+    * Checks with the delegate StateTransferFilter.
+    * 
+    * {@inheritDoc}
+    */
+   public boolean accepts(String stateId)
+   {
+      return filter.accepts(stateId);
+   }
+
+}

Deleted: projects/cluster/ha-server-core/trunk/src/main/java/org/jboss/ha/core/jgroups/blocks/mux/MuxRequestCorrelator.java
===================================================================
--- projects/cluster/ha-server-core/trunk/src/main/java/org/jboss/ha/core/jgroups/blocks/mux/MuxRequestCorrelator.java	2010-07-05 23:06:01 UTC (rev 106437)
+++ projects/cluster/ha-server-core/trunk/src/main/java/org/jboss/ha/core/jgroups/blocks/mux/MuxRequestCorrelator.java	2010-07-06 03:08:31 UTC (rev 106438)
@@ -1,50 +0,0 @@
-package org.jboss.ha.core.jgroups.blocks.mux;
-
-import java.util.Collection;
-
-import org.jgroups.Address;
-import org.jgroups.Message;
-import org.jgroups.blocks.RequestCorrelator;
-import org.jgroups.blocks.RequestHandler;
-import org.jgroups.blocks.RspCollector;
-import org.jgroups.blocks.RequestOptions;
-import org.jgroups.blocks.mux.MuxHeader;
-import org.jgroups.conf.ClassConfigurator;
-
-/**
- * A request correlator that adds a mux header to incoming and outgoing messages.
- * @author Bela Ban
- * @author Paul Ferraro
- * @version $Id: MuxRequestCorrelator.java,v 1.3 2010/04/21 10:54:07 belaban Exp $
- */
-public class MuxRequestCorrelator extends RequestCorrelator {
-
-    protected final static short MUX_ID = ClassConfigurator.getProtocolId(org.jgroups.blocks.mux.MuxRequestCorrelator.class);
-    private final org.jgroups.Header header;
-    
-    public MuxRequestCorrelator(short id, Object transport, RequestHandler handler, Address localAddr) {
-        super(ClassConfigurator.getProtocolId(RequestCorrelator.class), transport, handler, localAddr);
-        this.header = new MuxHeader(id);
-    }
-    
-    @Override
-   public void sendUnicastRequest(long id, Address target, Message msg, RspCollector coll) throws Exception
-   {
-       msg.putHeader(MUX_ID, header);
-      super.sendUnicastRequest(id, target, msg, coll);
-   }
-
-
-
-   @Override
-    public void sendRequest(long requestId, Collection<Address> dest_mbrs, Message msg, RspCollector coll, RequestOptions options) throws Exception {
-        msg.putHeader(MUX_ID, header);
-        super.sendRequest(requestId, dest_mbrs, msg, coll, options);
-    }
-
-    @Override
-    protected void prepareResponse(Message rsp) {
-        rsp.putHeader(MUX_ID, header);
-        super.prepareResponse(rsp);
-    }
-}

Modified: projects/cluster/ha-server-core/trunk/src/main/java/org/jboss/ha/core/jgroups/blocks/mux/MuxUpHandler.java
===================================================================
--- projects/cluster/ha-server-core/trunk/src/main/java/org/jboss/ha/core/jgroups/blocks/mux/MuxUpHandler.java	2010-07-05 23:06:01 UTC (rev 106437)
+++ projects/cluster/ha-server-core/trunk/src/main/java/org/jboss/ha/core/jgroups/blocks/mux/MuxUpHandler.java	2010-07-06 03:08:31 UTC (rev 106438)
@@ -2,43 +2,31 @@
 
 import java.util.Map;
 import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.atomic.AtomicReference;
 
 import org.jgroups.Event;
-import org.jgroups.Message;
 import org.jgroups.UpHandler;
 import org.jgroups.blocks.mux.MuxHeader;
-import org.jgroups.blocks.mux.MuxRequestCorrelator;
-import org.jgroups.blocks.mux.NoMuxHandler;
-import org.jgroups.conf.ClassConfigurator;
-import org.jgroups.logging.Log;
-import org.jgroups.logging.LogFactory;
 import org.jgroups.stack.StateTransferInfo;
 
 /**
- * Allows up handler multiplexing.
+ * Overrides superclass to allow state transfer multiplexing.
  * 
- * @author Bela Ban
- * @author Paul Ferraro
  * @author Brian Stansberry
  * 
  * @version $Id: MuxUpHandler.java,v 1.2 2010/04/15 20:05:22 ferraro Exp $
  */
 public class MuxUpHandler 
       extends org.jgroups.blocks.mux.MuxUpHandler {
-    //implements UpHandler, Muxer<UpHandler> {
 
-    protected final Log log=LogFactory.getLog(getClass());
-    protected final static short MUX_ID = ClassConfigurator.getProtocolId(MuxRequestCorrelator.class);
-    private final Map<Short, UpHandler> handlers = new ConcurrentHashMap<Short, UpHandler>();
-    private volatile UpHandler defaultHandler;
-    private volatile Event lastFlushEvent;
-    private final Object flushMutex = new Object();
+
+    private final Map<Short, UpHandler> stateTransferHandlers = new ConcurrentHashMap<Short, UpHandler>();
     
     /**
      * Creates a multiplexing up handler, with no default handler.
      */
     public MuxUpHandler() {
-        this.defaultHandler = null;
+        super();
     }
 
     /**
@@ -46,7 +34,7 @@
      * @param defaultHandler a default up handler to handle messages with no {@link MuxHeader}
      */
     public MuxUpHandler(UpHandler defaultHandler) {
-        this.defaultHandler = defaultHandler;
+        super(defaultHandler);
     }
 
     /**
@@ -55,13 +43,10 @@
      */
     @Override
     public void add(short id, UpHandler handler) {
-       synchronized (flushMutex)
-       {          
-          if (lastFlushEvent != null)
-          {
-             handler.up(lastFlushEvent);
-          }
-          handlers.put(id, handler);
+       super.add(id, handler);
+       if (handler instanceof StateTransferFilter)
+       {
+          stateTransferHandlers.put(id, handler);
        }
     }
 
@@ -71,7 +56,8 @@
      */
     @Override
     public void remove(short id) {
-        handlers.remove(id);
+       super.remove(id);
+       stateTransferHandlers.remove(id);
     }
 
     /**
@@ -79,89 +65,20 @@
      * @see org.jgroups.UpHandler#up(org.jgroups.Event)
      */
     @Override
-    public Object up(Event evt) {
+    protected AtomicReference<Object> handleStateTransferEvent(Event evt) {
         
-       switch (evt.getType()) {
-            case Event.MSG: {
-                Message msg = (Message) evt.getArg();
-                MuxHeader hdr = (MuxHeader) msg.getHeader(MUX_ID);
-                if (hdr != null) {
-                    short id = hdr.getId();
-                    UpHandler handler = handlers.get(id);
-                    return (handler != null) ? handler.up(evt) : new NoMuxHandler(id);
-                }
-                break;
-            }
-            case Event.GET_APPLSTATE:
-            case Event.GET_STATE_OK: 
-            case Event.STATE_TRANSFER_OUTPUTSTREAM: 
-            case Event.STATE_TRANSFER_INPUTSTREAM: {
-                StateTransferInfo info=(StateTransferInfo)evt.getArg();
-                String state_id=info.state_id;
-                UpHandler basicHandler = null;
-                boolean multipleBasic = false;
-                for (UpHandler uh: handlers.values())
-                {
-                   if (uh instanceof StateTransferFilter)
-                   {
-                      if (((StateTransferFilter) uh).accepts(state_id))
-                      {
-                         return (uh.up(evt));
-                      }
-                   }
-                   else if (basicHandler == null)
-                   {
-                      basicHandler = uh;
-                   }
-                   else
-                   {
-                      multipleBasic = true;
-                   }
-                }
-                
-                if (basicHandler != null)
-                {
-                   if (multipleBasic)
-                   {
-                      // TODO throw exception
-                      log.warn("Received state transfer related event with more " +
-                      		"than one basic UpHandler registered. Arbitrarily " +
-                      		"picking a handler to handle request");
-                   }
-                   
-                   return basicHandler.up(evt);
-                }
-                // else let default handler handle it below
-                break;
-            } 
-            case Event.BLOCK: 
-            case Event.UNBLOCK: {
-               synchronized (flushMutex)
-               {
-                  this.lastFlushEvent = evt;
-                  passToAllHandlers(evt);
-                  break;
-               }
-            }
-            case Event.VIEW_CHANGE:
-            case Event.SET_LOCAL_ADDRESS: 
-            case Event.SUSPECT:  {
-               passToAllHandlers(evt);
-               break;
-           }
-           default: {
-                passToAllHandlers(evt);
-                break;
-           }
-        }
+       StateTransferInfo info=(StateTransferInfo)evt.getArg();
+       for (UpHandler uh: stateTransferHandlers.values())
+       {
+          if (uh instanceof StateTransferFilter)
+          {
+             if (((StateTransferFilter) uh).accepts(info.state_id))
+             {
+                return new AtomicReference<Object>(uh.up(evt));
+             }
+          }
+       }
         
-        return (defaultHandler != null) ? defaultHandler.up(evt) : null;
+       return null;
     }
-
-   private void passToAllHandlers(Event evt)
-   {
-      for (UpHandler handler: handlers.values()) {
-           handler.up(evt);
-       }
-   }
 }



More information about the jboss-cvs-commits mailing list