[jboss-cvs] JBossAS SVN: r106438 - in projects/cluster/ha-server-core/trunk: src/main/java/org/jboss/ha/core/framework/server and 1 other directory.
jboss-cvs-commits at lists.jboss.org
jboss-cvs-commits at lists.jboss.org
Mon Jul 5 23:08:31 EDT 2010
Author: bstansberry at jboss.com
Date: 2010-07-05 23:08:31 -0400 (Mon, 05 Jul 2010)
New Revision: 106438
Added:
projects/cluster/ha-server-core/trunk/src/main/java/org/jboss/ha/core/jgroups/blocks/mux/DelegatingStateTransferUpHandler.java
Removed:
projects/cluster/ha-server-core/trunk/src/main/java/org/jboss/ha/core/framework/server/DelegatingStateTransferUpHandler.java
projects/cluster/ha-server-core/trunk/src/main/java/org/jboss/ha/core/jgroups/blocks/mux/MuxRequestCorrelator.java
Modified:
projects/cluster/ha-server-core/trunk/pom.xml
projects/cluster/ha-server-core/trunk/src/main/java/org/jboss/ha/core/framework/server/CoreGroupCommunicationService.java
projects/cluster/ha-server-core/trunk/src/main/java/org/jboss/ha/core/jgroups/blocks/mux/MuxUpHandler.java
Log:
Update to JGroups 2.10.0.CR1; remove code that's now in JGroups
Modified: projects/cluster/ha-server-core/trunk/pom.xml
===================================================================
--- projects/cluster/ha-server-core/trunk/pom.xml 2010-07-05 23:06:01 UTC (rev 106437)
+++ projects/cluster/ha-server-core/trunk/pom.xml 2010-07-06 03:08:31 UTC (rev 106438)
@@ -35,7 +35,7 @@
<version.jboss.common.core>2.2.17.GA</version.jboss.common.core>
<version.jboss.logging>3.0.0.Beta2</version.jboss.logging>
<version.infinispan>4.0.0.GA</version.infinispan>
- <version.jgroups>2.10.0.Beta2</version.jgroups>
+ <version.jgroups>2.10.0.CR1</version.jgroups>
<version.junit>3.8.1</version.junit>
</properties>
@@ -98,6 +98,12 @@
<groupId>org.jboss.cluster</groupId>
<artifactId>jboss-ha-server-cache-spi</artifactId>
<version>${version.jboss.ha.server.cache.spi}</version>
+ <exclusions>
+ <exclusion>
+ <groupId>jgroups</groupId>
+ <artifactId>jgroups</artifactId>
+ </exclusion>
+ </exclusions>
</dependency>
<dependency>
@@ -113,16 +119,10 @@
</dependency>
<dependency>
- <groupId>jgroups</groupId>
+ <groupId>org.jgroups</groupId>
<artifactId>jgroups</artifactId>
<version>${version.jgroups}</version>
<optional>true</optional>
- <exclusions>
- <exclusion>
- <groupId>commons-logging</groupId>
- <artifactId>commons-logging</artifactId>
- </exclusion>
- </exclusions>
</dependency>
<!-- Test dependencies -->
Modified: projects/cluster/ha-server-core/trunk/src/main/java/org/jboss/ha/core/framework/server/CoreGroupCommunicationService.java
===================================================================
--- projects/cluster/ha-server-core/trunk/src/main/java/org/jboss/ha/core/framework/server/CoreGroupCommunicationService.java 2010-07-05 23:06:01 UTC (rev 106437)
+++ projects/cluster/ha-server-core/trunk/src/main/java/org/jboss/ha/core/framework/server/CoreGroupCommunicationService.java 2010-07-06 03:08:31 UTC (rev 106438)
@@ -48,7 +48,8 @@
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.AbstractQueuedSynchronizer;
-import org.jboss.ha.core.jgroups.blocks.mux.MuxRequestCorrelator;
+import org.jboss.ha.core.jgroups.blocks.mux.DelegatingStateTransferUpHandler;
+import org.jboss.ha.core.jgroups.blocks.mux.MuxUpHandler;
import org.jboss.ha.core.jgroups.blocks.mux.StateTransferFilter;
import org.jboss.ha.framework.interfaces.ClusterNode;
import org.jboss.ha.framework.interfaces.GroupCommunicationService;
@@ -81,11 +82,9 @@
import org.jgroups.View;
import org.jgroups.blocks.GroupRequest;
import org.jgroups.blocks.MethodCall;
-import org.jgroups.blocks.RequestCorrelator;
-import org.jgroups.blocks.RequestHandler;
import org.jgroups.blocks.RequestOptions;
import org.jgroups.blocks.RpcDispatcher;
-import org.jgroups.blocks.mux.MuxUpHandler;
+import org.jgroups.blocks.mux.MuxRpcDispatcher;
import org.jgroups.blocks.mux.Muxer;
import org.jgroups.blocks.mux.NoMuxHandler;
import org.jgroups.stack.IpAddress;
@@ -1130,7 +1129,7 @@
// Subscribe to events generated by the channel
MembershipListener meml = new MembershipListenerImpl();
MessageListener msgl = this.stateIdPrefix == null ? null : new MessageListenerImpl();
- this.dispatcher = new RpcHandler(this.scopeId.shortValue(), this.channel, msgl, meml, new Object(), new RequestMarshallerImpl(), new ResponseMarshallerImpl());
+ this.dispatcher = new RpcHandler(this.scopeId.shortValue(), this.channel, msgl, meml, new RequestMarshallerImpl(), new ResponseMarshallerImpl());
if (!this.channel.isConnected())
{
@@ -1790,18 +1789,16 @@
* Overrides RpcDispatcher.Handle so that we can dispatch to many
* different objects.
*/
- private class RpcHandler extends RpcDispatcher implements StateTransferFilter
+ private class RpcHandler extends MuxRpcDispatcher implements StateTransferFilter
{
- private final short scopeId;
private RpcHandler(short scopeId, Channel channel, MessageListener messageListener,
- MembershipListener membershipListener, Object serverObject,
+ MembershipListener membershipListener,
Marshaller reqMarshaller, Marshaller rspMarshaller)
{
- this.scopeId = scopeId;
+ super(scopeId);
setMessageListener(messageListener);
setMembershipListener(membershipListener);
- setServerObject(serverObject);
setRequestMarshaller(reqMarshaller);
setResponseMarshaller(rspMarshaller);
setChannel(channel);
@@ -1963,11 +1960,17 @@
@Override
public void start() {
super.start();
- // Replace the handler again! FIXME get this in superclass
+ // Replace the handler again! TODO get this in superclass
Muxer<UpHandler> muxer = this.getMuxer();
if (muxer != null) {
muxer.add(scopeId, new DelegatingStateTransferUpHandler(this.getProtocolAdapter(), this));
}
+ else
+ {
+ muxer = new MuxUpHandler(this.channel.getUpHandler());
+ muxer.add(scopeId, new DelegatingStateTransferUpHandler(this.getProtocolAdapter(), this));
+ this.channel.setUpHandler((UpHandler) muxer);
+ }
}
@Override
@@ -1984,17 +1987,10 @@
return stateId != null && stateId.startsWith(CoreGroupCommunicationService.this.stateIdPrefix );
}
- @Override
- protected RequestCorrelator createRequestCorrelator(Object transport, RequestHandler handler, Address localAddr) {
- // We can't set the scope of the request correlator here
- // since this method is called from start() triggered in the
- // MessageDispatcher constructor, when this.scope is not yet defined
- return new MuxRequestCorrelator(scopeId, transport, handler, localAddr);
- }
-
+ @SuppressWarnings("unchecked")
private Muxer<UpHandler> getMuxer() {
UpHandler handler = channel.getUpHandler();
- return ((handler != null) && (handler instanceof MuxUpHandler)) ? (MuxUpHandler) handler : null;
+ return ((handler != null) && (handler instanceof Muxer<?>)) ? (Muxer<UpHandler>) handler : null;
}
}
Deleted: projects/cluster/ha-server-core/trunk/src/main/java/org/jboss/ha/core/framework/server/DelegatingStateTransferUpHandler.java
===================================================================
--- projects/cluster/ha-server-core/trunk/src/main/java/org/jboss/ha/core/framework/server/DelegatingStateTransferUpHandler.java 2010-07-05 23:06:01 UTC (rev 106437)
+++ projects/cluster/ha-server-core/trunk/src/main/java/org/jboss/ha/core/framework/server/DelegatingStateTransferUpHandler.java 2010-07-06 03:08:31 UTC (rev 106438)
@@ -1,57 +0,0 @@
-/*
- * JBoss, Home of Professional Open Source.
- * Copyright 2010, Red Hat, Inc. and individual contributors
- * as indicated by the @author tags. See the copyright.txt file in the
- * distribution for a full listing of individual contributors.
- *
- * This is free software; you can redistribute it and/or modify it
- * under the terms of the GNU Lesser General Public License as
- * published by the Free Software Foundation; either version 2.1 of
- * the License, or (at your option) any later version.
- *
- * This software is distributed in the hope that it will be useful,
- * but WITHOUT ANY WARRANTY; without even the implied warranty of
- * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
- * Lesser General Public License for more details.
- *
- * You should have received a copy of the GNU Lesser General Public
- * License along with this software; if not, write to the Free
- * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
- * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
- */
-
-package org.jboss.ha.core.framework.server;
-
-import org.jboss.ha.core.jgroups.blocks.mux.StateTransferFilter;
-import org.jgroups.Event;
-import org.jgroups.UpHandler;
-
-/**
- *
- *
- * @author Brian Stansberry
- *
- * @version $Revision$
- */
-public class DelegatingStateTransferUpHandler implements StateTransferFilter, UpHandler
-{
- private final UpHandler delegate;
- private final StateTransferFilter filter;
-
- public DelegatingStateTransferUpHandler(UpHandler delegate, StateTransferFilter filter)
- {
- this.delegate = delegate;
- this.filter = filter;
- }
-
- public Object up(Event evt)
- {
- return delegate.up(evt);
- }
-
- public boolean accepts(String stateId)
- {
- return filter.accepts(stateId);
- }
-
-}
Copied: projects/cluster/ha-server-core/trunk/src/main/java/org/jboss/ha/core/jgroups/blocks/mux/DelegatingStateTransferUpHandler.java (from rev 106190, projects/cluster/ha-server-core/trunk/src/main/java/org/jboss/ha/core/framework/server/DelegatingStateTransferUpHandler.java)
===================================================================
--- projects/cluster/ha-server-core/trunk/src/main/java/org/jboss/ha/core/jgroups/blocks/mux/DelegatingStateTransferUpHandler.java (rev 0)
+++ projects/cluster/ha-server-core/trunk/src/main/java/org/jboss/ha/core/jgroups/blocks/mux/DelegatingStateTransferUpHandler.java 2010-07-06 03:08:31 UTC (rev 106438)
@@ -0,0 +1,75 @@
+/*
+ * JBoss, Home of Professional Open Source.
+ * Copyright 2010, Red Hat, Inc. and individual contributors
+ * as indicated by the @author tags. See the copyright.txt file in the
+ * distribution for a full listing of individual contributors.
+ *
+ * This is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU Lesser General Public License as
+ * published by the Free Software Foundation; either version 2.1 of
+ * the License, or (at your option) any later version.
+ *
+ * This software is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this software; if not, write to the Free
+ * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
+ * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
+ */
+
+package org.jboss.ha.core.jgroups.blocks.mux;
+
+import org.jgroups.Event;
+import org.jgroups.UpHandler;
+
+/**
+ * An UpHandler that integrates another UpHandler and a StateTransferFilter.
+ *
+ * @author Brian Stansberry
+ *
+ * @version $Revision$
+ */
+public class DelegatingStateTransferUpHandler implements StateTransferFilter, UpHandler
+{
+ private final UpHandler delegate;
+ private final StateTransferFilter filter;
+
+ /**
+ * Create a new DelegatingStateTransferUpHandler.
+ *
+ * @param delegate the UpHandler to delegate to
+ * @param filter the StateTransferFilter to delegate to
+ */
+ public DelegatingStateTransferUpHandler(UpHandler delegate, StateTransferFilter filter)
+ {
+ assert delegate != null : "delegate is null";
+ assert filter != null : "filter is null";
+
+ this.delegate = delegate;
+ this.filter = filter;
+ }
+
+ /**
+ * Passes the event to the delegate UpHandler.
+ *
+ * {@inheritDoc}
+ */
+ public Object up(Event evt)
+ {
+ return delegate.up(evt);
+ }
+
+ /**
+ * Checks with the delegate StateTransferFilter.
+ *
+ * {@inheritDoc}
+ */
+ public boolean accepts(String stateId)
+ {
+ return filter.accepts(stateId);
+ }
+
+}
Deleted: projects/cluster/ha-server-core/trunk/src/main/java/org/jboss/ha/core/jgroups/blocks/mux/MuxRequestCorrelator.java
===================================================================
--- projects/cluster/ha-server-core/trunk/src/main/java/org/jboss/ha/core/jgroups/blocks/mux/MuxRequestCorrelator.java 2010-07-05 23:06:01 UTC (rev 106437)
+++ projects/cluster/ha-server-core/trunk/src/main/java/org/jboss/ha/core/jgroups/blocks/mux/MuxRequestCorrelator.java 2010-07-06 03:08:31 UTC (rev 106438)
@@ -1,50 +0,0 @@
-package org.jboss.ha.core.jgroups.blocks.mux;
-
-import java.util.Collection;
-
-import org.jgroups.Address;
-import org.jgroups.Message;
-import org.jgroups.blocks.RequestCorrelator;
-import org.jgroups.blocks.RequestHandler;
-import org.jgroups.blocks.RspCollector;
-import org.jgroups.blocks.RequestOptions;
-import org.jgroups.blocks.mux.MuxHeader;
-import org.jgroups.conf.ClassConfigurator;
-
-/**
- * A request correlator that adds a mux header to incoming and outgoing messages.
- * @author Bela Ban
- * @author Paul Ferraro
- * @version $Id: MuxRequestCorrelator.java,v 1.3 2010/04/21 10:54:07 belaban Exp $
- */
-public class MuxRequestCorrelator extends RequestCorrelator {
-
- protected final static short MUX_ID = ClassConfigurator.getProtocolId(org.jgroups.blocks.mux.MuxRequestCorrelator.class);
- private final org.jgroups.Header header;
-
- public MuxRequestCorrelator(short id, Object transport, RequestHandler handler, Address localAddr) {
- super(ClassConfigurator.getProtocolId(RequestCorrelator.class), transport, handler, localAddr);
- this.header = new MuxHeader(id);
- }
-
- @Override
- public void sendUnicastRequest(long id, Address target, Message msg, RspCollector coll) throws Exception
- {
- msg.putHeader(MUX_ID, header);
- super.sendUnicastRequest(id, target, msg, coll);
- }
-
-
-
- @Override
- public void sendRequest(long requestId, Collection<Address> dest_mbrs, Message msg, RspCollector coll, RequestOptions options) throws Exception {
- msg.putHeader(MUX_ID, header);
- super.sendRequest(requestId, dest_mbrs, msg, coll, options);
- }
-
- @Override
- protected void prepareResponse(Message rsp) {
- rsp.putHeader(MUX_ID, header);
- super.prepareResponse(rsp);
- }
-}
Modified: projects/cluster/ha-server-core/trunk/src/main/java/org/jboss/ha/core/jgroups/blocks/mux/MuxUpHandler.java
===================================================================
--- projects/cluster/ha-server-core/trunk/src/main/java/org/jboss/ha/core/jgroups/blocks/mux/MuxUpHandler.java 2010-07-05 23:06:01 UTC (rev 106437)
+++ projects/cluster/ha-server-core/trunk/src/main/java/org/jboss/ha/core/jgroups/blocks/mux/MuxUpHandler.java 2010-07-06 03:08:31 UTC (rev 106438)
@@ -2,43 +2,31 @@
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.atomic.AtomicReference;
import org.jgroups.Event;
-import org.jgroups.Message;
import org.jgroups.UpHandler;
import org.jgroups.blocks.mux.MuxHeader;
-import org.jgroups.blocks.mux.MuxRequestCorrelator;
-import org.jgroups.blocks.mux.NoMuxHandler;
-import org.jgroups.conf.ClassConfigurator;
-import org.jgroups.logging.Log;
-import org.jgroups.logging.LogFactory;
import org.jgroups.stack.StateTransferInfo;
/**
- * Allows up handler multiplexing.
+ * Overrides superclass to allow state transfer multiplexing.
*
- * @author Bela Ban
- * @author Paul Ferraro
* @author Brian Stansberry
*
* @version $Id: MuxUpHandler.java,v 1.2 2010/04/15 20:05:22 ferraro Exp $
*/
public class MuxUpHandler
extends org.jgroups.blocks.mux.MuxUpHandler {
- //implements UpHandler, Muxer<UpHandler> {
- protected final Log log=LogFactory.getLog(getClass());
- protected final static short MUX_ID = ClassConfigurator.getProtocolId(MuxRequestCorrelator.class);
- private final Map<Short, UpHandler> handlers = new ConcurrentHashMap<Short, UpHandler>();
- private volatile UpHandler defaultHandler;
- private volatile Event lastFlushEvent;
- private final Object flushMutex = new Object();
+
+ private final Map<Short, UpHandler> stateTransferHandlers = new ConcurrentHashMap<Short, UpHandler>();
/**
* Creates a multiplexing up handler, with no default handler.
*/
public MuxUpHandler() {
- this.defaultHandler = null;
+ super();
}
/**
@@ -46,7 +34,7 @@
* @param defaultHandler a default up handler to handle messages with no {@link MuxHeader}
*/
public MuxUpHandler(UpHandler defaultHandler) {
- this.defaultHandler = defaultHandler;
+ super(defaultHandler);
}
/**
@@ -55,13 +43,10 @@
*/
@Override
public void add(short id, UpHandler handler) {
- synchronized (flushMutex)
- {
- if (lastFlushEvent != null)
- {
- handler.up(lastFlushEvent);
- }
- handlers.put(id, handler);
+ super.add(id, handler);
+ if (handler instanceof StateTransferFilter)
+ {
+ stateTransferHandlers.put(id, handler);
}
}
@@ -71,7 +56,8 @@
*/
@Override
public void remove(short id) {
- handlers.remove(id);
+ super.remove(id);
+ stateTransferHandlers.remove(id);
}
/**
@@ -79,89 +65,20 @@
* @see org.jgroups.UpHandler#up(org.jgroups.Event)
*/
@Override
- public Object up(Event evt) {
+ protected AtomicReference<Object> handleStateTransferEvent(Event evt) {
- switch (evt.getType()) {
- case Event.MSG: {
- Message msg = (Message) evt.getArg();
- MuxHeader hdr = (MuxHeader) msg.getHeader(MUX_ID);
- if (hdr != null) {
- short id = hdr.getId();
- UpHandler handler = handlers.get(id);
- return (handler != null) ? handler.up(evt) : new NoMuxHandler(id);
- }
- break;
- }
- case Event.GET_APPLSTATE:
- case Event.GET_STATE_OK:
- case Event.STATE_TRANSFER_OUTPUTSTREAM:
- case Event.STATE_TRANSFER_INPUTSTREAM: {
- StateTransferInfo info=(StateTransferInfo)evt.getArg();
- String state_id=info.state_id;
- UpHandler basicHandler = null;
- boolean multipleBasic = false;
- for (UpHandler uh: handlers.values())
- {
- if (uh instanceof StateTransferFilter)
- {
- if (((StateTransferFilter) uh).accepts(state_id))
- {
- return (uh.up(evt));
- }
- }
- else if (basicHandler == null)
- {
- basicHandler = uh;
- }
- else
- {
- multipleBasic = true;
- }
- }
-
- if (basicHandler != null)
- {
- if (multipleBasic)
- {
- // TODO throw exception
- log.warn("Received state transfer related event with more " +
- "than one basic UpHandler registered. Arbitrarily " +
- "picking a handler to handle request");
- }
-
- return basicHandler.up(evt);
- }
- // else let default handler handle it below
- break;
- }
- case Event.BLOCK:
- case Event.UNBLOCK: {
- synchronized (flushMutex)
- {
- this.lastFlushEvent = evt;
- passToAllHandlers(evt);
- break;
- }
- }
- case Event.VIEW_CHANGE:
- case Event.SET_LOCAL_ADDRESS:
- case Event.SUSPECT: {
- passToAllHandlers(evt);
- break;
- }
- default: {
- passToAllHandlers(evt);
- break;
- }
- }
+ StateTransferInfo info=(StateTransferInfo)evt.getArg();
+ for (UpHandler uh: stateTransferHandlers.values())
+ {
+ if (uh instanceof StateTransferFilter)
+ {
+ if (((StateTransferFilter) uh).accepts(info.state_id))
+ {
+ return new AtomicReference<Object>(uh.up(evt));
+ }
+ }
+ }
- return (defaultHandler != null) ? defaultHandler.up(evt) : null;
+ return null;
}
-
- private void passToAllHandlers(Event evt)
- {
- for (UpHandler handler: handlers.values()) {
- handler.up(evt);
- }
- }
}
More information about the jboss-cvs-commits
mailing list