[exo-jcr-commits] exo-jcr SVN: r5270 - in kernel/trunk: exo.kernel.component.common/src/main/java/org/exoplatform/services/rpc/impl and 20 other directories.
do-not-reply at jboss.org
do-not-reply at jboss.org
Tue Dec 6 08:40:18 EST 2011
Author: nfilotto
Date: 2011-12-06 08:40:17 -0500 (Tue, 06 Dec 2011)
New Revision: 5270
Added:
kernel/trunk/exo.kernel.component.common/src/main/java/org/exoplatform/services/rpc/impl/AbstractRPCService.java
kernel/trunk/exo.kernel.component.ext.rpc.impl.jgroups.v3/
kernel/trunk/exo.kernel.component.ext.rpc.impl.jgroups.v3/pom.xml
kernel/trunk/exo.kernel.component.ext.rpc.impl.jgroups.v3/src/
kernel/trunk/exo.kernel.component.ext.rpc.impl.jgroups.v3/src/main/
kernel/trunk/exo.kernel.component.ext.rpc.impl.jgroups.v3/src/main/java/
kernel/trunk/exo.kernel.component.ext.rpc.impl.jgroups.v3/src/main/java/org/
kernel/trunk/exo.kernel.component.ext.rpc.impl.jgroups.v3/src/main/java/org/exoplatform/
kernel/trunk/exo.kernel.component.ext.rpc.impl.jgroups.v3/src/main/java/org/exoplatform/services/
kernel/trunk/exo.kernel.component.ext.rpc.impl.jgroups.v3/src/main/java/org/exoplatform/services/rpc/
kernel/trunk/exo.kernel.component.ext.rpc.impl.jgroups.v3/src/main/java/org/exoplatform/services/rpc/jgv3/
kernel/trunk/exo.kernel.component.ext.rpc.impl.jgroups.v3/src/main/java/org/exoplatform/services/rpc/jgv3/RPCServiceImpl.java
kernel/trunk/exo.kernel.component.ext.rpc.impl.jgroups.v3/src/test/
kernel/trunk/exo.kernel.component.ext.rpc.impl.jgroups.v3/src/test/java/
kernel/trunk/exo.kernel.component.ext.rpc.impl.jgroups.v3/src/test/java/org/
kernel/trunk/exo.kernel.component.ext.rpc.impl.jgroups.v3/src/test/java/org/exoplatform/
kernel/trunk/exo.kernel.component.ext.rpc.impl.jgroups.v3/src/test/java/org/exoplatform/services/
kernel/trunk/exo.kernel.component.ext.rpc.impl.jgroups.v3/src/test/java/org/exoplatform/services/rpc/
kernel/trunk/exo.kernel.component.ext.rpc.impl.jgroups.v3/src/test/java/org/exoplatform/services/rpc/impl/
kernel/trunk/exo.kernel.component.ext.rpc.impl.jgroups.v3/src/test/java/org/exoplatform/services/rpc/impl/TestRPCServiceImpl.java
kernel/trunk/exo.kernel.component.ext.rpc.impl.jgroups.v3/src/test/resources/
kernel/trunk/exo.kernel.component.ext.rpc.impl.jgroups.v3/src/test/resources/conf/
kernel/trunk/exo.kernel.component.ext.rpc.impl.jgroups.v3/src/test/resources/conf/portal/
kernel/trunk/exo.kernel.component.ext.rpc.impl.jgroups.v3/src/test/resources/conf/portal/udp.xml
kernel/trunk/exo.kernel.component.ext.rpc.impl.jgroups.v3/src/test/resources/test.policy
Modified:
kernel/trunk/exo.kernel.component.common/src/main/java/org/exoplatform/services/rpc/impl/RPCServiceImpl.java
kernel/trunk/exo.kernel.component.common/src/test/java/org/exoplatform/services/rpc/impl/TestRPCServiceImpl.java
kernel/trunk/pom.xml
Log:
EXOJCR-1672: Propose an RPCService implementation based on JGroups 3 (impl)
Added: kernel/trunk/exo.kernel.component.common/src/main/java/org/exoplatform/services/rpc/impl/AbstractRPCService.java
===================================================================
--- kernel/trunk/exo.kernel.component.common/src/main/java/org/exoplatform/services/rpc/impl/AbstractRPCService.java (rev 0)
+++ kernel/trunk/exo.kernel.component.common/src/main/java/org/exoplatform/services/rpc/impl/AbstractRPCService.java 2011-12-06 13:40:17 UTC (rev 5270)
@@ -0,0 +1,1032 @@
+/*
+ * Copyright (C) 2010 eXo Platform SAS.
+ *
+ * This is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU Lesser General Public License as
+ * published by the Free Software Foundation; either version 2.1 of
+ * the License, or (at your option) any later version.
+ *
+ * This software is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this software; if not, write to the Free
+ * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
+ * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
+ */
+package org.exoplatform.services.rpc.impl;
+
+import org.exoplatform.commons.utils.PropertyManager;
+import org.exoplatform.commons.utils.SecurityHelper;
+import org.exoplatform.container.ExoContainer;
+import org.exoplatform.container.ExoContainerContext;
+import org.exoplatform.container.configuration.ConfigurationManager;
+import org.exoplatform.container.xml.InitParams;
+import org.exoplatform.container.xml.ValueParam;
+import org.exoplatform.services.log.ExoLogger;
+import org.exoplatform.services.log.Log;
+import org.exoplatform.services.rpc.RPCException;
+import org.exoplatform.services.rpc.RPCService;
+import org.exoplatform.services.rpc.RemoteCommand;
+import org.exoplatform.services.rpc.TopologyChangeEvent;
+import org.exoplatform.services.rpc.TopologyChangeListener;
+import org.jgroups.Address;
+import org.jgroups.Channel;
+import org.jgroups.MembershipListener;
+import org.jgroups.Message;
+import org.jgroups.View;
+import org.jgroups.blocks.MessageDispatcher;
+import org.jgroups.blocks.RequestHandler;
+import org.jgroups.conf.ConfiguratorFactory;
+import org.jgroups.conf.ProtocolStackConfigurator;
+import org.jgroups.util.Rsp;
+import org.jgroups.util.RspList;
+import org.picocontainer.Startable;
+
+import java.io.Externalizable;
+import java.io.IOException;
+import java.io.ObjectInput;
+import java.io.ObjectOutput;
+import java.io.Serializable;
+import java.net.URL;
+import java.security.PrivilegedAction;
+import java.security.PrivilegedActionException;
+import java.security.PrivilegedExceptionAction;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Vector;
+import java.util.concurrent.CopyOnWriteArrayList;
+import java.util.concurrent.CountDownLatch;
+
+/**
+ * This class is a basic implementation of the {@link RPCService}, it is mainly based on the
+ * {@link MessageDispatcher} of JGroups. This implementation is not designed to give
+ * the best possible performances, it only aims to give a way to communicate with other nodes.
+ *
+ * @author <a href="mailto:nicolas.filotto at exoplatform.com">Nicolas Filotto</a>
+ * @version $Id$
+ */
+public abstract class AbstractRPCService implements RPCService, Startable, RequestHandler, MembershipListener
+{
+
+ /**
+ * Connection logger.
+ */
+ private static final Log LOG = ExoLogger.getLogger("exo.kernel.component.common.RPCServiceImpl");
+
+ /**
+ * The name of the parameter for the location of the JGroups configuration.
+ */
+ protected static final String PARAM_JGROUPS_CONFIG = "jgroups-configuration";
+
+ /**
+ * The name of the parameter for the name of the cluster.
+ */
+ protected static final String PARAM_CLUSTER_NAME = "jgroups-cluster-name";
+
+ /**
+ * The name of the parameter for the default timeout
+ */
+ protected static final String PARAM_DEFAULT_TIMEOUT = "jgroups-default-timeout";
+
+ /**
+ * The name of the parameter to allow the failover
+ */
+ protected static final String PARAM_ALLOW_FAILOVER = "allow-failover";
+
+ /**
+ * The name of the parameter for the retry timeout
+ */
+ protected static final String PARAM_RETRY_TIMEOUT = "retry-timeout";
+
+ /**
+ * The value of the default timeout
+ */
+ protected static final int DEFAULT_TIMEOUT = 0;
+
+ /**
+ * The value of the default retry timeout
+ */
+ protected static final int DEFAULT_RETRY_TIMEOUT = 20000;
+
+ /**
+ * The default value of the cluster name
+ */
+ protected static final String CLUSTER_NAME = "RPCService-Cluster";
+
+ /**
+ * The configurator used to create the JGroups Channel
+ */
+ protected final ProtocolStackConfigurator configurator;
+
+ /**
+ * The lock used to synchronize all the threads waiting for a topology change.
+ */
+ private final Object topologyChangeLock = new Object();
+
+ /**
+ * The name of the cluster
+ */
+ private final String clusterName;
+
+ /**
+ * The JGroups Channel used to communicate with other nodes
+ */
+ protected Channel channel;
+
+ /**
+ * The current list of all the members of the cluster
+ */
+ protected volatile List<Address> members;
+
+ /**
+ * The address of the current coordinator
+ */
+ protected volatile Address coordinator;
+
+ /**
+ * Indicates whether the current node is the coordinator of the cluster or not
+ */
+ protected volatile boolean isCoordinator;
+
+ /**
+ * The default value of the timeout
+ */
+ private long defaultTimeout = DEFAULT_TIMEOUT;
+
+ /**
+ * The value of the retry timeout
+ */
+ private long retryTimeout = DEFAULT_RETRY_TIMEOUT;
+
+ /**
+ * Indicates whether the failover capabilities are enabled
+ */
+ private boolean allowFailover = true;
+
+ /**
+ * The dispatcher used to launch the command of the cluster nodes
+ */
+ protected MessageDispatcher dispatcher;
+
+ /**
+ * The signal that indicates that the service is started, it will be used
+ * to make the application wait until the service is fully started to
+ * ensure that all the commands have been registered before handling
+ * incoming messages.
+ */
+ private final CountDownLatch startSignal = new CountDownLatch(1);
+
+ /**
+ * All the registered {@link TopologyChangeListener}
+ */
+ private final List<TopologyChangeListener> listeners = new CopyOnWriteArrayList<TopologyChangeListener>();
+
+ /**
+ * Current State of the {@link RPCServiceImpl}
+ */
+ private volatile State state;
+
+ /**
+ * All the commands that have been registered
+ */
+ private volatile Map<String, RemoteCommand> commands =
+ Collections.unmodifiableMap(new HashMap<String, RemoteCommand>());
+
+ /**
+ * The public constructor
+ * @param ctx the {@link ExoContainerContext} from which we will extract the corresponding
+ * {@link ExoContainer}
+ * @param params the list of initial parameters
+ * @param configManager the configuration manager used to get the configuration
+ * of JGroups
+ */
+ public AbstractRPCService(ExoContainerContext ctx, InitParams params, ConfigurationManager configManager)
+ {
+ if (params == null)
+ {
+ throw new IllegalArgumentException("The RPCServiceImpl requires some parameters");
+ }
+ final URL properties = getProperties(params, configManager);
+ if (LOG.isInfoEnabled())
+ {
+ LOG.info("The JGroups configuration used for the RPCServiceImpl will be loaded from " + properties);
+ }
+
+ try
+ {
+ this.configurator = SecurityHelper.doPrivilegedExceptionAction(new PrivilegedExceptionAction<ProtocolStackConfigurator>()
+ {
+ public ProtocolStackConfigurator run() throws Exception
+ {
+ return ConfiguratorFactory.getStackConfigurator(properties);
+ }
+ });
+ }
+ catch (PrivilegedActionException pae)
+ {
+ throw new RuntimeException("Cannot load the JGroups configuration from " + properties, pae.getCause());
+ }
+
+ this.clusterName = getClusterName(ctx, params);
+ if (LOG.isDebugEnabled())
+ {
+ LOG.debug("The cluster name of the RPCServiceImpl has been set to " + clusterName);
+ }
+ String sTimeout = getValueParam(params, PARAM_DEFAULT_TIMEOUT);
+ if (sTimeout != null)
+ {
+ defaultTimeout = Integer.parseInt(sTimeout);
+ if (LOG.isDebugEnabled())
+ {
+ LOG.debug("The default timeout of the RPCServiceImpl has been set to " + defaultTimeout);
+ }
+ }
+ String sAllowFailover = getValueParam(params, PARAM_ALLOW_FAILOVER);
+ if (sAllowFailover != null)
+ {
+ allowFailover = Boolean.valueOf(sAllowFailover);
+ if (LOG.isDebugEnabled())
+ {
+ LOG.debug("The parameter '" + PARAM_ALLOW_FAILOVER + "' of the RPCServiceImpl has been set to " + allowFailover);
+ }
+ }
+ sTimeout = getValueParam(params, PARAM_RETRY_TIMEOUT);
+ if (sTimeout != null)
+ {
+ retryTimeout = Integer.parseInt(sTimeout);
+ if (LOG.isDebugEnabled())
+ {
+ LOG.debug("The retry timeout of the RPCServiceImpl has been set to " + retryTimeout);
+ }
+ }
+ this.state = State.INITIALIZED;
+ }
+
+ /**
+ * {@inheritDoc}
+ */
+ public List<Object> executeCommandOnAllNodes(RemoteCommand command, boolean synchronous, Serializable... args)
+ throws RPCException
+ {
+ return executeCommandOnAllNodesMain(command, synchronous, defaultTimeout, args);
+ }
+
+ /**
+ * {@inheritDoc}
+ */
+ public List<Object> executeCommandOnAllNodes(RemoteCommand command, long timeout, Serializable... args)
+ throws RPCException
+ {
+ return executeCommandOnAllNodesMain(command, true, timeout, args);
+ }
+
+ /**
+ * Executes a command on all the current members of the cluster using the given timeout; both public
+ * <code>executeCommandOnAllNodes</code> variants delegate to this method. The command must be registered
+ * first otherwise an {@link RPCException} will be thrown.
+ *
+ * @param command The command to execute on each cluster node
+ * @param synchronous if true, sets group request mode to {@link org.jgroups.blocks.GroupRequest#GET_ALL},
+ * and if false sets it to {@link org.jgroups.blocks.GroupRequest#GET_NONE}.
+ * @param timeout a timeout after which to throw a replication exception.
+ * @param args an array of {@link Serializable} objects corresponding to parameters of the command
+ * to execute remotely
+ * @return a list of responses from all the members of the cluster (empty in asynchronous mode). If we met an
+ * exception on a given node, the RPCException will be the corresponding response of this particular node
+ * @throws RPCException in the event of problems.
+ */
+ protected List<Object> executeCommandOnAllNodesMain(RemoteCommand command, boolean synchronous, long timeout,
+ Serializable... args) throws RPCException
+ {
+ return excecuteCommand(members, command, synchronous, timeout, args);
+ }
+
+ /**
+ * {@inheritDoc}
+ */
+ public Object executeCommandOnCoordinator(RemoteCommand command, boolean synchronous, Serializable... args)
+ throws RPCException
+ {
+ return executeCommandOnCoordinatorMain(command, synchronous, defaultTimeout, args);
+ }
+
+ /**
+ * {@inheritDoc}
+ */
+ public Object executeCommandOnCoordinator(RemoteCommand command, long timeout, Serializable... args)
+ throws RPCException
+ {
+ return executeCommandOnCoordinatorMain(command, true, timeout, args);
+ }
+
+ /**
+ * Executes a command on the coordinator only, using the given timeout. The command must be
+ * registered first otherwise an {@link RPCException} will be thrown. If the failover is allowed and
+ * the coordinator has left, the command is relaunched after a topology change or the retry timeout.
+ *
+ * @param command The command to execute on the coordinator node
+ * @param synchronous if true, sets group request mode to {@link org.jgroups.blocks.GroupRequest#GET_ALL},
+ * and if false sets it to {@link org.jgroups.blocks.GroupRequest#GET_NONE}.
+ * @param timeout a timeout after which to throw a replication exception.
+ * @param args the {@link Serializable} parameters of the command to execute remotely
+ * @return the response of the coordinator.
+ * @throws RPCException in the event of problems.
+ */
+ protected Object executeCommandOnCoordinatorMain(RemoteCommand command, boolean synchronous, long timeout,
+ Serializable... args) throws RPCException
+ {
+ Address coordinator = this.coordinator;
+ Vector<Address> v = new Vector<Address>(1);
+ v.add(coordinator);
+ List<Object> lResults = excecuteCommand(v, command, synchronous, timeout, args);
+ Object result = lResults == null || lResults.size() == 0 ? null : lResults.get(0);
+ if (allowFailover && result instanceof MemberHasLeftException)
+ {
+ // The failover capabilities have been enabled and the coordinator seems to have left
+ if (coordinator.equals(this.coordinator))
+ {
+ synchronized(topologyChangeLock)
+ {
+ if (coordinator.equals(this.coordinator))
+ {
+ if (LOG.isTraceEnabled())
+ LOG.trace("The coordinator did not change yet, we will relaunch the command after "
+ + retryTimeout + " ms or once a topology change has been detected");
+ try
+ {
+ topologyChangeLock.wait(retryTimeout);
+ }
+ catch (InterruptedException e)
+ {
+ Thread.currentThread().interrupt();
+ }
+ }
+ }
+ }
+ if (LOG.isTraceEnabled())
+ LOG.trace("The coordinator has changed, we will automatically retry with the new coordinator");
+ // Retry through the Main method so that the timeout is not mistaken for a command argument by the varargs overload
+ return executeCommandOnCoordinatorMain(command, synchronous, timeout, args);
+ }
+ else if (result instanceof RPCException)
+ {
+ throw (RPCException)result;
+ }
+ return result;
+ }
+
+ /**
+ * Execute the command on all the nodes corresponding to the list of destinations. The service must be started and the given command instance must be the one registered under its id.
+ * @param dests the list of members on which the command needs to be executed
+ * @param command the command to execute
+ * @param synchronous if true, sets group request mode to {@link org.jgroups.blocks.GroupRequest#GET_ALL}, and if false sets
+ * it to {@link org.jgroups.blocks.GroupRequest#GET_NONE}.
+ * @param timeout a timeout after which to throw a replication exception.
+ * @param args the list of parameters
+ * @return one response per targeted member, in the order of the destinations (an exception object if the member left, timed out or failed), or an empty list in asynchronous mode.
+ * @throws RPCException if the service is not started, if the command is not registered or if no response list could be obtained.
+ */
+ protected List<Object> excecuteCommand(final List<Address> dests, RemoteCommand command,
+ final boolean synchronous, final long timeout, Serializable... args) throws RPCException
+ {
+ SecurityManager security = System.getSecurityManager();
+ if (security != null)
+ {
+ security.checkPermission(RPCService.ACCESS_RPC_SERVICE_PERMISSION);
+ }
+ if (state != State.STARTED)
+ {
+ throw new RPCException(
+ "Cannot execute any commands if the service is not started, the current state of the service is " + state);
+ }
+ final String commandId = command.getId();
+ if (commands.get(commandId) != command)
+ {
+ throw new RPCException("Command " + commandId + " unknown, please register your command first");
+ }
+ final Message msg = new Message();
+ setObject(msg, new MessageBody(dests.size() == 1 && dests != members ? dests.get(0) : null, commandId, args));
+ RspList rsps = SecurityHelper.doPrivilegedAction(new PrivilegedAction<RspList>()
+ {
+ public RspList run()
+ {
+ try
+ {
+ return castMessage(dests, msg, synchronous, timeout);
+ }
+ catch (Exception e)
+ {
+ LOG.error("Could not cast the message corresponding to the command " + commandId + ".", e);
+ }
+ return null;
+ }
+ });
+
+ if (LOG.isTraceEnabled())
+ LOG.trace("responses: " + rsps);
+ if (rsps == null)
+ throw new RPCException("Could not get the responses for command " + commandId + ".");
+ if (!synchronous)
+ return Collections.emptyList();// async case
+ if (LOG.isTraceEnabled())
+ {
+ LOG.trace("(" + getLocalAddress() + "): responses for command " + commandId + ":\n" + rsps);
+ }
+ List<Object> retval = new ArrayList<Object>(rsps.size());
+ for (Address dest : dests)
+ {
+ Rsp rsp = rsps.get(dest);
+ if (rsp == null || (rsp.wasSuspected() && !rsp.wasReceived()))
+ {
+ // The corresponding member has left
+ retval.add(new MemberHasLeftException("No response for the member " + dest
+ + ", this member has probably left the cluster."));
+ }
+ else if (!rsp.wasReceived())
+ {
+ retval.add(new RPCException("Replication timeout for " + rsp.getSender() + ", rsp=" + rsp));
+ }
+ else
+ {
+ Object value = rsp.getValue();
+ if (value instanceof RPCException)
+ {
+ // Application-level exceptions are only traced here, they are not thrown: they are added to the responses like any other value
+ if (LOG.isTraceEnabled())
+ LOG.trace("Recieved exception'" + value + "' from " + rsp.getSender(), (RPCException)value);
+ }
+ retval.add(value);
+ }
+ }
+ return retval;
+ }
+
+ /**
+ * {@inheritDoc}
+ */
+ public Object handle(Message msg)
+ {
+ String commandId = null;
+ try
+ {
+ // Ensure that the service is fully started before trying to execute any command
+ startSignal.await();
+ MessageBody body = (MessageBody)msg.getObject();
+ commandId = body.getCommandId();
+ if (!body.accept(getLocalAddress()))
+ {
+ if (LOG.isTraceEnabled())
+ LOG.trace("Command : " + commandId + " needs to be executed on the coordinator " +
+ "only and the local node is not the coordinator, the command will be ignored");
+ return null;
+ }
+ RemoteCommand command = getCommand(commandId);
+ if (command == null)
+ {
+ return new RPCException("Command " + commandId + " unkown, please register your command first");
+ }
+ Object execResult = command.execute(body.getArgs());
+ if (LOG.isTraceEnabled())
+ LOG.trace("Command : " + commandId + " executed, result is: " + execResult);
+ return execResult;
+ }
+ catch (Throwable x)
+ {
+ if (LOG.isTraceEnabled())
+ LOG.trace("Problems invoking command.", x);
+ return new RPCException("Cannot execute the command " + (commandId == null ? "" : commandId), x);
+ }
+ }
+
+ /**
+ * {@inheritDoc}
+ */
+ public void block()
+ {
+ }
+
+ /**
+ * {@inheritDoc}
+ */
+ public void suspect(Address suspectedMbr)
+ {
+ }
+
+ /**
+ * {@inheritDoc}
+ */
+ public void viewAccepted(View view)
+ {
+ boolean coordinatorHasChanged;
+ synchronized (topologyChangeLock)
+ {
+ this.members = getMembers(view);
+ Address currentCoordinator = coordinator;
+ this.coordinator = members != null && members.size() > 0 ? members.get(0) : null;
+ this.isCoordinator = coordinator != null && coordinator.equals(getLocalAddress());
+ coordinatorHasChanged = currentCoordinator != null && !currentCoordinator.equals(coordinator);
+ // Release all the nodes
+ topologyChangeLock.notifyAll();
+ }
+ onTopologyChange(coordinatorHasChanged);
+ }
+
+ /**
+ * Called anytime the topology has changed, this method will notify all the listeners
+ * currently registered
+ * @param coordinatorHasChanged this parameter is set to <code>true</code> if the
+ * coordinator has changed, <code>false</code> otherwise
+ */
+ private void onTopologyChange(boolean coordinatorHasChanged)
+ {
+ TopologyChangeEvent event = new TopologyChangeEvent(coordinatorHasChanged, isCoordinator);
+ for (TopologyChangeListener listener : listeners)
+ {
+ try
+ {
+ listener.onChange(event);
+ }
+ catch (Exception e)
+ {
+ LOG.warn("An error occurs with the listener of type " + listener.getClass(), e);
+ }
+ }
+ }
+
+ /**
+ * {@inheritDoc}
+ */
+ public synchronized RemoteCommand registerCommand(RemoteCommand command)
+ {
+ SecurityManager security = System.getSecurityManager();
+ if (security != null)
+ {
+ security.checkPermission(RPCService.ACCESS_RPC_SERVICE_PERMISSION);
+ }
+ if (command != null)
+ {
+ String commandId = command.getId();
+ if (commandId == null)
+ {
+ throw new IllegalArgumentException("The command Id cannot be null");
+ }
+ Map<String, RemoteCommand> tmpCommands = new HashMap<String, RemoteCommand>(this.commands);
+ RemoteCommand oldCommand = tmpCommands.put(commandId, command);
+ if (oldCommand != null && PropertyManager.isDevelopping())
+ {
+ LOG.warn("A command has already been registered with the id " + commandId
+ + ", this command will be replaced with the new one");
+ }
+ this.commands = Collections.unmodifiableMap(tmpCommands);
+ return command;
+ }
+ return null;
+ }
+
+ /**
+ * {@inheritDoc}
+ */
+ public synchronized void unregisterCommand(RemoteCommand command)
+ {
+ SecurityManager security = System.getSecurityManager();
+ if (security != null)
+ {
+ security.checkPermission(RPCService.ACCESS_RPC_SERVICE_PERMISSION);
+ }
+ if (command != null)
+ {
+ String commandId = command.getId();
+ if (commandId == null)
+ {
+ throw new IllegalArgumentException("The command Id cannot be null");
+ }
+ if (commands.get(commandId) != command)
+ {
+ // We prevent to remove any command that has not been registered, thus we expect that
+ // the registered instance is exactly the same instance as the one that we want to
+ // unregister
+ if (PropertyManager.isDevelopping())
+ {
+ LOG.warn("Cannot unregister an unknown RemoteCommand, either the command id " + commandId
+ + " is unknown or the instance of RemoteCommand to unregister is unknown");
+ }
+ return;
+ }
+ Map<String, RemoteCommand> tmpCommands = new HashMap<String, RemoteCommand>(this.commands);
+ tmpCommands.remove(commandId);
+ this.commands = Collections.unmodifiableMap(tmpCommands);
+ }
+ }
+
+ /**
+ * {@inheritDoc}
+ */
+ public boolean isCoordinator() throws RPCException
+ {
+ if (state != State.STARTED)
+ {
+ throw new RPCException("Cannot know whether the local node is a coordinator or not if " +
+ "the service is not started, the current state of the service is " + state);
+ }
+ return isCoordinator;
+ }
+
+ /**
+ * {@inheritDoc}
+ */
+ public void registerTopologyChangeListener(TopologyChangeListener listener) throws SecurityException
+ {
+ SecurityManager security = System.getSecurityManager();
+ if (security != null)
+ {
+ security.checkPermission(RPCService.ACCESS_RPC_SERVICE_PERMISSION);
+ }
+ if (listener == null)
+ {
+ return;
+ }
+ listeners.add(listener);
+ }
+
+ /**
+ * {@inheritDoc}
+ */
+ public void unregisterTopologyChangeListener(TopologyChangeListener listener) throws SecurityException
+ {
+ SecurityManager security = System.getSecurityManager();
+ if (security != null)
+ {
+ security.checkPermission(RPCService.ACCESS_RPC_SERVICE_PERMISSION);
+ }
+ if (listener == null)
+ {
+ return;
+ }
+ listeners.remove(listener);
+ }
+
+ /**
+ * Gives the {@link RemoteCommand} corresponding to the given id
+ * @param commandId the command id of the command to retrieve
+ * @return the corresponding {@link RemoteCommand}
+ */
+ protected RemoteCommand getCommand(String commandId)
+ {
+ return commands.get(commandId);
+ }
+
+ /**
+ * {@inheritDoc} The service is marked as started only if the channel could be created and connected.
+ */
+ public void start()
+ {
+ SecurityManager security = System.getSecurityManager();
+ if (security != null)
+ {
+ security.checkPermission(RPCService.ACCESS_RPC_SERVICE_PERMISSION);
+ }
+
+ try
+ {
+ SecurityHelper.doPrivilegedExceptionAction(new PrivilegedExceptionAction<Void>()
+ {
+ public Void run() throws Exception
+ {
+ channel = createChannel();
+ dispatcher = new MessageDispatcher(channel, null, AbstractRPCService.this, AbstractRPCService.this);
+ channel.connect(clusterName);
+ return null;
+ }
+ });
+ this.state = State.STARTED; // reached only when the channel has been created and connected
+ }
+ catch (PrivilegedActionException pae)
+ {
+ throw new RuntimeException("Cannot initialize the Channel needed for the RPCServiceImpl", pae.getCause());
+ }
+ finally
+ {
+ startSignal.countDown(); // always release the threads blocked in handle(), even if the start failed
+ }
+ }
+
+ /**
+ * {@inheritDoc}
+ */
+ public void stop()
+ {
+ SecurityManager security = System.getSecurityManager();
+ if (security != null)
+ {
+ security.checkPermission(RPCService.ACCESS_RPC_SERVICE_PERMISSION);
+ }
+
+ this.state = State.STOPPED;
+ this.isCoordinator = false;
+ if (channel != null && channel.isOpen())
+ {
+ if (LOG.isInfoEnabled())
+ LOG.info("Disconnecting and closing the Channel");
+ SecurityHelper.doPrivilegedAction(new PrivilegedAction<Void>()
+ {
+ public Void run()
+ {
+ channel.disconnect();
+ channel.close();
+ return null;
+ }
+ });
+ channel = null;
+ }
+ if (dispatcher != null)
+ {
+ dispatcher.stop();
+ dispatcher = null;
+ }
+ }
+
+ /**
+ * Gives the value of the default timeout
+ * @return the default timeout
+ */
+ protected long getDefaultTimeout()
+ {
+ return defaultTimeout;
+ }
+
+ /**
+ * Gives the name of the cluster
+ * @return the name of the cluster
+ */
+ protected String getClusterName()
+ {
+ return clusterName;
+ }
+
+ /**
+ * Gives the value of the retry timeout
+ * @return the value of the retry timeout
+ */
+ protected long getRetryTimeout()
+ {
+ return retryTimeout;
+ }
+
+ /**
+ * Indicates whether the failover capabilities are enabled or not
+ * @return <code>true</code> if the failover capabilities are allowed, <code>false</code>
+ * otherwise
+ */
+ protected boolean isAllowFailover()
+ {
+ return allowFailover;
+ }
+
+ /**
+ * Returns the channel's own address. The result of calling this method on an unconnected
+ * channel is implementation defined (may return null). Calling this method on a closed
+ * channel returns null. It is used to detect whether the local node is the coordinator and
+ * whether an incoming message is addressed to the local node.
+ * @return The channel's address (opaque) or null if it cannot be found
+ */
+ protected abstract Address getLocalAddress();
+
+ /**
+ * Cast a message to all the given members
+ * @param dests The members to which the message is to be sent.
+ * @param msg The message to be sent to the members.
+ * @param synchronous Indicates whether the message must be sent in synchronous or asynchronous mode.
+ * @param timeout If 0: wait forever. Otherwise, wait for responses or timeout time.
+ * @return A list of responses. Each response is an <code>Object</code> and associated to its sender.
+ * @throws Exception if any error occur while casting the message
+ */
+ protected abstract RspList castMessage(List<Address> dests, Message msg, boolean synchronous, long timeout) throws Exception;
+
+ /**
+ * Create a channel
+ * @return An initialized channel
+ * @throws Exception if any error occur while creating the channel
+ */
+ protected abstract Channel createChannel() throws Exception;
+
+ /**
+ * Returns a reference to the ordered list of members of the given view; the first member is
+ * considered as the coordinator. Do NOT change this list, otherwise you will invalidate the view.
+ * Make a copy if you have to modify it.
+ * @param view the view from which the members are extracted
+ * @return a reference to the ordered list of members in this view
+ */
+ protected abstract List<Address> getMembers(View view);
+
+ /**
+ * Takes an object and uses Java serialization to generate the byte[] buffer which
+ * is set in the message.
+ */
+ protected abstract void setObject(Message m, Object o);
+
+ /**
+ * Gives the trimmed value of the {@link ValueParam} corresponding to the given key
+ * @param params the list of initial parameters from which we want to extract the {@link ValueParam}
+ * @param parameterKey the name of the {@link ValueParam} that we are looking for
+ * @return the trimmed value if it exists, null if the parameters, the {@link ValueParam} or its value is missing
+ */
+ private static String getValueParam(InitParams params, String parameterKey)
+ {
+ // Explicit checks instead of catching NullPointerException, which could hide an unrelated failure
+ if (params == null)
+ {
+ return null;
+ }
+ ValueParam valueParam = params.getValueParam(parameterKey);
+ String value = valueParam == null ? null : valueParam.getValue();
+ return value == null ? null : value.trim();
+ }
+
+ /**
+ * Gives the {@link URL} corresponding to the location of the JGroups configuration
+ * @param params the initial parameters from which we extract the parameter
+ * <code>PARAM_JGROUPS_CONFIG</code>
+ * @param configManager the configuration manager used to get the {@link URL} corresponding
+ * to the path given in the configuration of the RPCServiceImpl
+ * @return The {@link URL} corresponding to the location of the JGroups configuration,
+ * it will throw {@link RuntimeException} otherwise since it is a mandatory configuration.
+ */
+ private static URL getProperties(InitParams params, ConfigurationManager configManager)
+ {
+ String configPath = getValueParam(params, PARAM_JGROUPS_CONFIG);
+ if (configPath == null)
+ {
+ throw new IllegalArgumentException("The parameter '" + PARAM_JGROUPS_CONFIG
+ + "' of RPCServiceImpl is mandatory");
+ }
+ URL properties;
+ try
+ {
+ properties = configManager.getResource(configPath);
+ }
+ catch (Exception e)
+ {
+ throw new IllegalArgumentException("Cannot find the JGroups configuration at " + configPath, e);
+ }
+ if (properties == null)
+ {
+ throw new IllegalArgumentException("Cannot find the JGroups configuration at " + configPath);
+ }
+ return properties;
+ }
+
+ /**
+ * Gives the name of the cluster that will be able to support several portal containers
+ * since the name will be post fixed with "-${container-name}"
+ * @param ctx the context from which we extract the name of the container
+ * @param params the list of initial parameters from which we get the value of the parameter
+ * <code>PARAM_CLUSTER_NAME</code> if it exists otherwise the value will be "RPCService-Cluster"
+ */
+ private static String getClusterName(ExoContainerContext ctx, InitParams params)
+ {
+ String clusterName = getValueParam(params, PARAM_CLUSTER_NAME);
+ if (clusterName == null)
+ {
+ clusterName = CLUSTER_NAME;
+ }
+ return clusterName += "-" + ctx.getName();
+ }
+
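+ /**
+ * This inner class is the body of the messages sent to the other nodes: it carries the id of the command to execute, its arguments and the hash code of the expected destination (0 if the message is for everybody)
+ */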
+ public static class MessageBody implements Externalizable
+ {
+ /**
+ * The Id of the command to execute
+ */
+ private String commandId;
+
+ /**
+ * The list of parameters
+ */
+ private Serializable[] args;
+
+ /**
+ * The hash code of the expected destination
+ */
+ private int destination;
+
+ public MessageBody()
+ {
+ }
+
+ /**
+ * @param dest The destination of the message
+ * @param commandId the id of the command to execute
+ * @param args the arguments to use
+ */
+ public MessageBody(Address dest, String commandId, Serializable[] args)
+ {
+ this.commandId = commandId;
+ this.args = args;
+ this.destination = dest == null ? 0 : dest.hashCode();
+ }
+
+ public String getCommandId()
+ {
+ return commandId;
+ }
+
+ public Serializable[] getArgs()
+ {
+ return args;
+ }
+
+ /**
+ * Indicates whether or not the given message body accepts the given address
+ * @param address the address to check
+ * @return <code>true</code> if the message is for everybody or if the given address is the expected address,
+ * <code>false</code> otherwise
+ */
+ public boolean accept(Address address)
+ {
+ return destination == 0 || destination == address.hashCode();
+ }
+
+ /**
+ * {@inheritDoc}
+ */
+ public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException
+ {
+ boolean unicast = in.readBoolean();
+ if (unicast)
+ {
+ this.destination = in.readInt();
+ }
+ this.commandId = in.readUTF();
+ int size = in.readInt();
+ if (size == -1)
+ {
+ this.args = null;
+ }
+ else
+ {
+ this.args = new Serializable[size];
+ for (int i = 0; i < size; i++)
+ {
+ args[i] = (Serializable)in.readObject();
+ }
+ }
+ }
+
+ /**
+ * {@inheritDoc}
+ */
+ public void writeExternal(ObjectOutput out) throws IOException
+ {
+ boolean unicast = destination != 0;
+ out.writeBoolean(unicast);
+ if (unicast)
+ {
+ out.writeInt(destination);
+ }
+ out.writeUTF(commandId);
+ if (args == null)
+ {
+ out.writeInt(-1);
+ }
+ else
+ {
+ out.writeInt(args.length);
+ for (int i = 0; i < args.length; i++)
+ {
+ out.writeObject(args[i]);
+ }
+ }
+ }
+ }
+
+ /**
+ * All the potential states of the {@link RPCServiceImpl}
+ */
+ public enum State
+ {
+ INITIALIZED, STARTED, STOPPED
+ }
+
+ public static class MemberHasLeftException extends RPCException
+ {
+
+ /**
+ * The serial version UID
+ */
+ private static final long serialVersionUID = 3558158913564367637L;
+
+ public MemberHasLeftException(String message)
+ {
+ super(message);
+ }
+ }
+}
Modified: kernel/trunk/exo.kernel.component.common/src/main/java/org/exoplatform/services/rpc/impl/RPCServiceImpl.java
===================================================================
--- kernel/trunk/exo.kernel.component.common/src/main/java/org/exoplatform/services/rpc/impl/RPCServiceImpl.java 2011-12-06 09:36:07 UTC (rev 5269)
+++ kernel/trunk/exo.kernel.component.common/src/main/java/org/exoplatform/services/rpc/impl/RPCServiceImpl.java 2011-12-06 13:40:17 UTC (rev 5270)
@@ -18,1036 +18,78 @@
*/
package org.exoplatform.services.rpc.impl;
-import org.exoplatform.commons.utils.PropertyManager;
-import org.exoplatform.commons.utils.SecurityHelper;
-import org.exoplatform.container.ExoContainer;
import org.exoplatform.container.ExoContainerContext;
import org.exoplatform.container.configuration.ConfigurationManager;
import org.exoplatform.container.xml.InitParams;
-import org.exoplatform.container.xml.ValueParam;
-import org.exoplatform.services.log.ExoLogger;
-import org.exoplatform.services.log.Log;
-import org.exoplatform.services.rpc.RPCException;
-import org.exoplatform.services.rpc.RPCService;
-import org.exoplatform.services.rpc.RemoteCommand;
-import org.exoplatform.services.rpc.TopologyChangeEvent;
-import org.exoplatform.services.rpc.TopologyChangeListener;
import org.jgroups.Address;
import org.jgroups.Channel;
-import org.jgroups.ChannelException;
import org.jgroups.JChannel;
-import org.jgroups.MembershipListener;
import org.jgroups.Message;
import org.jgroups.View;
import org.jgroups.blocks.GroupRequest;
-import org.jgroups.blocks.MessageDispatcher;
-import org.jgroups.blocks.RequestHandler;
-import org.jgroups.conf.ConfiguratorFactory;
-import org.jgroups.conf.ProtocolStackConfigurator;
-import org.jgroups.util.Rsp;
import org.jgroups.util.RspList;
-import org.picocontainer.Startable;
-import java.io.Externalizable;
-import java.io.IOException;
-import java.io.ObjectInput;
-import java.io.ObjectOutput;
import java.io.Serializable;
-import java.lang.reflect.Method;
-import java.net.URL;
-import java.security.PrivilegedAction;
-import java.security.PrivilegedActionException;
-import java.security.PrivilegedExceptionAction;
-import java.util.ArrayList;
-import java.util.Collections;
-import java.util.HashMap;
import java.util.List;
-import java.util.Map;
import java.util.Vector;
-import java.util.concurrent.CopyOnWriteArrayList;
-import java.util.concurrent.CountDownLatch;
/**
- * This class is a basic implementation of the {@link RPCService}, it is mainly based on the
- * {@link MessageDispatcher}. This implementation is not designed to give the best possible
- * performances, it only aims to give a way to communicate with other nodes.
+ * This class is the implementation of the {@link AbstractRPCService} for JGroups 2.
*
* @author <a href="mailto:nicolas.filotto at exoplatform.com">Nicolas Filotto</a>
* @version $Id$
*/
-public class RPCServiceImpl implements RPCService, Startable, RequestHandler, MembershipListener
+public class RPCServiceImpl extends AbstractRPCService
{
/**
- * Connection logger.
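+ * Creates the JGroups 2 flavor of the RPC service, all the parameters are given as is to the constructor of {@link AbstractRPCService}.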
*/
- private static final Log LOG = ExoLogger.getLogger("exo.kernel.component.common.RPCServiceImpl");
-
- /**
- * We use reflection for the Message.setObject method in order to remain backward compatible
- * because since JGroups 2.12 the signature has changed the expected parameter is no more a Serializable,
- * it is an Object
- */
- private static Method MESSAGE_SET_OBJECT_METHOD;
-
- static
- {
- try
- {
- MESSAGE_SET_OBJECT_METHOD = Message.class.getMethod("setObject", Serializable.class);
- }
- catch (SecurityException e)
- {
- throw e;
- }
- catch (NoSuchMethodException e)
- {
- // We assume that we use JGroups 2.12 or higher
- try
- {
- MESSAGE_SET_OBJECT_METHOD = Message.class.getMethod("setObject", Object.class);
- }
- catch (SecurityException e1)
- {
- throw e1;
- }
- catch (Exception e1)
- {
- throw new RuntimeException("Could not find the right Message.setObject method", e);
- }
- }
- }
-
- /**
- * The name of the parameter for the location of the JGroups configuration.
- */
- protected static final String PARAM_JGROUPS_CONFIG = "jgroups-configuration";
-
- /**
- * The name of the parameter for the name of the cluster.
- */
- protected static final String PARAM_CLUSTER_NAME = "jgroups-cluster-name";
-
- /**
- * The name of the parameter for the default timeout
- */
- protected static final String PARAM_DEFAULT_TIMEOUT = "jgroups-default-timeout";
-
- /**
- * The name of the parameter to allow the failover
- */
- protected static final String PARAM_ALLOW_FAILOVER = "allow-failover";
-
- /**
- * The name of the parameter for the retry timeout
- */
- protected static final String PARAM_RETRY_TIMEOUT = "retry-timeout";
-
- /**
- * The value of the default timeout
- */
- protected static final int DEFAULT_TIMEOUT = 0;
-
- /**
- * The value of the default retry timeout
- */
- protected static final int DEFAULT_RETRY_TIMEOUT = 20000;
-
- /**
- * The default value of the cluster name
- */
- protected static final String CLUSTER_NAME = "RPCService-Cluster";
-
- /**
- * The configurator used to create the JGroups Channel
- */
- private final ProtocolStackConfigurator configurator;
-
- /**
- * The lock used to synchronize all the threads waiting for a topology change.
- */
- private final Object topologyChangeLock = new Object();
-
- /**
- * The name of the cluster
- */
- private final String clusterName;
-
- /**
- * The JGroups Channel used to communicate with other nodes
- */
- protected Channel channel;
-
- /**
- * The current list of all the members of the cluster
- */
- protected volatile Vector<Address> members;
-
- /**
- * The address of the current coordinator
- */
- protected volatile Address coordinator;
-
- /**
- * Indicates whether the current node is the coordinator of the cluster or not
- */
- protected volatile boolean isCoordinator;
-
- /**
- * The default value of the timeout
- */
- private long defaultTimeout = DEFAULT_TIMEOUT;
-
- /**
- * The value of the retry timeout
- */
- private long retryTimeout = DEFAULT_RETRY_TIMEOUT;
-
- /**
- * Indicates whether the failover capabilities are enabled
- */
- private boolean allowFailover = true;
-
- /**
- * The dispatcher used to launch the command of the cluster nodes
- */
- private MessageDispatcher dispatcher;
-
- /**
- * The signal that indicates that the service is started, it will be used
- * to make the application wait until the service is fully started to
- * ensure that all the commands have been registered before handling
- * incoming messages.
- */
- private final CountDownLatch startSignal = new CountDownLatch(1);
-
- /**
- * All the registered {@link TopologyChangeListener}
- */
- private final List<TopologyChangeListener> listeners = new CopyOnWriteArrayList<TopologyChangeListener>();
-
- /**
- * Current State of the {@link RPCServiceImpl}
- */
- private volatile State state;
-
- /**
- * All the commands that have been registered
- */
- private volatile Map<String, RemoteCommand> commands =
- Collections.unmodifiableMap(new HashMap<String, RemoteCommand>());
-
- /**
- * The public constructor
- * @param ctx the {@link ExoContainerContext} from which we will extract the corresponding
- * {@link ExoContainer}
- * @param params the list of initial parameters
- * @param configManager the configuration manager used to get the configuration
- * of JGroups
- */
public RPCServiceImpl(ExoContainerContext ctx, InitParams params, ConfigurationManager configManager)
{
- if (params == null)
- {
- throw new IllegalArgumentException("The RPCServiceImpl requires some parameters");
- }
- final URL properties = getProperties(params, configManager);
- if (LOG.isInfoEnabled())
- {
- LOG.info("The JGroups configuration used for the RPCServiceImpl will be loaded from " + properties);
- }
-
- try
- {
- this.configurator = SecurityHelper.doPrivilegedExceptionAction(new PrivilegedExceptionAction<ProtocolStackConfigurator>()
- {
- public ProtocolStackConfigurator run() throws Exception
- {
- return ConfiguratorFactory.getStackConfigurator(properties);
- }
- });
- }
- catch (PrivilegedActionException pae)
- {
- Throwable cause = pae.getCause();
- if (cause instanceof ChannelException)
- {
- throw new RuntimeException("Cannot load the JGroups configuration from " + properties, cause);
- }
- else if (cause instanceof RuntimeException)
- {
- throw (RuntimeException)cause;
- }
- else
- {
- throw new RuntimeException(cause);
- }
- }
-
- this.clusterName = getClusterName(ctx, params);
- if (LOG.isDebugEnabled())
- {
- LOG.debug("The cluster name of the RPCServiceImpl has been set to " + clusterName);
- }
- String sTimeout = getValueParam(params, PARAM_DEFAULT_TIMEOUT);
- if (sTimeout != null)
- {
- defaultTimeout = Integer.parseInt(sTimeout);
- if (LOG.isDebugEnabled())
- {
- LOG.debug("The default timeout of the RPCServiceImpl has been set to " + defaultTimeout);
- }
- }
- String sAllowFailover = getValueParam(params, PARAM_ALLOW_FAILOVER);
- if (sAllowFailover != null)
- {
- allowFailover = Boolean.valueOf(sAllowFailover);
- if (LOG.isDebugEnabled())
- {
- LOG.debug("The parameter '" + PARAM_ALLOW_FAILOVER + "' of the RPCServiceImpl has been set to " + allowFailover);
- }
- }
- sTimeout = getValueParam(params, PARAM_RETRY_TIMEOUT);
- if (sTimeout != null)
- {
- retryTimeout = Integer.parseInt(sTimeout);
- if (LOG.isDebugEnabled())
- {
- LOG.debug("The retry timeout of the RPCServiceImpl has been set to " + retryTimeout);
- }
- }
- this.state = State.INITIALIZED;
+ super(ctx, params, configManager);
}
/**
* {@inheritDoc}
*/
- public List<Object> executeCommandOnAllNodes(RemoteCommand command, boolean synchronous, Serializable... args)
- throws RPCException
+ protected Address getLocalAddress()
{
- return executeCommandOnAllNodesMain(command, synchronous, defaultTimeout, args);
+ return channel.getLocalAddress();
}
-
+
/**
* {@inheritDoc}
*/
- public List<Object> executeCommandOnAllNodes(RemoteCommand command, long timeout, Serializable... args)
- throws RPCException
+ protected RspList castMessage(List<Address> dests, Message msg, boolean synchronous, long timeout)
{
- return executeCommandOnAllNodesMain(command, true, timeout, args);
+ return dispatcher.castMessage(dests instanceof Vector ? (Vector<Address>)dests : new Vector<Address>(dests), msg,
+ synchronous ? GroupRequest.GET_ALL : GroupRequest.GET_NONE, timeout);
}
-
+
/**
- * Executes a command on all the cluster nodes. This method is equivalent to the other method of the
- * same type but with the default timeout. The command must be registered first otherwise an
- * {@link RPCException} will be thrown.
- *
- * @param command The command to execute on each cluster node
- * @param synchronous if true, sets group request mode to {@link org.jgroups.blocks.GroupRequest#GET_ALL},
- * and if false sets it to {@link org.jgroups.blocks.GroupRequest#GET_NONE}.
- * @param timeout a timeout after which to throw a replication exception.
- * @param args an array of {@link Serializable} objects corresponding to parameters of the command
- * to execute remotely
- * @return a list of responses from all the members of the cluster. If we met an exception on a given node,
- * the RPCException will be the corresponding response of this particular node
- * @throws RPCException in the event of problems.
- */
- protected List<Object> executeCommandOnAllNodesMain(RemoteCommand command, boolean synchronous, long timeout,
- Serializable... args) throws RPCException
- {
- return excecuteCommand(members, command, synchronous, timeout, args);
- }
-
- /**
* {@inheritDoc}
*/
- public Object executeCommandOnCoordinator(RemoteCommand command, boolean synchronous, Serializable... args)
- throws RPCException
+ protected Channel createChannel() throws Exception
{
- return executeCommandOnCoordinatorMain(command, synchronous, defaultTimeout, args);
+ Channel channel = new JChannel(configurator);
+ channel.setOpt(Channel.AUTO_RECONNECT, true);
+ return channel;
}
/**
* {@inheritDoc}
*/
- public Object executeCommandOnCoordinator(RemoteCommand command, long timeout, Serializable... args)
- throws RPCException
+ protected List<Address> getMembers(View view)
{
- return executeCommandOnCoordinatorMain(command, true, timeout, args);
+ return view.getMembers();
}
/**
- * Executes a command on the coordinator only. This method is equivalent to the other method of the
- * same type but with the default timeout. The command must be registered first otherwise an
- * {@link RPCException} will be thrown.
- *
- * @param command The command to execute on the coordinator node
- * @param synchronous if true, sets group request mode to {@link org.jgroups.blocks.GroupRequest#GET_ALL},
- * and if false sets it to {@link org.jgroups.blocks.GroupRequest#GET_NONE}.
- * @param timeout a timeout after which to throw a replication exception.
- * @param args an array of {@link Serializable} objects corresponding to parameters of the command
- * to execute remotely
- * @return the response of the coordinator.
- * @throws RPCException in the event of problems.
- */
- protected Object executeCommandOnCoordinatorMain(RemoteCommand command, boolean synchronous, long timeout,
- Serializable... args) throws RPCException
- {
- Address coordinator = this.coordinator;
- Vector<Address> v = new Vector<Address>(1);
- v.add(coordinator);
- List<Object> lResults = excecuteCommand(v, command, synchronous, timeout, args);
- Object result = lResults == null || lResults.size() == 0 ? null : lResults.get(0);
- if (allowFailover && result instanceof MemberHasLeftException)
- {
- // The failover capabilities have been enabled and the coordinator seems to have left
- if (coordinator.equals(this.coordinator))
- {
- synchronized(topologyChangeLock)
- {
- if (coordinator.equals(this.coordinator))
- {
- if (LOG.isTraceEnabled())
- LOG.trace("The coordinator did not change yet, we will relaunch the command after "
- + retryTimeout + " ms or once a topology change has been detected");
- try
- {
- topologyChangeLock.wait(retryTimeout);
- }
- catch (InterruptedException e)
- {
- Thread.currentThread().interrupt();
- }
- }
- }
- }
- if (LOG.isTraceEnabled())
- LOG.trace("The coordinator has changed, we will automatically retry with the new coordinator");
- return executeCommandOnCoordinator(command, synchronous, timeout, args);
- }
- else if (result instanceof RPCException)
- {
- throw (RPCException)result;
- }
- return result;
- }
-
- /**
- * Execute the command on all the nodes corresponding to the list of destinations.
- * @param dests the list of members on which the command needs to be executed
- * @param command the command to execute
- * @param synchronous if true, sets group request mode to {@link org.jgroups.blocks.GroupRequest#GET_ALL}, and if false sets
- * it to {@link org.jgroups.blocks.GroupRequest#GET_NONE}.
- * @param timeout a timeout after which to throw a replication exception.
- * @param args the list of parameters
- * @return a list of responses from all the targeted members of the cluster.
- * @throws RPCException in the event of problems.
- */
- protected List<Object> excecuteCommand(final Vector<Address> dests, RemoteCommand command,
- final boolean synchronous, final long timeout, Serializable... args) throws RPCException
- {
- SecurityManager security = System.getSecurityManager();
- if (security != null)
- {
- security.checkPermission(RPCService.ACCESS_RPC_SERVICE_PERMISSION);
- }
- if (state != State.STARTED)
- {
- throw new RPCException(
- "Cannot execute any commands if the service is not started, the current state of the service is " + state);
- }
- String commandId = command.getId();
- if (commands.get(commandId) != command)
- {
- throw new RPCException("Command " + commandId + " unknown, please register your command first");
- }
- final Message msg = new Message();
- try
- {
- MESSAGE_SET_OBJECT_METHOD.invoke(msg, new MessageBody(dests.size() == 1 && dests != members ? dests.get(0) : null, commandId, args));
- }
- catch (Exception e)
- {
- throw new RPCException("Could not call the method Message.setObject");
- }
- RspList rsps = SecurityHelper.doPrivilegedAction(new PrivilegedAction<RspList>()
- {
- public RspList run()
- {
- return dispatcher.castMessage(dests, msg, synchronous ? GroupRequest.GET_ALL : GroupRequest.GET_NONE,
- timeout);
- }
- });
-
- if (LOG.isTraceEnabled())
- LOG.trace("responses: " + rsps);
- if (rsps == null)
- throw new RPCException("Could not get the responses for command " + commandId + ".");
- if (!synchronous)
- return Collections.emptyList();// async case
- if (LOG.isTraceEnabled())
- {
- LOG.trace("(" + channel.getLocalAddress() + "): responses for command " + commandId + ":\n" + rsps);
- }
- List<Object> retval = new ArrayList<Object>(rsps.size());
- for (Address dest : dests)
- {
- Rsp rsp = rsps.get(dest);
- if (rsp == null || (rsp.wasSuspected() && !rsp.wasReceived()))
- {
- // The corresponding member has left
- retval.add(new MemberHasLeftException("No response for the member " + dest
- + ", this member has probably left the cluster."));
- }
- else if (!rsp.wasReceived())
- {
- retval.add(new RPCException("Replication timeout for " + rsp.getSender() + ", rsp=" + rsp));
- }
- else
- {
- Object value = rsp.getValue();
- if (value instanceof RPCException)
- {
- // if we have any application-level exceptions make sure we throw them!!
- if (LOG.isTraceEnabled())
- LOG.trace("Recieved exception'" + value + "' from " + rsp.getSender(), (RPCException)value);
- }
- retval.add(value);
- }
- }
- return retval;
- }
-
- /**
* {@inheritDoc}
*/
- public Object handle(Message msg)
+ protected void setObject(Message m, Object o)
{
- String commandId = null;
- try
- {
- // Ensure that the service is fully started before trying to execute any command
- startSignal.await();
- MessageBody body = (MessageBody)msg.getObject();
- commandId = body.getCommandId();
- if (!body.accept(channel.getLocalAddress()))
- {
- if (LOG.isTraceEnabled())
- LOG.trace("Command : " + commandId + " needs to be executed on the coordinator " +
- "only and the local node is not the coordinator, the command will be ignored");
- return null;
- }
- RemoteCommand command = getCommand(commandId);
- if (command == null)
- {
- return new RPCException("Command " + commandId + " unkown, please register your command first");
- }
- Object execResult = command.execute(body.getArgs());
- if (LOG.isTraceEnabled())
- LOG.trace("Command : " + commandId + " executed, result is: " + execResult);
- return execResult;
- }
- catch (Throwable x)
- {
- if (LOG.isTraceEnabled())
- LOG.trace("Problems invoking command.", x);
- return new RPCException("Cannot execute the command " + (commandId == null ? "" : commandId), x);
- }
+ m.setObject((Serializable)o);
}
-
- /**
- * {@inheritDoc}
- */
- public void block()
- {
- }
-
- /**
- * {@inheritDoc}
- */
- public void suspect(Address suspectedMbr)
- {
- }
-
- /**
- * {@inheritDoc}
- */
- public void viewAccepted(View view)
- {
- boolean coordinatorHasChanged;
- synchronized (topologyChangeLock)
- {
- this.members = view.getMembers();
- Address currentCoordinator = coordinator;
- this.coordinator = members != null && members.size() > 0 ? members.get(0) : null;
- this.isCoordinator = coordinator != null && coordinator.equals(channel.getLocalAddress());
- coordinatorHasChanged = currentCoordinator != null && !currentCoordinator.equals(coordinator);
- // Release all the nodes
- topologyChangeLock.notifyAll();
- }
- onTopologyChange(coordinatorHasChanged);
- }
-
- /**
- * Called anytime the topology has changed, this method will notify all the listeners
- * currently registered
- * @param coordinatorHasChanged this parameter is set to <code>true</code> if the
- * coordinator has changed, <code>false</code> otherwise
- */
- private void onTopologyChange(boolean coordinatorHasChanged)
- {
- TopologyChangeEvent event = new TopologyChangeEvent(coordinatorHasChanged, isCoordinator);
- for (TopologyChangeListener listener : listeners)
- {
- try
- {
- listener.onChange(event);
- }
- catch (Exception e)
- {
- LOG.warn("An error occurs with the listener of type " + listener.getClass(), e);
- }
- }
- }
-
- /**
- * {@inheritDoc}
- */
- public synchronized RemoteCommand registerCommand(RemoteCommand command)
- {
- SecurityManager security = System.getSecurityManager();
- if (security != null)
- {
- security.checkPermission(RPCService.ACCESS_RPC_SERVICE_PERMISSION);
- }
- if (command != null)
- {
- String commandId = command.getId();
- if (commandId == null)
- {
- throw new IllegalArgumentException("The command Id cannot be null");
- }
- Map<String, RemoteCommand> tmpCommands = new HashMap<String, RemoteCommand>(this.commands);
- RemoteCommand oldCommand = tmpCommands.put(commandId, command);
- if (oldCommand != null && PropertyManager.isDevelopping())
- {
- LOG.warn("A command has already been registered with the id " + commandId
- + ", this command will be replaced with the new one");
- }
- this.commands = Collections.unmodifiableMap(tmpCommands);
- return command;
- }
- return null;
- }
-
- /**
- * {@inheritDoc}
- */
- public synchronized void unregisterCommand(RemoteCommand command)
- {
- SecurityManager security = System.getSecurityManager();
- if (security != null)
- {
- security.checkPermission(RPCService.ACCESS_RPC_SERVICE_PERMISSION);
- }
- if (command != null)
- {
- String commandId = command.getId();
- if (commandId == null)
- {
- throw new IllegalArgumentException("The command Id cannot be null");
- }
- if (commands.get(commandId) != command)
- {
- // We prevent to remove any command that has not been registered, thus we expect that
- // the registered instance is exactly the same instance as the one that we want to
- // unregister
- if (PropertyManager.isDevelopping())
- {
- LOG.warn("Cannot unregister an unknown RemoteCommand, either the command id " + commandId
- + " is unknown or the instance of RemoteCommand to unregister is unknown");
- }
- return;
- }
- Map<String, RemoteCommand> tmpCommands = new HashMap<String, RemoteCommand>(this.commands);
- tmpCommands.remove(commandId);
- this.commands = Collections.unmodifiableMap(tmpCommands);
- }
- }
-
- /**
- * {@inheritDoc}
- */
- public boolean isCoordinator() throws RPCException
- {
- if (state != State.STARTED)
- {
- throw new RPCException("Cannot know whether the local node is a coordinator or not if " +
- "the service is not started, the current state of the service is " + state);
- }
- return isCoordinator;
- }
-
- /**
- * {@inheritDoc}
- */
- public void registerTopologyChangeListener(TopologyChangeListener listener) throws SecurityException
- {
- SecurityManager security = System.getSecurityManager();
- if (security != null)
- {
- security.checkPermission(RPCService.ACCESS_RPC_SERVICE_PERMISSION);
- }
- if (listener == null)
- {
- return;
- }
- listeners.add(listener);
- }
-
- /**
- * {@inheritDoc}
- */
- public void unregisterTopologyChangeListener(TopologyChangeListener listener) throws SecurityException
- {
- SecurityManager security = System.getSecurityManager();
- if (security != null)
- {
- security.checkPermission(RPCService.ACCESS_RPC_SERVICE_PERMISSION);
- }
- if (listener == null)
- {
- return;
- }
- listeners.remove(listener);
- }
-
- /**
- * Gives the {@link RemoteCommand} corresponding to the given id
- * @param commandId the command id of the command to retrieve
- * @return the corresponding {@link RemoteCommand}
- */
- protected RemoteCommand getCommand(String commandId)
- {
- return commands.get(commandId);
- }
-
- /**
- * {@inheritDoc}
- */
- public void start()
- {
- SecurityManager security = System.getSecurityManager();
- if (security != null)
- {
- security.checkPermission(RPCService.ACCESS_RPC_SERVICE_PERMISSION);
- }
-
- try
- {
- SecurityHelper.doPrivilegedExceptionAction(new PrivilegedExceptionAction<Void>()
- {
- public Void run() throws Exception
- {
- channel = new JChannel(configurator);
- channel.setOpt(Channel.AUTO_RECONNECT, true);
- dispatcher = new MessageDispatcher(channel, null, RPCServiceImpl.this, RPCServiceImpl.this);
- channel.connect(clusterName);
- return null;
- }
- });
- }
- catch (PrivilegedActionException pae)
- {
- Throwable cause = pae.getCause();
- if (cause instanceof ChannelException)
- {
- throw new RuntimeException("Cannot initialize the Channel needed for the RPCServiceImpl", cause);
- }
- else if (cause instanceof RuntimeException)
- {
- throw (RuntimeException)cause;
- }
- else
- {
- throw new RuntimeException(cause);
- }
- }
- finally
- {
- this.state = State.STARTED;
- startSignal.countDown();
- }
- }
-
- /**
- * {@inheritDoc}
- */
- public void stop()
- {
- SecurityManager security = System.getSecurityManager();
- if (security != null)
- {
- security.checkPermission(RPCService.ACCESS_RPC_SERVICE_PERMISSION);
- }
-
- this.state = State.STOPPED;
- this.isCoordinator = false;
- if (channel != null && channel.isOpen())
- {
- if (LOG.isInfoEnabled())
- LOG.info("Disconnecting and closing the Channel");
- SecurityHelper.doPrivilegedAction(new PrivilegedAction<Void>()
- {
- public Void run()
- {
- channel.disconnect();
- channel.close();
- return null;
- }
- });
- channel = null;
- }
- if (dispatcher != null)
- {
- dispatcher.stop();
- dispatcher = null;
- }
- }
-
- /**
- * Gives the value of the default timeout
- * @return the default timeout
- */
- protected long getDefaultTimeout()
- {
- return defaultTimeout;
- }
-
- /**
- * Gives the name of the cluster
- * @return the name of the cluster
- */
- protected String getClusterName()
- {
- return clusterName;
- }
-
- /**
- * Gives the value of the retry timeout
- * @return the value of the retry timeout
- */
- protected long getRetryTimeout()
- {
- return retryTimeout;
- }
-
- /**
- * Indicates whether the failover capabilities are enabled or not
- * @return <code>true</code> if the failover capabilities are allowed, <code>false</code>
- * otherwise
- */
- protected boolean isAllowFailover()
- {
- return allowFailover;
- }
-
- /**
- * Gives the value of the {@link ValueParam} corresponding to the given key
- * @param params the list of initial parameters from which we want to extract the {@link ValueParam}
- * @param parameterKey the name of the {@link ValueParam} that we are looking for
- * @return the value if it exists, null otherwise
- */
- private static String getValueParam(InitParams params, String parameterKey)
- {
- try
- {
- return params.getValueParam(parameterKey).getValue().trim();
- }
- catch (NullPointerException e)
- {
- return null;
- }
- }
-
- /**
- * Gives the {@link URL} corresponding to the location of the JGroups configuration
- * @param params the initial parameters from which we extract the parameter
- * <code>PARAM_JGROUPS_CONFIG</code>
- * @param configManager the configuration manager used to get the {@link URL} corresponding
- * to the path given in the configuration of the RPCServiceImpl
- * @return The {@link URL} corresponding to the location of the JGroups configuration,
- * it will throw {@link RuntimeException} otherwise since it is a mandatory configuration.
- */
- private static URL getProperties(InitParams params, ConfigurationManager configManager)
- {
- String configPath = getValueParam(params, PARAM_JGROUPS_CONFIG);
- if (configPath == null)
- {
- throw new IllegalArgumentException("The parameter '" + PARAM_JGROUPS_CONFIG
- + "' of RPCServiceImpl is mandatory");
- }
- URL properties;
- try
- {
- properties = configManager.getResource(configPath);
- }
- catch (Exception e)
- {
- throw new IllegalArgumentException("Cannot find the JGroups configuration at " + configPath, e);
- }
- if (properties == null)
- {
- throw new IllegalArgumentException("Cannot find the JGroups configuration at " + configPath);
- }
- return properties;
- }
-
- /**
- * Gives the name of the cluster that will be able to support several portal containers
- * since the name will be post fixed with "-${container-name}"
- * @param ctx the context from which we extract the name of the container
- * @param params the list of initial parameters from which we get the value of the parameter
- * <code>PARAM_CLUSTER_NAME</code> if it exists otherwise the value will be "RPCService-Cluster"
- */
- private static String getClusterName(ExoContainerContext ctx, InitParams params)
- {
- String clusterName = getValueParam(params, PARAM_CLUSTER_NAME);
- if (clusterName == null)
- {
- clusterName = CLUSTER_NAME;
- }
- return clusterName += "-" + ctx.getName();
- }
-
- /**
- * This intern class will be used to
- */
- public static class MessageBody implements Externalizable
- {
- /**
- * The Id of the command to execute
- */
- private String commandId;
-
- /**
- * The list of parameters
- */
- private Serializable[] args;
-
- /**
- * The hash code of the expected destination
- */
- private int destination;
-
- public MessageBody()
- {
- }
-
- /**
- * @param dest The destination of the message
- * @param commandId the id of the command to execute
- * @param args the arguments to use
- */
- public MessageBody(Address dest, String commandId, Serializable[] args)
- {
- this.commandId = commandId;
- this.args = args;
- this.destination = dest == null ? 0 : dest.hashCode();
- }
-
- public String getCommandId()
- {
- return commandId;
- }
-
- public Serializable[] getArgs()
- {
- return args;
- }
-
- /**
- * Indicates whether or not the given message body accepts the given address
- * @param address the address to check
- * @return <code>true</code> if the message is for everybody or if the given address is the expected address,
- * <code>false</code> otherwise
- */
- public boolean accept(Address address)
- {
- return destination == 0 || destination == address.hashCode();
- }
-
- /**
- * {@inheritDoc}
- */
- public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException
- {
- boolean unicast = in.readBoolean();
- if (unicast)
- {
- this.destination = in.readInt();
- }
- this.commandId = in.readUTF();
- int size = in.readInt();
- if (size == -1)
- {
- this.args = null;
- }
- else
- {
- this.args = new Serializable[size];
- for (int i = 0; i < size; i++)
- {
- args[i] = (Serializable)in.readObject();
- }
- }
- }
-
- /**
- * {@inheritDoc}
- */
- public void writeExternal(ObjectOutput out) throws IOException
- {
- boolean unicast = destination != 0;
- out.writeBoolean(unicast);
- if (unicast)
- {
- out.writeInt(destination);
- }
- out.writeUTF(commandId);
- if (args == null)
- {
- out.writeInt(-1);
- }
- else
- {
- out.writeInt(args.length);
- for (int i = 0; i < args.length; i++)
- {
- out.writeObject(args[i]);
- }
- }
- }
- }
-
- /**
- * All the potential states of the {@link RPCServiceImpl}
- */
- public enum State {
- INITIALIZED, STARTED, STOPPED
- }
-
- public static class MemberHasLeftException extends RPCException
- {
-
- /**
- * The serial version UID
- */
- private static final long serialVersionUID = 3558158913564367637L;
-
- public MemberHasLeftException(String message)
- {
- super(message);
- }
- }
}
Modified: kernel/trunk/exo.kernel.component.common/src/test/java/org/exoplatform/services/rpc/impl/TestRPCServiceImpl.java
===================================================================
--- kernel/trunk/exo.kernel.component.common/src/test/java/org/exoplatform/services/rpc/impl/TestRPCServiceImpl.java 2011-12-06 09:36:07 UTC (rev 5269)
+++ kernel/trunk/exo.kernel.component.common/src/test/java/org/exoplatform/services/rpc/impl/TestRPCServiceImpl.java 2011-12-06 13:40:17 UTC (rev 5270)
@@ -27,7 +27,7 @@
import org.exoplatform.services.rpc.SingleMethodCallCommand;
import org.exoplatform.services.rpc.TopologyChangeEvent;
import org.exoplatform.services.rpc.TopologyChangeListener;
-import org.exoplatform.services.rpc.impl.RPCServiceImpl.MemberHasLeftException;
+import org.exoplatform.services.rpc.impl.AbstractRPCService.MemberHasLeftException;
import org.exoplatform.test.BasicTestCase;
import org.jgroups.Address;
@@ -35,7 +35,6 @@
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
-import java.util.Vector;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicReference;
@@ -805,8 +804,8 @@
o = service1.executeCommandOnCoordinator(LongTaskOnNode2, 1000);
assertEquals("OK", o);
- Vector<Address> allMembers = service1.members;
- Vector<Address> coordinatorOnly = new Vector<Address>(1);
+ List<Address> allMembers = service1.members;
+ List<Address> coordinatorOnly = new ArrayList<Address>(1);
coordinatorOnly.add(service1.coordinator);
final RPCServiceImpl service = service2;
Added: kernel/trunk/exo.kernel.component.ext.rpc.impl.jgroups.v3/pom.xml
===================================================================
--- kernel/trunk/exo.kernel.component.ext.rpc.impl.jgroups.v3/pom.xml (rev 0)
+++ kernel/trunk/exo.kernel.component.ext.rpc.impl.jgroups.v3/pom.xml 2011-12-06 13:40:17 UTC (rev 5270)
@@ -0,0 +1,115 @@
+<!--
+
+ Copyright (C) 2009 eXo Platform SAS.
+
+ This is free software; you can redistribute it and/or modify it
+ under the terms of the GNU Lesser General Public License as
+ published by the Free Software Foundation; either version 2.1 of
+ the License, or (at your option) any later version.
+
+ This software is distributed in the hope that it will be useful,
+ but WITHOUT ANY WARRANTY; without even the implied warranty of
+ MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ Lesser General Public License for more details.
+
+ You should have received a copy of the GNU Lesser General Public
+ License along with this software; if not, write to the Free
+ Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
+ 02110-1301 USA, or see the FSF site: http://www.fsf.org.
+
+-->
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
+ <modelVersion>4.0.0</modelVersion>
+ <parent>
+ <groupId>org.exoplatform.kernel</groupId>
+ <artifactId>kernel-parent</artifactId>
+ <version>2.3.5-GA-SNAPSHOT</version>
+ </parent>
+ <artifactId>exo.kernel.component.ext.rpc.impl.jgroups.v3</artifactId>
+ <name>eXo Kernel :: RPC Service Extension :: JGroups 3 Implementation</name>
+ <description>The JGroups 3 implementation of the RPC service</description>
+ <dependencies>
+ <dependency>
+ <groupId>org.exoplatform.kernel</groupId>
+ <artifactId>exo.kernel.commons.test</artifactId>
+ </dependency>
+ <dependency>
+ <groupId>org.exoplatform.kernel</groupId>
+ <artifactId>exo.kernel.component.common</artifactId>
+ </dependency>
+ <dependency>
+ <groupId>org.jgroups</groupId>
+ <artifactId>jgroups</artifactId>
+ <version>3.0.0.Final</version>
+ </dependency>
+ <dependency>
+ <groupId>org.slf4j</groupId>
+ <artifactId>slf4j-log4j12</artifactId>
+ </dependency>
+ </dependencies>
+ <build>
+ <plugins>
+ <plugin>
+ <groupId>org.apache.maven.plugins</groupId>
+ <artifactId>maven-surefire-plugin</artifactId>
+ <configuration>
+ <argLine>${env.MAVEN_OPTS} -Djava.security.manager=org.exoplatform.commons.test.TestSecurityManager -Djava.security.policy=${project.build.directory}/test-classes/test.policy</argLine>
+ <systemProperties>
+ <!-- This system property works around an incompatibility between IPv6 and
+ the JVMs of some Linux distributions such as Ubuntu and Fedora -->
+ <property>
+ <name>java.net.preferIPv4Stack</name>
+ <value>true</value>
+ </property>
+ <!-- Bind JGroups to the loopback address so that the firewall is avoided -->
+ <property>
+ <name>jgroups.bind_addr</name>
+ <value>127.0.0.1</value>
+ </property>
+ <property>
+ <name>jgroups.stack</name>
+ <value>udp</value>
+ </property>
+ </systemProperties>
+ </configuration>
+ </plugin>
+ <plugin>
+ <artifactId>maven-antrun-plugin</artifactId>
+ <executions>
+ <execution>
+ <id>prepare-test-policy</id>
+ <phase>process-test-resources</phase>
+ <configuration>
+ <tasks>
+ <echo>Creating Access Policy for tests</echo>
+ <makeurl file="${settings.localRepository}" property="localRepositoryURL" />
+ <makeurl file="${project.build.outputDirectory}" property="outputDirectoryURL" />
+ <makeurl file="${project.build.testOutputDirectory}" property="testOutputDirectoryURL" />
+ <copy todir="${project.build.testOutputDirectory}" overwrite="true">
+ <fileset dir="${project.basedir}/src/test/resources/">
+ <include name="test.policy" />
+ </fileset>
+ <filterset>
+ <filter token="MAVEN_REPO" value="${localRepositoryURL}" />
+ <filter token="MAIN_CLASSES" value="${outputDirectoryURL}" />
+ <filter token="TEST_CLASSES" value="${testOutputDirectoryURL}" />
+ </filterset>
+ </copy>
+ </tasks>
+ </configuration>
+ <goals>
+ <goal>run</goal>
+ </goals>
+ </execution>
+ </executions>
+ <dependencies>
+ <dependency>
+ <groupId>ant</groupId>
+ <artifactId>ant-optional</artifactId>
+ <version>1.5.3-1</version>
+ </dependency>
+ </dependencies>
+ </plugin>
+ </plugins>
+ </build>
+</project>
Added: kernel/trunk/exo.kernel.component.ext.rpc.impl.jgroups.v3/src/main/java/org/exoplatform/services/rpc/jgv3/RPCServiceImpl.java
===================================================================
--- kernel/trunk/exo.kernel.component.ext.rpc.impl.jgroups.v3/src/main/java/org/exoplatform/services/rpc/jgv3/RPCServiceImpl.java (rev 0)
+++ kernel/trunk/exo.kernel.component.ext.rpc.impl.jgroups.v3/src/main/java/org/exoplatform/services/rpc/jgv3/RPCServiceImpl.java 2011-12-06 13:40:17 UTC (rev 5270)
@@ -0,0 +1,99 @@
+/*
+ * Copyright (C) 2010 eXo Platform SAS.
+ *
+ * This is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU Lesser General Public License as
+ * published by the Free Software Foundation; either version 2.1 of
+ * the License, or (at your option) any later version.
+ *
+ * This software is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this software; if not, write to the Free
+ * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
+ * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
+ */
+package org.exoplatform.services.rpc.jgv3;
+
+import org.exoplatform.container.ExoContainerContext;
+import org.exoplatform.container.configuration.ConfigurationManager;
+import org.exoplatform.container.xml.InitParams;
+import org.exoplatform.services.rpc.impl.AbstractRPCService;
+import org.jgroups.Address;
+import org.jgroups.Channel;
+import org.jgroups.JChannel;
+import org.jgroups.Message;
+import org.jgroups.View;
+import org.jgroups.blocks.RequestOptions;
+import org.jgroups.blocks.ResponseMode;
+import org.jgroups.util.RspList;
+
+import java.util.List;
+
+/**
+ * This class is the implementation of the {@link AbstractRPCService} for JGroups 3.
+ *
+ * @author <a href="mailto:nicolas.filotto at exoplatform.com">Nicolas Filotto</a>
+ * @version $Id$
+ */
+public class RPCServiceImpl extends AbstractRPCService
+{
+
+ /**
+ * Creates the service by handing all three arguments over to the {@link AbstractRPCService} constructor.
+ */
+ public RPCServiceImpl(ExoContainerContext ctx, InitParams params, ConfigurationManager configManager)
+ {
+ super(ctx, params, configManager);
+ }
+
+ /**
+ * {@inheritDoc} This implementation returns {@code channel.getAddress()}.
+ */
+ protected Address getLocalAddress()
+ {
+ return channel.getAddress();
+ }
+
+ /**
+ * {@inheritDoc} A synchronous call uses {@link ResponseMode#GET_ALL}, any other call {@link ResponseMode#GET_NONE}; the timeout is carried by the {@link RequestOptions}.
+ */
+ protected RspList<Object> castMessage(List<Address> dests, Message msg, boolean synchronous, long timeout) throws Exception
+ {
+ return dispatcher.castMessage(dests, msg, new RequestOptions(synchronous ? ResponseMode.GET_ALL : ResponseMode.GET_NONE, timeout));
+ }
+
+ /**
+ * {@inheritDoc} This implementation builds a new {@link JChannel} from {@code configurator}.
+ */
+ protected Channel createChannel() throws Exception
+ {
+ return new JChannel(configurator);
+ }
+
+ /**
+ * {@inheritDoc} This implementation does nothing.
+ */
+ public void unblock()
+ {
+ }
+
+ /**
+ * {@inheritDoc} This implementation returns {@code view.getMembers()} as is.
+ */
+ protected List<Address> getMembers(View view)
+ {
+ return view.getMembers();
+ }
+
+ /**
+ * {@inheritDoc} This implementation calls {@code m.setObject(o)}.
+ */
+ protected void setObject(Message m, Object o)
+ {
+ m.setObject(o);
+ }
+}
Added: kernel/trunk/exo.kernel.component.ext.rpc.impl.jgroups.v3/src/test/java/org/exoplatform/services/rpc/impl/TestRPCServiceImpl.java
===================================================================
--- kernel/trunk/exo.kernel.component.ext.rpc.impl.jgroups.v3/src/test/java/org/exoplatform/services/rpc/impl/TestRPCServiceImpl.java (rev 0)
+++ kernel/trunk/exo.kernel.component.ext.rpc.impl.jgroups.v3/src/test/java/org/exoplatform/services/rpc/impl/TestRPCServiceImpl.java 2011-12-06 13:40:17 UTC (rev 5270)
@@ -0,0 +1,1281 @@
+/*
+ * Copyright (C) 2010 eXo Platform SAS.
+ *
+ * This is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU Lesser General Public License as
+ * published by the Free Software Foundation; either version 2.1 of
+ * the License, or (at your option) any later version.
+ *
+ * This software is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this software; if not, write to the Free
+ * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
+ * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
+ */
+package org.exoplatform.services.rpc.impl;
+
+import org.exoplatform.container.PortalContainer;
+import org.exoplatform.container.configuration.ConfigurationManager;
+import org.exoplatform.container.xml.InitParams;
+import org.exoplatform.container.xml.ValueParam;
+import org.exoplatform.services.rpc.RPCException;
+import org.exoplatform.services.rpc.RemoteCommand;
+import org.exoplatform.services.rpc.SingleMethodCallCommand;
+import org.exoplatform.services.rpc.TopologyChangeEvent;
+import org.exoplatform.services.rpc.TopologyChangeListener;
+import org.exoplatform.services.rpc.impl.AbstractRPCService.MemberHasLeftException;
+import org.exoplatform.services.rpc.jgv3.RPCServiceImpl;
+import org.exoplatform.test.BasicTestCase;
+import org.jgroups.Address;
+
+import java.io.Serializable;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.atomic.AtomicReference;
+
+/**
+ * This is the unit test class for the service {@link RPCServiceImpl}
+ *
+ * @author <a href="mailto:nicolas.filotto at exoplatform.com">Nicolas Filotto</a>
+ * @version $Id$
+ *
+ */
+public class TestRPCServiceImpl extends BasicTestCase
+{
+ private PortalContainer container;
+ private ConfigurationManager configManager;
+
+ public void setUp() throws Exception // stores the collaborators that every test passes to the RPCServiceImpl constructor
+ {
+ container = PortalContainer.getInstance();
+ configManager = (ConfigurationManager)container.getComponentInstanceOfType(ConfigurationManager.class); // looked up from the container by type
+ }
+
+ public void testParameters() // checks how the constructor reacts to missing, invalid and valid init parameters
+ {
+ InitParams params = null;
+ try
+ {
+ new RPCServiceImpl(container.getContext(), params, configManager);
+ fail("We expect a IllegalArgumentException since the jgroups config cannot be found");
+ }
+ catch (IllegalArgumentException e)
+ {
+ // OK: null InitParams are rejected
+ }
+ params = new InitParams();
+ try
+ {
+ new RPCServiceImpl(container.getContext(), params, configManager);
+ fail("We expect a IllegalArgumentException since the jgroups config cannot be found");
+ }
+ catch (IllegalArgumentException e)
+ {
+ // OK: InitParams without the jgroups config parameter are rejected
+ }
+ ValueParam paramConf = new ValueParam();
+ paramConf.setName(RPCServiceImpl.PARAM_JGROUPS_CONFIG);
+ params.addParameter(paramConf);
+ try
+ {
+ new RPCServiceImpl(container.getContext(), params, configManager);
+ fail("We expect a IllegalArgumentException since the jgroups config cannot be found");
+ }
+ catch (IllegalArgumentException e)
+ {
+ // OK: a jgroups config parameter that has no value is rejected
+ }
+ paramConf.setValue("fakePath");
+ try
+ {
+ new RPCServiceImpl(container.getContext(), params, configManager);
+ fail("We expect a IllegalArgumentException since the jgroups config cannot be found");
+ }
+ catch (IllegalArgumentException e)
+ {
+ // OK: a jgroups config path that cannot be found is rejected
+ }
+ paramConf.setValue("jar:/conf/portal/udp.xml");
+ RPCServiceImpl service = null;
+ try
+ {
+ service = new RPCServiceImpl(container.getContext(), params, configManager);
+ assertEquals(RPCServiceImpl.DEFAULT_TIMEOUT, service.getDefaultTimeout());
+ assertEquals(RPCServiceImpl.DEFAULT_RETRY_TIMEOUT, service.getRetryTimeout());
+ assertEquals(true, service.isAllowFailover());
+ assertEquals(RPCServiceImpl.CLUSTER_NAME + "-" + container.getContext().getName(), service.getClusterName());
+ }
+ finally
+ {
+ if (service != null)
+ {
+ service.stop();
+ }
+ }
+ ValueParam paramTimeout = new ValueParam();
+ paramTimeout.setName(RPCServiceImpl.PARAM_DEFAULT_TIMEOUT);
+ paramTimeout.setValue("fakeValue");
+ params.addParameter(paramTimeout);
+ try
+ {
+ new RPCServiceImpl(container.getContext(), params, configManager);
+ fail("We expect a NumberFormatException since the timeout is not properly set");
+ }
+ catch (NumberFormatException e)
+ {
+ // OK: a non numeric default timeout is rejected
+ }
+ paramTimeout.setValue("60");
+ try
+ {
+ service = new RPCServiceImpl(container.getContext(), params, configManager);
+ assertEquals(60, service.getDefaultTimeout());
+ assertEquals(RPCServiceImpl.DEFAULT_RETRY_TIMEOUT, service.getRetryTimeout());
+ assertEquals(true, service.isAllowFailover());
+ assertEquals(RPCServiceImpl.CLUSTER_NAME + "-" + container.getContext().getName(), service.getClusterName());
+ }
+ finally
+ {
+ if (service != null)
+ {
+ service.stop();
+ }
+ }
+ ValueParam paramRetryTimeout = new ValueParam();
+ paramRetryTimeout.setName(RPCServiceImpl.PARAM_RETRY_TIMEOUT);
+ paramRetryTimeout.setValue("fakeValue");
+ params.addParameter(paramRetryTimeout);
+ try
+ {
+ new RPCServiceImpl(container.getContext(), params, configManager);
+ fail("We expect a NumberFormatException since the retry timeout is not properly set");
+ }
+ catch (NumberFormatException e)
+ {
+ // OK: a non numeric retry timeout is rejected
+ }
+ paramRetryTimeout.setValue("60");
+ try
+ {
+ service = new RPCServiceImpl(container.getContext(), params, configManager);
+ assertEquals(60, service.getDefaultTimeout());
+ assertEquals(60, service.getRetryTimeout());
+ assertEquals(true, service.isAllowFailover());
+ assertEquals(RPCServiceImpl.CLUSTER_NAME + "-" + container.getContext().getName(), service.getClusterName());
+ }
+ finally
+ {
+ if (service != null)
+ {
+ service.stop();
+ }
+ }
+ ValueParam paramAllowFailover = new ValueParam();
+ paramAllowFailover.setName(RPCServiceImpl.PARAM_ALLOW_FAILOVER);
+ paramAllowFailover.setValue("fakeValue"); // not a boolean: no exception is expected, failover must simply end up disabled
+ params.addParameter(paramAllowFailover);
+ try
+ {
+ service = new RPCServiceImpl(container.getContext(), params, configManager);
+ assertEquals(60, service.getDefaultTimeout());
+ assertEquals(60, service.getRetryTimeout());
+ assertEquals(false, service.isAllowFailover());
+ assertEquals(RPCServiceImpl.CLUSTER_NAME + "-" + container.getContext().getName(), service.getClusterName());
+ }
+ finally
+ {
+ if (service != null)
+ {
+ service.stop();
+ }
+ }
+ paramAllowFailover.setValue("TRUE"); // the upper case spelling is expected to enable failover
+ try
+ {
+ service = new RPCServiceImpl(container.getContext(), params, configManager);
+ assertEquals(60, service.getDefaultTimeout());
+ assertEquals(60, service.getRetryTimeout());
+ assertEquals(true, service.isAllowFailover());
+ assertEquals(RPCServiceImpl.CLUSTER_NAME + "-" + container.getContext().getName(), service.getClusterName());
+ }
+ finally
+ {
+ if (service != null)
+ {
+ service.stop();
+ }
+ }
+
+ ValueParam paramClusterName = new ValueParam();
+ paramClusterName.setName(RPCServiceImpl.PARAM_CLUSTER_NAME);
+ paramClusterName.setValue("MyName");
+ params.addParameter(paramClusterName);
+ try
+ {
+ service = new RPCServiceImpl(container.getContext(), params, configManager);
+ assertEquals(60, service.getDefaultTimeout());
+ assertEquals(paramClusterName.getValue() + "-" + container.getContext().getName(), service.getClusterName()); // the context name is still appended to a custom cluster name
+ }
+ finally
+ {
+ if (service != null)
+ {
+ service.stop();
+ }
+ }
+ }
+
+ public void testStates() throws Exception // checks that commands are only accepted between start() and stop()
+ {
+ InitParams params = new InitParams();
+ ValueParam paramConf = new ValueParam();
+ paramConf.setName(RPCServiceImpl.PARAM_JGROUPS_CONFIG);
+ paramConf.setValue("jar:/conf/portal/udp.xml");
+ params.addParameter(paramConf);
+ RPCServiceImpl service = null;
+ RemoteCommand foo = new RemoteCommand()
+ {
+
+ public String getId()
+ {
+ return "foo";
+ }
+
+ public String execute(Serializable[] args) throws Throwable
+ {
+ return null;
+ }
+ };
+ try
+ {
+ service = new RPCServiceImpl(container.getContext(), params, configManager);
+
+ service.registerCommand(foo);
+ try
+ {
+ service.executeCommandOnAllNodes(foo, true);
+ fail("We expect a RPCException since the current state is not the expected one");
+ }
+ catch (RPCException e)
+ {
+ // OK: the service has not been started yet
+ }
+ try
+ {
+ service.executeCommandOnAllNodes(foo, 10);
+ fail("We expect a RPCException since the current state is not the expected one");
+ }
+ catch (RPCException e)
+ {
+ // OK: the service has not been started yet
+ }
+ try
+ {
+ service.executeCommandOnCoordinator(foo, true);
+ fail("We expect a RPCException since the current state is not the expected one");
+ }
+ catch (RPCException e)
+ {
+ // OK: the service has not been started yet
+ }
+ try
+ {
+ service.executeCommandOnCoordinator(foo, 10);
+ fail("We expect a RPCException since the current state is not the expected one");
+ }
+ catch (RPCException e)
+ {
+ // OK: the service has not been started yet
+ }
+ try
+ {
+ service.isCoordinator();
+ fail("We expect a RPCException since the current state is not the expected one");
+ }
+ catch (RPCException e)
+ {
+ // OK: the service has not been started yet
+ }
+ service.start(); // from here on the same calls must go through without exception
+ assertEquals(true, service.isCoordinator());
+ service.executeCommandOnAllNodes(foo, true);
+ service.executeCommandOnAllNodes(foo, 10);
+ service.executeCommandOnCoordinator(foo, true);
+ service.executeCommandOnCoordinator(foo, 10);
+ }
+ finally
+ {
+ if (service != null)
+ {
+ service.stop();
+ }
+ }
+ try
+ {
+ service.executeCommandOnAllNodes(foo, true);
+ fail("We expect a RPCException since the current state is not the expected one");
+ }
+ catch (RPCException e)
+ {
+ // OK: the service has been stopped
+ }
+ try
+ {
+ service.executeCommandOnAllNodes(foo, 10);
+ fail("We expect a RPCException since the current state is not the expected one");
+ }
+ catch (RPCException e)
+ {
+ // OK: the service has been stopped
+ }
+ try
+ {
+ service.executeCommandOnCoordinator(foo, true);
+ fail("We expect a RPCException since the current state is not the expected one");
+ }
+ catch (RPCException e)
+ {
+ // OK: the service has been stopped
+ }
+ try
+ {
+ service.executeCommandOnCoordinator(foo, 10);
+ fail("We expect a RPCException since the current state is not the expected one");
+ }
+ catch (RPCException e)
+ {
+ // OK: the service has been stopped
+ }
+ }
+
+ public void testCommands() throws Exception // single node: unknown, failing, slow and successful commands
+ {
+ InitParams params = new InitParams();
+ ValueParam paramConf = new ValueParam();
+ paramConf.setName(RPCServiceImpl.PARAM_JGROUPS_CONFIG);
+ paramConf.setValue("jar:/conf/portal/udp.xml");
+ params.addParameter(paramConf);
+ RPCServiceImpl service = null;
+ try
+ {
+ service = new RPCServiceImpl(container.getContext(), params, configManager);
+ RemoteCommand fake = new RemoteCommand() // never registered
+ {
+
+ public String getId()
+ {
+ return "fake";
+ }
+
+ public String execute(Serializable[] args) throws Throwable
+ {
+ return null;
+ }
+ };
+ RemoteCommand fake2 = new RemoteCommand()
+ {
+
+ public String getId()
+ {
+ return "fake2";
+ }
+
+ public String execute(Serializable[] args) throws Throwable
+ {
+ return null;
+ }
+ };
+ RemoteCommand fake2_Unregistered = new RemoteCommand() // same id as fake2 but a different, unregistered instance
+ {
+
+ public String getId()
+ {
+ return "fake2";
+ }
+
+ public String execute(Serializable[] args) throws Throwable
+ {
+ return null;
+ }
+ };
+ service.registerCommand(fake2);
+ RemoteCommand Exception = new RemoteCommand()
+ {
+
+ public String getId()
+ {
+ return "Exception";
+ }
+
+ public String execute(Serializable[] args) throws Throwable
+ {
+ throw new Exception("MyException");
+ }
+ };
+ service.registerCommand(Exception);
+ RemoteCommand Error = new RemoteCommand()
+ {
+
+ public String getId()
+ {
+ return "Error";
+ }
+
+ public String execute(Serializable[] args) throws Throwable
+ {
+ throw new Error("MyError");
+ }
+ } ;
+ service.registerCommand(Error);
+ RemoteCommand StringValue = new RemoteCommand()
+ {
+
+ public String getId()
+ {
+ return "StringValue";
+ }
+
+ public String execute(Serializable[] args) throws Throwable
+ {
+ return "OK";
+ }
+ };
+ service.registerCommand(StringValue);
+ RemoteCommand NullValue = new RemoteCommand()
+ {
+
+ public String getId()
+ {
+ return "NullValue";
+ }
+
+ public String execute(Serializable[] args) throws Throwable
+ {
+ return null;
+ }
+ };
+ service.registerCommand(NullValue);
+ RemoteCommand LongTask = new RemoteCommand()
+ {
+
+ public String getId()
+ {
+ return "LongTask";
+ }
+
+ public String execute(Serializable[] args) throws Throwable
+ {
+ Thread.sleep(2000);
+ return null;
+ }
+ };
+ service.registerCommand(LongTask);
+ service.start();
+ try
+ {
+ service.executeCommandOnAllNodes(fake, true);
+ fail("We expect a RPCException since the command is unknown");
+ }
+ catch (RPCException e)
+ {
+ // OK: "fake" has never been registered
+ }
+ try
+ {
+ service.executeCommandOnCoordinator(fake, true);
+ fail("We expect a RPCException since the command is unknown");
+ }
+ catch (RPCException e)
+ {
+ // OK: "fake" has never been registered
+ }
+ try
+ {
+ service.executeCommandOnAllNodes(fake2_Unregistered, true);
+ fail("We expect a RPCException since the command is unknown");
+ }
+ catch (RPCException e)
+ {
+ // OK: sharing the id of a registered command is not enough
+ }
+ try
+ {
+ service.executeCommandOnCoordinator(fake2_Unregistered, true);
+ fail("We expect a RPCException since the command is unknown");
+ }
+ catch (RPCException e)
+ {
+ // OK: sharing the id of a registered command is not enough
+ }
+ List<Object> result;
+ result = service.executeCommandOnAllNodes(Exception, true); // on all nodes a failure is returned as a result entry, not thrown
+ assertTrue(result != null && result.size() == 1);
+ assertTrue("We expect a RPCException since one node could not execute the command", result.get(0) instanceof RPCException);
+ try
+ {
+ service.executeCommandOnCoordinator(Exception, true);
+ fail("We expect a RPCException since one node could not execute the command");
+ }
+ catch (RPCException e)
+ {
+ // OK: the command threw an Exception
+ }
+ result = service.executeCommandOnAllNodes(Error, true);
+ assertTrue(result != null && result.size() == 1);
+ assertTrue("We expect a RPCException since one node could not execute the command", result.get(0) instanceof RPCException);
+ try
+ {
+ service.executeCommandOnCoordinator(Error, true);
+ fail("We expect a RPCException since one node could not execute the command");
+ }
+ catch (RPCException e)
+ {
+ // OK: the command threw an Error
+ }
+ result = service.executeCommandOnAllNodes(LongTask, true);
+ assertNotNull(result);
+ assertTrue(result.size() == 1);
+ assertNull(result.get(0));
+ Object o = service.executeCommandOnCoordinator(LongTask, true);
+ assertNull(o);
+ result = service.executeCommandOnAllNodes(LongTask, 1000); // the task sleeps 2000 ms, longer than the timeout passed here
+ assertNotNull(result);
+ assertTrue(result.size() == 1);
+ assertTrue("We expect an RPCException due to a Replication Timeout", result.get(0) instanceof RPCException);
+ try
+ {
+ service.executeCommandOnCoordinator(LongTask, 1000);
+ fail("We expect an RPCException due to a Replication Timeout");
+ }
+ catch (RPCException e)
+ {
+ // OK: the task sleeps 2000 ms, longer than the timeout passed (1000)
+ }
+ result = service.executeCommandOnAllNodes(LongTask, false); // asynchronous call: an empty result list is expected
+ assertNotNull(result);
+ assertTrue(result.isEmpty());
+ assertNull(service.executeCommandOnCoordinator(LongTask, false));
+
+ result = service.executeCommandOnAllNodes(StringValue, true);
+ assertNotNull(result);
+ assertTrue(result.size() == 1);
+ assertEquals("OK", result.get(0));
+ o = service.executeCommandOnCoordinator(StringValue, true);
+ assertEquals("OK", o);
+ result = service.executeCommandOnAllNodes(NullValue, true);
+ assertNotNull(result);
+ assertTrue(result.size() == 1);
+ assertNull(result.get(0));
+ o = service.executeCommandOnCoordinator(NullValue, true);
+ assertNull(o);
+ }
+ finally
+ {
+ if (service != null)
+ {
+ service.stop();
+ }
+ }
+ }
+
+ public void testSeveralNodes() throws Exception // two nodes: per node results, then the coordinator is stopped
+ {
+ InitParams params = new InitParams();
+ ValueParam paramConf = new ValueParam();
+ paramConf.setName(RPCServiceImpl.PARAM_JGROUPS_CONFIG);
+ paramConf.setValue("jar:/conf/portal/udp.xml");
+ params.addParameter(paramConf);
+ RPCServiceImpl service1 = null, service2 = null;
+ try
+ {
+ service1 = new RPCServiceImpl(container.getContext(), params, configManager);
+ service2 = new RPCServiceImpl(container.getContext(), params, configManager);
+ RemoteCommand CmdUnknownOnNode2 = new RemoteCommand() // registered on service1 only
+ {
+
+ public String getId()
+ {
+ return "CmdUnknownOnNode2";
+ }
+
+ public String execute(Serializable[] args) throws Throwable
+ {
+ return "OK";
+ }
+ };
+ service1.registerCommand(CmdUnknownOnNode2);
+ RemoteCommand ExceptionOnNode2 = new RemoteCommand() // succeeds here; service2 registers the same id with a body that throws
+ {
+
+ public String getId()
+ {
+ return "ExceptionOnNode2";
+ }
+
+ public String execute(Serializable[] args) throws Throwable
+ {
+ return "OK";
+ }
+ };
+ service1.registerCommand(ExceptionOnNode2);
+ RemoteCommand ErrorOnNode2 = new RemoteCommand()
+ {
+
+ public String getId()
+ {
+ return "ErrorOnNode2";
+ }
+
+ public String execute(Serializable[] args) throws Throwable
+ {
+ return "OK";
+ }
+ };
+ service1.registerCommand(ErrorOnNode2);
+
+ RemoteCommand LongTaskOnNode2 = new RemoteCommand()
+ {
+
+ public String getId()
+ {
+ return "LongTaskOnNode2";
+ }
+
+ public String execute(Serializable[] args) throws Throwable
+ {
+ return "OK";
+ }
+ };
+ service1.registerCommand(LongTaskOnNode2);
+ service1.registerCommand(new RemoteCommand() // service1's "LongTask": slow, answers "OldCoordinator"
+ {
+
+ public String getId()
+ {
+ return "LongTask";
+ }
+
+ public String execute(Serializable[] args) throws Throwable
+ {
+ Thread.sleep(3000);
+ return "OldCoordinator";
+ }
+ });
+ service1.registerCommand(new RemoteCommand()
+ {
+
+ public String getId()
+ {
+ return "OK";
+ }
+
+ public String execute(Serializable[] args) throws Throwable
+ {
+ return "OK";
+ }
+ });
+ service2.registerCommand(new RemoteCommand()
+ {
+
+ public String getId()
+ {
+ return "ExceptionOnNode2";
+ }
+
+ public String execute(Serializable[] args) throws Throwable
+ {
+ throw new Exception("MyException");
+ }
+ });
+ service2.registerCommand(new RemoteCommand()
+ {
+
+ public String getId()
+ {
+ return "ErrorOnNode2";
+ }
+
+ public String execute(Serializable[] args) throws Throwable
+ {
+ throw new Error("MyError");
+ }
+ });
+ service2.registerCommand(new RemoteCommand()
+ {
+
+ public String getId()
+ {
+ return "LongTaskOnNode2";
+ }
+
+ public String execute(Serializable[] args) throws Throwable
+ {
+ Thread.sleep(2000);
+ return null;
+ }
+ });
+ RemoteCommand OK = new RemoteCommand()
+ {
+
+ public String getId()
+ {
+ return "OK";
+ }
+
+ public String execute(Serializable[] args) throws Throwable
+ {
+ return "OK";
+ }
+ };
+ service2.registerCommand(OK);
+ final RemoteCommand LongTask = new RemoteCommand() // service2's "LongTask": immediate, answers "NewCoordinator"
+ {
+
+ public String getId()
+ {
+ return "LongTask";
+ }
+
+ public String execute(Serializable[] args) throws Throwable
+ {
+ return "NewCoordinator";
+ }
+ };
+ service2.registerCommand(LongTask);
+ MyListener listener1 = new MyListener();
+ service1.registerTopologyChangeListener(listener1);
+ MyListener listener2 = new MyListener();
+ service2.registerTopologyChangeListener(listener2);
+ assertFalse(listener1.coordinatorHasChanged);
+ assertFalse(listener1.isCoordinator);
+ assertEquals(0, listener1.count);
+ assertFalse(listener2.coordinatorHasChanged);
+ assertFalse(listener2.isCoordinator);
+ assertEquals(0, listener2.count);
+ service1.start(); // first node up: the assertions below expect it to be the coordinator
+ assertFalse(listener1.coordinatorHasChanged);
+ assertTrue(listener1.isCoordinator);
+ assertEquals(1, listener1.count);
+ assertFalse(listener2.coordinatorHasChanged);
+ assertFalse(listener2.isCoordinator);
+ assertEquals(0, listener2.count);
+ service2.start(); // second node joins: each listener is expected to have counted one more event
+ assertFalse(listener1.coordinatorHasChanged);
+ assertTrue(listener1.isCoordinator);
+ assertEquals(2, listener1.count);
+ assertFalse(listener2.coordinatorHasChanged);
+ assertFalse(listener2.isCoordinator);
+ assertEquals(1, listener2.count);
+ assertEquals(true, service1.isCoordinator());
+ assertEquals(false, service2.isCoordinator());
+ List<Object> result;
+ Object o;
+ result = service1.executeCommandOnAllNodes(CmdUnknownOnNode2, true); // entry 0 is expected from service1, entry 1 from service2
+ assertTrue(result != null && result.size() == 2);
+ assertEquals("OK", result.get(0));
+ assertTrue("We expect a RPCException since the command is unknown on node 2", result.get(1) instanceof RPCException);
+ o = service1.executeCommandOnCoordinator(CmdUnknownOnNode2, true);
+ assertEquals("OK", o);
+
+ result = service1.executeCommandOnAllNodes(ExceptionOnNode2, true);
+ assertTrue(result != null && result.size() == 2);
+ assertEquals("OK", result.get(0));
+ assertTrue("We expect a RPCException since the command fails on node 2", result.get(1) instanceof RPCException);
+ o = service1.executeCommandOnCoordinator(ExceptionOnNode2, true);
+ assertEquals("OK", o);
+
+ result = service1.executeCommandOnAllNodes(ErrorOnNode2, true);
+ assertTrue(result != null && result.size() == 2);
+ assertEquals("OK", result.get(0));
+ assertTrue("We expect a RPCException since the command fails on node 2", result.get(1) instanceof RPCException);
+ o = service1.executeCommandOnCoordinator(ErrorOnNode2, true);
+ assertEquals("OK", o);
+
+ result = service1.executeCommandOnAllNodes(LongTaskOnNode2, 1000); // service2's version sleeps 2000 ms, longer than the timeout passed here
+ assertNotNull(result);
+ assertTrue(result.size() == 2);
+ assertEquals("OK", result.get(0));
+ assertTrue("We expect an RPCException due to a Replication Timeout", result.get(1) instanceof RPCException);
+ o = service1.executeCommandOnCoordinator(LongTaskOnNode2, 1000);
+ assertEquals("OK", o);
+
+ List<Address> allMembers = service1.members; // read while both nodes are still running
+ List<Address> coordinatorOnly = new ArrayList<Address>(1);
+ coordinatorOnly.add(service1.coordinator);
+
+ final RPCServiceImpl service = service2;
+ final AtomicReference<Throwable> error = new AtomicReference<Throwable>();
+ final CountDownLatch doneSignal = new CountDownLatch(1);
+ Thread t = new Thread()
+ {
+ @Override
+ public void run()
+ {
+ try
+ {
+ Object o = service.executeCommandOnCoordinator(LongTask, true);
+ assertEquals("NewCoordinator", o); // i.e. the answer must come from service2's own LongTask
+ }
+ catch (Throwable e)
+ {
+ error.set(e); // handed over to the main thread, which asserts on it after doneSignal.await()
+ e.printStackTrace();
+ }
+ finally
+ {
+ doneSignal.countDown();
+ }
+ }
+ };
+ t.start();
+ service1.stop(); // stop the current coordinator right after launching the call from service2
+ listener2.waitTopologyChange();
+ assertFalse(listener1.coordinatorHasChanged);
+ assertTrue(listener1.isCoordinator);
+ assertEquals(2, listener1.count);
+ assertTrue(listener2.coordinatorHasChanged);
+ assertTrue(listener2.isCoordinator);
+ assertEquals(2, listener2.count);
+ doneSignal.await();
+ assertNull(error.get() != null ? error.get().getMessage() : "", error.get());
+ result = service2.excecuteCommand(allMembers, OK, true, 0); // allMembers is expected to still list the stopped node first
+ assertNotNull(result);
+ assertTrue(result.size() == 2);
+ assertTrue("We expect an RPCException due to a member that has left", result.get(0) instanceof MemberHasLeftException);
+ assertEquals("OK", result.get(1));
+ result = service2.excecuteCommand(coordinatorOnly, OK, true, 0);
+ assertNotNull(result);
+ assertTrue(result.size() == 1);
+ assertTrue("We expect an RPCException due to a member that has left", result.get(0) instanceof MemberHasLeftException);
+ try
+ {
+ service1.isCoordinator();
+ fail("We expect a RPCException since the current state is not the expected one");
+ }
+ catch (RPCException e)
+ {
+ // OK: service1 has been stopped
+ }
+ assertEquals(true, service2.isCoordinator());
+ }
+ finally
+ {
+ if (service1 != null)
+ {
+ service1.stop();
+ }
+ if (service2 != null)
+ {
+ service2.stop();
+ }
+ }
+ }
+
+ /**
+  * Checks the argument validation done by the {@link SingleMethodCallCommand} constructor, then
+  * registers one command per public {@link MyService} method and verifies the value returned when
+  * each command is executed on the coordinator and on all the nodes (a single reply is expected
+  * each time since only one service is started here).
+  */
+ public void testSingleMethodCallCommand() throws Exception
+ {
+    // Neither the component nor the method name is provided
+    try
+    {
+       new SingleMethodCallCommand(null, null);
+       fail("we expect an IllegalArgumentException");
+    }
+    catch (IllegalArgumentException expected)
+    {
+       // OK
+    }
+    MyService target = new MyService();
+    // The method name is missing
+    try
+    {
+       new SingleMethodCallCommand(target, null);
+       fail("we expect an IllegalArgumentException");
+    }
+    catch (IllegalArgumentException expected)
+    {
+       // OK
+    }
+    // MyService has no method called "foo"
+    try
+    {
+       new SingleMethodCallCommand(target, "foo");
+       fail("we expect an NoSuchMethodException");
+    }
+    catch (NoSuchMethodException expected)
+    {
+       // OK
+    }
+    // getPrivateName exists but is private
+    try
+    {
+       new SingleMethodCallCommand(target, "getPrivateName");
+       fail("we expect an IllegalArgumentException since only the public methods are allowed");
+    }
+    catch (IllegalArgumentException expected)
+    {
+       // OK
+    }
+
+    ValueParam jgroupsConfig = new ValueParam();
+    jgroupsConfig.setName(RPCServiceImpl.PARAM_JGROUPS_CONFIG);
+    jgroupsConfig.setValue("jar:/conf/portal/udp.xml");
+    InitParams initParams = new InitParams();
+    initParams.addParameter(jgroupsConfig);
+
+    RPCServiceImpl rpcService = null;
+    try
+    {
+       rpcService = new RPCServiceImpl(container.getContext(), initParams, configManager);
+       // All the commands are registered before the service is started
+       RemoteCommand getName = rpcService.registerCommand(new SingleMethodCallCommand(target, "getName"));
+       RemoteCommand add = rpcService.registerCommand(new SingleMethodCallCommand(target, "add", int.class));
+       RemoteCommand evaluate1 = rpcService.registerCommand(new SingleMethodCallCommand(target, "evaluate", int[].class));
+       RemoteCommand evaluate2 = rpcService.registerCommand(new SingleMethodCallCommand(target, "evaluate", List.class));
+       RemoteCommand total1 = rpcService.registerCommand(new SingleMethodCallCommand(target, "total", int.class));
+       RemoteCommand total2 = rpcService.registerCommand(new SingleMethodCallCommand(target, "total", int.class, int.class));
+       RemoteCommand total3 = rpcService.registerCommand(new SingleMethodCallCommand(target, "total", int[].class));
+       RemoteCommand total4 = rpcService.registerCommand(new SingleMethodCallCommand(target, "total", String.class, long.class, int[].class));
+       RemoteCommand testTypes1 = rpcService.registerCommand(new SingleMethodCallCommand(target, "testTypes", String[].class));
+       RemoteCommand testTypes2 = rpcService.registerCommand(new SingleMethodCallCommand(target, "testTypes", int[].class));
+       RemoteCommand testTypes3 = rpcService.registerCommand(new SingleMethodCallCommand(target, "testTypes", long[].class));
+       RemoteCommand testTypes4 = rpcService.registerCommand(new SingleMethodCallCommand(target, "testTypes", byte[].class));
+       RemoteCommand testTypes5 = rpcService.registerCommand(new SingleMethodCallCommand(target, "testTypes", short[].class));
+       RemoteCommand testTypes6 = rpcService.registerCommand(new SingleMethodCallCommand(target, "testTypes", char[].class));
+       RemoteCommand testTypes7 = rpcService.registerCommand(new SingleMethodCallCommand(target, "testTypes", double[].class));
+       RemoteCommand testTypes8 = rpcService.registerCommand(new SingleMethodCallCommand(target, "testTypes", float[].class));
+       RemoteCommand testTypes9 = rpcService.registerCommand(new SingleMethodCallCommand(target, "testTypes", boolean[].class));
+
+       rpcService.start();
+
+       assertEquals("name", rpcService.executeCommandOnCoordinator(getName, true));
+       assertSingleReply("name", rpcService.executeCommandOnAllNodes(getName, true));
+
+       // MyService.add accumulates into a field, so the second call returns 10 + 10
+       assertEquals(10, rpcService.executeCommandOnCoordinator(add, true, 10));
+       assertSingleReply(20, rpcService.executeCommandOnAllNodes(add, true, 10));
+
+       assertEquals(100, rpcService.executeCommandOnCoordinator(evaluate1, true, new int[]{10, 10, 10, 30, 40}));
+       assertSingleReply(100, rpcService.executeCommandOnAllNodes(evaluate1, true, new int[]{10, 10, 10, 30, 40}));
+
+       List<Integer> numbers = new ArrayList<Integer>();
+       for (int number : new int[]{10, 10, 10, 30, 40})
+       {
+          numbers.add(number);
+       }
+       assertEquals(100, rpcService.executeCommandOnCoordinator(evaluate2, true, (Serializable)numbers));
+       assertSingleReply(100, rpcService.executeCommandOnAllNodes(evaluate2, true, (Serializable)numbers));
+
+       assertEquals(10, rpcService.executeCommandOnCoordinator(total1, true, 10));
+       assertSingleReply(10, rpcService.executeCommandOnAllNodes(total1, true, 10));
+
+       assertEquals(20, rpcService.executeCommandOnCoordinator(total2, true, 10, 10));
+       assertSingleReply(20, rpcService.executeCommandOnAllNodes(total2, true, 10, 10));
+
+       assertEquals(100, rpcService.executeCommandOnCoordinator(total3, true, new int[]{10, 10, 10, 30, 40}));
+       assertSingleReply(100, rpcService.executeCommandOnAllNodes(total3, true, new int[]{10, 10, 10, 30, 40}));
+
+       assertEquals(100, rpcService.executeCommandOnCoordinator(total4, true, "foo", 50, new int[]{10, 10, 10, 30, 40}));
+       assertSingleReply(100, rpcService.executeCommandOnAllNodes(total4, true, "foo", 50, new int[]{10, 10, 10, 30, 40}));
+
+       // A null array is accepted by total(String, long, int...) and counts as zero
+       assertEquals(0, rpcService.executeCommandOnCoordinator(total4, true, "foo", 50, null));
+       assertSingleReply(0, rpcService.executeCommandOnAllNodes(total4, true, "foo", 50, null));
+
+       // Two arguments only whereas the command has been registered with three parameter types
+       try
+       {
+          rpcService.executeCommandOnCoordinator(total4, true, "foo", 50);
+          fail("We expect a RPCException since the list of arguments mismatch with what is expected");
+       }
+       catch (RPCException expected)
+       {
+          // OK
+       }
+       List<Object> replies = rpcService.executeCommandOnAllNodes(total4, true, "foo", 50);
+       assertTrue(replies != null && replies.size() == 1);
+       assertTrue("We expect a RPCException since the list of arguments mismatch with what is expected", replies.get(0) instanceof RPCException);
+
+       assertEquals("foo", rpcService.executeCommandOnCoordinator(testTypes1, true, (Serializable)new String[]{"foo"}));
+       assertSingleReply("foo", rpcService.executeCommandOnAllNodes(testTypes1, true, (Serializable)new String[]{"foo"}));
+
+       assertEquals(10, rpcService.executeCommandOnCoordinator(testTypes2, true, new int[]{10}));
+       assertSingleReply(10, rpcService.executeCommandOnAllNodes(testTypes2, true, new int[]{10}));
+
+       assertEquals(11L, rpcService.executeCommandOnCoordinator(testTypes3, true, new long[]{10}));
+       assertSingleReply(11L, rpcService.executeCommandOnAllNodes(testTypes3, true, new long[]{10}));
+
+       assertEquals((byte)12, rpcService.executeCommandOnCoordinator(testTypes4, true, new byte[]{10}));
+       assertSingleReply((byte)12, rpcService.executeCommandOnAllNodes(testTypes4, true, new byte[]{10}));
+
+       assertEquals((short)13, rpcService.executeCommandOnCoordinator(testTypes5, true, new short[]{10}));
+       assertSingleReply((short)13, rpcService.executeCommandOnAllNodes(testTypes5, true, new short[]{10}));
+
+       assertEquals('a', rpcService.executeCommandOnCoordinator(testTypes6, true, new char[]{'a'}));
+       assertSingleReply('a', rpcService.executeCommandOnAllNodes(testTypes6, true, new char[]{'a'}));
+
+       assertEquals(10.5, rpcService.executeCommandOnCoordinator(testTypes7, true, new double[]{10}));
+       assertSingleReply(10.5, rpcService.executeCommandOnAllNodes(testTypes7, true, new double[]{10}));
+
+       assertEquals((float)11.5, rpcService.executeCommandOnCoordinator(testTypes8, true, new float[]{10}));
+       assertSingleReply((float)11.5, rpcService.executeCommandOnAllNodes(testTypes8, true, new float[]{10}));
+
+       assertEquals(true, rpcService.executeCommandOnCoordinator(testTypes9, true, new boolean[]{true}));
+       assertSingleReply(true, rpcService.executeCommandOnAllNodes(testTypes9, true, new boolean[]{true}));
+    }
+    finally
+    {
+       if (rpcService != null)
+       {
+          rpcService.stop();
+       }
+    }
+ }
+
+ /**
+  * Asserts that the given list of replies is not null, contains exactly one element and that this
+  * element is equal to the expected value.
+  *
+  * @param expected the value that the only reply must be equal to
+  * @param replies the replies returned by a command executed on all the nodes
+  */
+ private void assertSingleReply(Object expected, List<Object> replies)
+ {
+    assertTrue(replies != null && replies.size() == 1);
+    assertEquals(expected, replies.get(0));
+ }
+
+ /**
+  * Simple component whose public methods are exposed as remote commands by the tests. It offers
+  * several overloads of the same method names (fixed arity, varargs, arrays of every primitive
+  * type) plus one private method that must not be callable remotely.
+  */
+ public static class MyService
+ {
+    /** Running total updated by {@link #add(int)}. */
+    private int value = 0;
+
+    /**
+     * Adds the given amount to the running total.
+     *
+     * @param i the amount to add
+     * @return the running total once the amount has been added
+     */
+    public int add(int i)
+    {
+       value = value + i;
+       return value;
+    }
+
+    /**
+     * Private counterpart of {@link #getName()}.
+     *
+     * @return the constant "name"
+     */
+    @SuppressWarnings("unused")
+    private String getPrivateName()
+    {
+       return "name";
+    }
+
+    /**
+     * @return the constant "name"
+     */
+    public String getName()
+    {
+       return "name";
+    }
+
+    /**
+     * @param i any value
+     * @return the given value unchanged
+     */
+    public int total(int i)
+    {
+       return i;
+    }
+
+    /**
+     * @param i1 the first operand
+     * @param i2 the second operand
+     * @return the sum of both operands
+     */
+    public int total(int i1, int i2)
+    {
+       int sum = i1;
+       sum += i2;
+       return sum;
+    }
+
+    /**
+     * @param values the values to add up, must not be null
+     * @return the sum of all the given values
+     */
+    public int total(int... values)
+    {
+       int sum = 0;
+       for (int idx = 0; idx < values.length; idx++)
+       {
+          sum += values[idx];
+       }
+       return sum;
+    }
+
+    /**
+     * @param s ignored
+     * @param l ignored
+     * @param values the values to add up, can be null
+     * @return the sum of all the given values, or 0 if the array is null
+     */
+    public int total(String s, long l, int... values)
+    {
+       if (values == null)
+       {
+          return 0;
+       }
+       int sum = 0;
+       for (int idx = 0; idx < values.length; idx++)
+       {
+          sum += values[idx];
+       }
+       return sum;
+    }
+
+    /**
+     * @param values the values to add up, must not be null
+     * @return the sum of all the given values
+     */
+    public int evaluate(int[] values)
+    {
+       int sum = 0;
+       for (int idx = 0; idx < values.length; idx++)
+       {
+          sum += values[idx];
+       }
+       return sum;
+    }
+
+    /**
+     * @param values the values to add up, neither the list nor its elements can be null
+     * @return the sum of all the given values
+     */
+    public int evaluate(List<Integer> values)
+    {
+       int sum = 0;
+       for (Integer element : values)
+       {
+          sum += element.intValue();
+       }
+       return sum;
+    }
+
+    /**
+     * @param values a non empty array
+     * @return the first element
+     */
+    public String testTypes(String... values)
+    {
+       String first = values[0];
+       return first;
+    }
+
+    /**
+     * @param values a non empty array
+     * @return the first element
+     */
+    public boolean testTypes(boolean... values)
+    {
+       boolean first = values[0];
+       return first;
+    }
+
+    /**
+     * @param values a non empty array
+     * @return the first element
+     */
+    public char testTypes(char... values)
+    {
+       char first = values[0];
+       return first;
+    }
+
+    /**
+     * @param values a non empty array
+     * @return the first element plus 0.5
+     */
+    public double testTypes(double... values)
+    {
+       double first = values[0];
+       return first + 0.5;
+    }
+
+    /**
+     * @param values a non empty array
+     * @return the first element plus 1.5
+     */
+    public float testTypes(float... values)
+    {
+       // The addition is done in double precision and then narrowed to float
+       double shifted = values[0] + 1.5;
+       return (float)shifted;
+    }
+
+    /**
+     * @param values a non empty array
+     * @return the first element
+     */
+    public int testTypes(int... values)
+    {
+       int first = values[0];
+       return first;
+    }
+
+    /**
+     * @param values a non empty array
+     * @return the first element plus 1
+     */
+    public long testTypes(long... values)
+    {
+       long first = values[0];
+       return first + 1;
+    }
+
+    /**
+     * @param values a non empty array
+     * @return the first element plus 2
+     */
+    public byte testTypes(byte... values)
+    {
+       int shifted = values[0] + 2;
+       return (byte)shifted;
+    }
+
+    /**
+     * @param values a non empty array
+     * @return the first element plus 3
+     */
+    public short testTypes(short... values)
+    {
+       int shifted = values[0] + 3;
+       return (short)shifted;
+    }
+ }
+
+ /**
+  * Starts two services that both register a command under the id "CoordinatorExecutedCommand",
+  * executes that command on the coordinator from the first service and checks that the reply is
+  * "service 1" and that only one of the two commands has been executed.
+  */
+ public void testExecOnCoordinator() throws Exception
+ {
+    ValueParam jgroupsConfig = new ValueParam();
+    jgroupsConfig.setName(RPCServiceImpl.PARAM_JGROUPS_CONFIG);
+    jgroupsConfig.setValue("jar:/conf/portal/udp.xml");
+    InitParams initParams = new InitParams();
+    initParams.addParameter(jgroupsConfig);
+
+    // Gets one entry per command execution, whatever the service that executes it
+    final List<Boolean> executions = Collections.synchronizedList(new ArrayList<Boolean>());
+
+    RPCServiceImpl first = null;
+    RPCServiceImpl second = null;
+    try
+    {
+       first = new RPCServiceImpl(container.getContext(), initParams, configManager);
+       RemoteCommand firstCmd = newRecordingCommand("service 1", executions);
+       first.registerCommand(firstCmd);
+
+       second = new RPCServiceImpl(container.getContext(), initParams, configManager);
+       RemoteCommand secondCmd = newRecordingCommand("service 2", executions);
+       second.registerCommand(secondCmd);
+
+       // starting services
+       first.start();
+       second.start();
+
+       Object reply = first.executeCommandOnCoordinator(firstCmd, true);
+       assertEquals("service 1", reply);
+
+       // it should be executed once
+       assertEquals(1, executions.size());
+    }
+    finally
+    {
+       if (first != null)
+       {
+          first.stop();
+       }
+       if (second != null)
+       {
+          second.stop();
+       }
+    }
+ }
+
+ /**
+  * Creates a command whose id is "CoordinatorExecutedCommand" and that records each of its
+  * executions into the given list before returning the given reply.
+  *
+  * @param reply the value returned by the command when it is executed
+  * @param executions the list into which an entry is added at each execution
+  * @return the new command
+  */
+ private static RemoteCommand newRecordingCommand(final String reply, final List<Boolean> executions)
+ {
+    return new RemoteCommand()
+    {
+       public String getId()
+       {
+          return "CoordinatorExecutedCommand";
+       }
+
+       public String execute(Serializable[] args) throws Throwable
+       {
+          executions.add(Boolean.TRUE);
+          return reply;
+       }
+    };
+ }
+
+ private static class MyListener implements TopologyChangeListener
+ {
+
+ private boolean coordinatorHasChanged;
+ private boolean isCoordinator;
+ private int count;
+
+ private CountDownLatch lock = new CountDownLatch(2);
+
+ /**
+ * @see org.exoplatform.services.rpc.TopologyChangeListener#onChange(org.exoplatform.services.rpc.TopologyChangeEvent)
+ */
+ public void onChange(TopologyChangeEvent event)
+ {
+ this.coordinatorHasChanged = event.isCoordinatorHasChanged();
+ this.isCoordinator = event.isCoordinator();
+ count++;
+
+ lock.countDown();
+ }
+
+ public void waitTopologyChange() throws InterruptedException
+ {
+ lock.await();
+ }
+ }
+}
Added: kernel/trunk/exo.kernel.component.ext.rpc.impl.jgroups.v3/src/test/resources/conf/portal/udp.xml
===================================================================
--- kernel/trunk/exo.kernel.component.ext.rpc.impl.jgroups.v3/src/test/resources/conf/portal/udp.xml (rev 0)
+++ kernel/trunk/exo.kernel.component.ext.rpc.impl.jgroups.v3/src/test/resources/conf/portal/udp.xml 2011-12-06 13:40:17 UTC (rev 5270)
@@ -0,0 +1,67 @@
+<config xmlns="urn:org:jgroups"
+ xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+ xsi:schemaLocation="urn:org:jgroups http://www.jgroups.org/schema/JGroups-3.0.xsd">
+ <UDP
+ mcast_addr="${jgroups.udp.mcast_addr:228.10.10.10}"
+ mcast_port="${jgroups.udp.mcast_port:45588}"
+ tos="8"
+ ucast_recv_buf_size="20M"
+ ucast_send_buf_size="640K"
+ mcast_recv_buf_size="25M"
+ mcast_send_buf_size="640K"
+ loopback="true"
+ discard_incompatible_packets="true"
+ max_bundle_size="64K"
+ max_bundle_timeout="30"
+ ip_ttl="${jgroups.udp.ip_ttl:8}"
+ enable_bundling="true"
+ enable_diagnostics="true"
+ thread_naming_pattern="cl"
+
+ timer_type="new"
+ timer.min_threads="4"
+ timer.max_threads="10"
+ timer.keep_alive_time="3000"
+ timer.queue_max_size="500"
+
+ thread_pool.enabled="true"
+ thread_pool.min_threads="2"
+ thread_pool.max_threads="8"
+ thread_pool.keep_alive_time="5000"
+ thread_pool.queue_enabled="true"
+ thread_pool.queue_max_size="10000"
+ thread_pool.rejection_policy="discard"
+
+ oob_thread_pool.enabled="true"
+ oob_thread_pool.min_threads="1"
+ oob_thread_pool.max_threads="8"
+ oob_thread_pool.keep_alive_time="5000"
+ oob_thread_pool.queue_enabled="false"
+ oob_thread_pool.queue_max_size="100"
+ oob_thread_pool.rejection_policy="Run"/>
+
+ <PING timeout="2000"
+ num_initial_members="3"/>
+ <MERGE2 max_interval="30000"
+ min_interval="10000"/>
+ <FD_SOCK/>
+ <FD_ALL/>
+ <VERIFY_SUSPECT timeout="1500" />
+ <BARRIER />
+ <pbcast.NAKACK exponential_backoff="300"
+ xmit_stagger_timeout="200"
+ use_mcast_xmit="false"
+ discard_delivered_msgs="true"/>
+ <UNICAST />
+ <pbcast.STABLE stability_delay="1000" desired_avg_gossip="50000"
+ max_bytes="4M"/>
+ <pbcast.GMS print_local_addr="true" join_timeout="3000"
+ view_bundling="true"/>
+ <UFC max_credits="2M"
+ min_threshold="0.4"/>
+ <MFC max_credits="2M"
+ min_threshold="0.4"/>
+ <FRAG2 frag_size="60K" />
+ <pbcast.STATE_TRANSFER />
+ <!-- pbcast.FLUSH /-->
+</config>
\ No newline at end of file
Added: kernel/trunk/exo.kernel.component.ext.rpc.impl.jgroups.v3/src/test/resources/test.policy
===================================================================
--- kernel/trunk/exo.kernel.component.ext.rpc.impl.jgroups.v3/src/test/resources/test.policy (rev 0)
+++ kernel/trunk/exo.kernel.component.ext.rpc.impl.jgroups.v3/src/test/resources/test.policy 2011-12-06 13:40:17 UTC (rev 5270)
@@ -0,0 +1,35 @@
+grant codeBase "@MAVEN_REPO@-"{
+ permission java.security.AllPermission;
+};
+
+grant codeBase "@MAIN_CLASSES@-"{
+ permission java.security.AllPermission;
+};
+
+grant codeBase "@TEST_CLASSES@-"{
+ permission java.lang.RuntimePermission "accessRPCService";
+};
+
+grant codeBase "@TEST_CLASSES@-"{
+ permission java.lang.RuntimePermission "manageContainer";
+};
+
+grant codeBase "@MAIN_CLASSES@../../../exo.kernel.component.common/-"{
+ permission java.security.AllPermission;
+};
+
+grant codeBase "@MAIN_CLASSES@../../../exo.kernel.commons.test/-"{
+ permission java.security.AllPermission;
+};
+
+grant codeBase "@MAIN_CLASSES@../../../exo.kernel.commons/-"{
+ permission java.security.AllPermission;
+};
+
+grant codeBase "@MAIN_CLASSES@../../../exo.kernel.container/-"{
+ permission java.security.AllPermission;
+};
+
+
+
+
Modified: kernel/trunk/pom.xml
===================================================================
--- kernel/trunk/pom.xml 2011-12-06 09:36:07 UTC (rev 5269)
+++ kernel/trunk/pom.xml 2011-12-06 13:40:17 UTC (rev 5270)
@@ -53,6 +53,7 @@
<module>exo.kernel.commons</module>
<module>exo.kernel.commons.test</module>
<module>exo.kernel.component.common</module>
+ <module>exo.kernel.component.ext.rpc.impl.jgroups.v3</module>
<module>exo.kernel.component.cache</module>
<module>exo.kernel.component.ext.cache.impl.jboss.v3</module>
<module>exo.kernel.component.ext.cache.impl.infinispan.v5</module>
@@ -86,7 +87,7 @@
</dependency>
<dependency>
<groupId>org.exoplatform.kernel</groupId>
- <artifactId>exo.kernel.component.remote</artifactId>
+ <artifactId>exo.kernel.component.common</artifactId>
<version>${project.version}</version>
</dependency>
<dependency>
More information about the exo-jcr-commits
mailing list