[exo-jcr-commits] exo-jcr SVN: r5270 - in kernel/trunk: exo.kernel.component.common/src/main/java/org/exoplatform/services/rpc/impl and 20 other directories.

do-not-reply at jboss.org do-not-reply at jboss.org
Tue Dec 6 08:40:18 EST 2011


Author: nfilotto
Date: 2011-12-06 08:40:17 -0500 (Tue, 06 Dec 2011)
New Revision: 5270

Added:
   kernel/trunk/exo.kernel.component.common/src/main/java/org/exoplatform/services/rpc/impl/AbstractRPCService.java
   kernel/trunk/exo.kernel.component.ext.rpc.impl.jgroups.v3/
   kernel/trunk/exo.kernel.component.ext.rpc.impl.jgroups.v3/pom.xml
   kernel/trunk/exo.kernel.component.ext.rpc.impl.jgroups.v3/src/
   kernel/trunk/exo.kernel.component.ext.rpc.impl.jgroups.v3/src/main/
   kernel/trunk/exo.kernel.component.ext.rpc.impl.jgroups.v3/src/main/java/
   kernel/trunk/exo.kernel.component.ext.rpc.impl.jgroups.v3/src/main/java/org/
   kernel/trunk/exo.kernel.component.ext.rpc.impl.jgroups.v3/src/main/java/org/exoplatform/
   kernel/trunk/exo.kernel.component.ext.rpc.impl.jgroups.v3/src/main/java/org/exoplatform/services/
   kernel/trunk/exo.kernel.component.ext.rpc.impl.jgroups.v3/src/main/java/org/exoplatform/services/rpc/
   kernel/trunk/exo.kernel.component.ext.rpc.impl.jgroups.v3/src/main/java/org/exoplatform/services/rpc/jgv3/
   kernel/trunk/exo.kernel.component.ext.rpc.impl.jgroups.v3/src/main/java/org/exoplatform/services/rpc/jgv3/RPCServiceImpl.java
   kernel/trunk/exo.kernel.component.ext.rpc.impl.jgroups.v3/src/test/
   kernel/trunk/exo.kernel.component.ext.rpc.impl.jgroups.v3/src/test/java/
   kernel/trunk/exo.kernel.component.ext.rpc.impl.jgroups.v3/src/test/java/org/
   kernel/trunk/exo.kernel.component.ext.rpc.impl.jgroups.v3/src/test/java/org/exoplatform/
   kernel/trunk/exo.kernel.component.ext.rpc.impl.jgroups.v3/src/test/java/org/exoplatform/services/
   kernel/trunk/exo.kernel.component.ext.rpc.impl.jgroups.v3/src/test/java/org/exoplatform/services/rpc/
   kernel/trunk/exo.kernel.component.ext.rpc.impl.jgroups.v3/src/test/java/org/exoplatform/services/rpc/impl/
   kernel/trunk/exo.kernel.component.ext.rpc.impl.jgroups.v3/src/test/java/org/exoplatform/services/rpc/impl/TestRPCServiceImpl.java
   kernel/trunk/exo.kernel.component.ext.rpc.impl.jgroups.v3/src/test/resources/
   kernel/trunk/exo.kernel.component.ext.rpc.impl.jgroups.v3/src/test/resources/conf/
   kernel/trunk/exo.kernel.component.ext.rpc.impl.jgroups.v3/src/test/resources/conf/portal/
   kernel/trunk/exo.kernel.component.ext.rpc.impl.jgroups.v3/src/test/resources/conf/portal/udp.xml
   kernel/trunk/exo.kernel.component.ext.rpc.impl.jgroups.v3/src/test/resources/test.policy
Modified:
   kernel/trunk/exo.kernel.component.common/src/main/java/org/exoplatform/services/rpc/impl/RPCServiceImpl.java
   kernel/trunk/exo.kernel.component.common/src/test/java/org/exoplatform/services/rpc/impl/TestRPCServiceImpl.java
   kernel/trunk/pom.xml
Log:
EXOJCR-1672: Propose an RPCService implementation based on JGroups 3 (impl)

Added: kernel/trunk/exo.kernel.component.common/src/main/java/org/exoplatform/services/rpc/impl/AbstractRPCService.java
===================================================================
--- kernel/trunk/exo.kernel.component.common/src/main/java/org/exoplatform/services/rpc/impl/AbstractRPCService.java	                        (rev 0)
+++ kernel/trunk/exo.kernel.component.common/src/main/java/org/exoplatform/services/rpc/impl/AbstractRPCService.java	2011-12-06 13:40:17 UTC (rev 5270)
@@ -0,0 +1,1032 @@
+/*
+ * Copyright (C) 2010 eXo Platform SAS.
+ *
+ * This is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU Lesser General Public License as
+ * published by the Free Software Foundation; either version 2.1 of
+ * the License, or (at your option) any later version.
+ *
+ * This software is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this software; if not, write to the Free
+ * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
+ * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
+ */
+package org.exoplatform.services.rpc.impl;
+
+import org.exoplatform.commons.utils.PropertyManager;
+import org.exoplatform.commons.utils.SecurityHelper;
+import org.exoplatform.container.ExoContainer;
+import org.exoplatform.container.ExoContainerContext;
+import org.exoplatform.container.configuration.ConfigurationManager;
+import org.exoplatform.container.xml.InitParams;
+import org.exoplatform.container.xml.ValueParam;
+import org.exoplatform.services.log.ExoLogger;
+import org.exoplatform.services.log.Log;
+import org.exoplatform.services.rpc.RPCException;
+import org.exoplatform.services.rpc.RPCService;
+import org.exoplatform.services.rpc.RemoteCommand;
+import org.exoplatform.services.rpc.TopologyChangeEvent;
+import org.exoplatform.services.rpc.TopologyChangeListener;
+import org.jgroups.Address;
+import org.jgroups.Channel;
+import org.jgroups.MembershipListener;
+import org.jgroups.Message;
+import org.jgroups.View;
+import org.jgroups.blocks.MessageDispatcher;
+import org.jgroups.blocks.RequestHandler;
+import org.jgroups.conf.ConfiguratorFactory;
+import org.jgroups.conf.ProtocolStackConfigurator;
+import org.jgroups.util.Rsp;
+import org.jgroups.util.RspList;
+import org.picocontainer.Startable;
+
+import java.io.Externalizable;
+import java.io.IOException;
+import java.io.ObjectInput;
+import java.io.ObjectOutput;
+import java.io.Serializable;
+import java.net.URL;
+import java.security.PrivilegedAction;
+import java.security.PrivilegedActionException;
+import java.security.PrivilegedExceptionAction;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Vector;
+import java.util.concurrent.CopyOnWriteArrayList;
+import java.util.concurrent.CountDownLatch;
+
+/**
+ * This class is a basic implementation of the {@link RPCService}; it is mainly based on the
+ * {@link MessageDispatcher} of JGroups. This implementation is not designed to give 
+ * the best possible performance; it only aims to provide a way to communicate with other nodes.
+ * 
+ * @author <a href="mailto:nicolas.filotto at exoplatform.com">Nicolas Filotto</a>
+ * @version $Id$
+ */
+public abstract class AbstractRPCService implements RPCService, Startable, RequestHandler, MembershipListener
+{
+
+   /**
+    * The logger of this service. Note that the category name refers to
+    * <code>RPCServiceImpl</code>, not to this abstract class.
+    */
+   private static final Log LOG = ExoLogger.getLogger("exo.kernel.component.common.RPCServiceImpl");
+   
+   /**
+    * The name of the parameter for the location of the JGroups configuration.
+    */
+   protected static final String PARAM_JGROUPS_CONFIG = "jgroups-configuration";
+
+   /**
+    * The name of the parameter for the name of the cluster.
+    */
+   protected static final String PARAM_CLUSTER_NAME = "jgroups-cluster-name";
+
+   /**
+    * The name of the parameter for the default timeout. Its value is parsed as
+    * a number in the constructor.
+    */
+   protected static final String PARAM_DEFAULT_TIMEOUT = "jgroups-default-timeout";
+
+   /**
+    * The name of the parameter to allow the failover. Its value is parsed as
+    * a boolean in the constructor.
+    */
+   protected static final String PARAM_ALLOW_FAILOVER = "allow-failover";
+
+   /**
+    * The name of the parameter for the retry timeout. Its value is parsed as
+    * a number in the constructor.
+    */
+   protected static final String PARAM_RETRY_TIMEOUT = "retry-timeout";
+   
+   /**
+    * The value of the default timeout, used when the parameter
+    * {@link #PARAM_DEFAULT_TIMEOUT} is not set.
+    * NOTE(review): according to the documentation of <code>castMessage</code>,
+    * 0 means "wait forever" - to be confirmed against the implementations.
+    */
+   protected static final int DEFAULT_TIMEOUT = 0;
+   
+   /**
+    * The value of the default retry timeout, used when the parameter
+    * {@link #PARAM_RETRY_TIMEOUT} is not set. It is passed to
+    * {@link Object#wait(long)}, so it is expressed in milliseconds.
+    */
+   protected static final int DEFAULT_RETRY_TIMEOUT = 20000;
+
+   /**
+    * The default value of the cluster name
+    */
+   protected static final String CLUSTER_NAME = "RPCService-Cluster";
+   
+   /**
+    * The configurator used to create the JGroups Channel. It is loaded in the
+    * constructor from the JGroups configuration.
+    */
+   protected final ProtocolStackConfigurator configurator;
+
+   /**
+    * The lock used to synchronize all the threads waiting for a topology change.
+    * The topology is updated under this lock in <code>viewAccepted</code>, which
+    * also notifies all the waiting threads.
+    */
+   private final Object topologyChangeLock = new Object();
+   
+   /**
+    * The name of the cluster, this is the name used to connect the channel.
+    */
+   private final String clusterName;
+
+   /**
+    * The JGroups Channel used to communicate with other nodes. It is created
+    * in <code>start()</code> and reset to <code>null</code> in <code>stop()</code>
+    * once it has been closed.
+    */
+   protected Channel channel;
+
+   /**
+    * The current list of all the members of the cluster, as extracted from the
+    * last accepted view.
+    */
+   protected volatile List<Address> members;
+
+   /**
+    * The address of the current coordinator, which is the first member of the
+    * last accepted view or <code>null</code> if that view has no members.
+    */
+   protected volatile Address coordinator;
+
+   /**
+    * Indicates whether the current node is the coordinator of the cluster or not
+    */
+   protected volatile boolean isCoordinator;
+   
+   /**
+    * The timeout applied to the commands launched without explicit timeout
+    */
+   private long defaultTimeout = DEFAULT_TIMEOUT;
+
+   /**
+    * The maximum time to wait for a topology change before relaunching a command
+    * on the coordinator in case of failover
+    */
+   private long retryTimeout = DEFAULT_RETRY_TIMEOUT;
+   
+   /**
+    * Indicates whether the failover capabilities are enabled, they are enabled
+    * by default.
+    */
+   private boolean allowFailover = true;
+   
+   /**
+    * The dispatcher used to launch the command of the cluster nodes. It is created
+    * in <code>start()</code> and stopped in <code>stop()</code>.
+    */
+   protected MessageDispatcher dispatcher;
+
+   /**
+    * The signal that indicates that the service is started, it will be used
+    * to make the application wait until the service is fully started to
+    * ensure that all the commands have been registered before handling
+    * incoming messages. It is released at the end of <code>start()</code> and
+    * awaited in <code>handle(Message)</code>.
+    */
+   private final CountDownLatch startSignal = new CountDownLatch(1);
+   
+   /**
+    * All the registered {@link TopologyChangeListener}
+    */
+   private final List<TopologyChangeListener> listeners = new CopyOnWriteArrayList<TopologyChangeListener>();
+
+   /**
+    * Current state of this service. It is set to <code>INITIALIZED</code> by the
+    * constructor, to <code>STARTED</code> by <code>start()</code> and to
+    * <code>STOPPED</code> by <code>stop()</code>.
+    */
+   private volatile State state;
+
+   /**
+    * All the commands that have been registered, indexed by command id. The map is
+    * never modified in place: each registration or unregistration replaces it with
+    * a new unmodifiable map.
+    */
+   private volatile Map<String, RemoteCommand> commands =
+      Collections.unmodifiableMap(new HashMap<String, RemoteCommand>());
+
+   /**
+    * Creates the service from its initial parameters: loads the JGroups configuration,
+    * resolves the cluster name and reads the optional parameters
+    * {@link #PARAM_DEFAULT_TIMEOUT}, {@link #PARAM_ALLOW_FAILOVER} and
+    * {@link #PARAM_RETRY_TIMEOUT}. The channel itself is only created in {@link #start()}.
+    * 
+    * @param ctx the {@link ExoContainerContext} from which we will extract the corresponding
+    * {@link ExoContainer}
+    * @param params the list of initial parameters, cannot be <code>null</code>
+    * @param configManager the configuration manager used to get the configuration
+    * of JGroups
+    * @throws IllegalArgumentException if <code>params</code> is <code>null</code>
+    * @throws NumberFormatException if one of the timeouts is not a valid number
+    * @throws RuntimeException if the JGroups configuration cannot be loaded
+    */
+   public AbstractRPCService(ExoContainerContext ctx, InitParams params, ConfigurationManager configManager)
+   {
+      if (params == null)
+      {
+         throw new IllegalArgumentException("The RPCServiceImpl requires some parameters");
+      }
+      final URL properties = getProperties(params, configManager);
+      if (LOG.isInfoEnabled())
+      {
+         LOG.info("The JGroups configuration used for the RPCServiceImpl will be loaded from " + properties);
+      }
+
+      try
+      {
+         this.configurator = SecurityHelper.doPrivilegedExceptionAction(new PrivilegedExceptionAction<ProtocolStackConfigurator>()
+         {
+            public ProtocolStackConfigurator run() throws Exception
+            {
+               return ConfiguratorFactory.getStackConfigurator(properties);
+            }
+         });
+      }
+      catch (PrivilegedActionException pae)
+      {
+         throw new RuntimeException("Cannot load the JGroups configuration from " + properties, pae.getCause());
+      }
+
+      this.clusterName = getClusterName(ctx, params);
+      if (LOG.isDebugEnabled())
+      {
+         LOG.debug("The cluster name of the RPCServiceImpl has been set to " + clusterName);
+      }
+      String sTimeout = getValueParam(params, PARAM_DEFAULT_TIMEOUT);
+      if (sTimeout != null)
+      {
+         // The timeouts are stored as long values, so they are parsed as long values
+         // in order not to restrict the configurable range to the one of an int
+         defaultTimeout = Long.parseLong(sTimeout);
+         if (LOG.isDebugEnabled())
+         {
+            LOG.debug("The default timeout of the RPCServiceImpl has been set to " + defaultTimeout);
+         }
+      }
+      String sAllowFailover = getValueParam(params, PARAM_ALLOW_FAILOVER);
+      if (sAllowFailover != null)
+      {
+         allowFailover = Boolean.parseBoolean(sAllowFailover);
+         if (LOG.isDebugEnabled())
+         {
+            LOG.debug("The parameter '" + PARAM_ALLOW_FAILOVER + "' of the RPCServiceImpl has been set to " + allowFailover);
+         }
+      }
+      sTimeout = getValueParam(params, PARAM_RETRY_TIMEOUT);
+      if (sTimeout != null)
+      {
+         retryTimeout = Long.parseLong(sTimeout);
+         if (LOG.isDebugEnabled())
+         {
+            LOG.debug("The retry timeout of the RPCServiceImpl has been set to " + retryTimeout);
+         }
+      }
+      this.state = State.INITIALIZED;
+   }
+
+   /**
+    * {@inheritDoc}
+    * <p>
+    * The command is launched on all the members with the default timeout of the service.
+    */
+   public List<Object> executeCommandOnAllNodes(RemoteCommand command, boolean synchronous, Serializable... args)
+      throws RPCException
+   {
+      final long timeout = defaultTimeout;
+      return executeCommandOnAllNodesMain(command, synchronous, timeout, args);
+   }
+
+   /**
+    * {@inheritDoc}
+    * <p>
+    * The command is always launched in synchronous mode with the given timeout.
+    */
+   public List<Object> executeCommandOnAllNodes(RemoteCommand command, long timeout, Serializable... args)
+      throws RPCException
+   {
+      final boolean synchronous = true;
+      return executeCommandOnAllNodesMain(command, synchronous, timeout, args);
+   }
+
+   /**
+    * Executes a command on all the cluster nodes. This method is equivalent to the other method of the
+    * same type but with the default timeout. The command must be registered first otherwise an 
+    * {@link RPCException} will be thrown.
+    *
+    * @param command The command to execute on each cluster node
+    * @param synchronous if true, sets group request mode to {@link org.jgroups.blocks.GroupRequest#GET_ALL},
+    *  and if false sets it to {@link org.jgroups.blocks.GroupRequest#GET_NONE}.
+    * @param timeout a timeout after which to throw a replication exception.
+    * @param args an array of {@link Serializable} objects corresponding to parameters of the command 
+    * to execute remotely
+    * @return a list of responses from all the members of the cluster. If we met an exception on a given node, 
+    * the RPCException will be the corresponding response of this particular node
+    * @throws RPCException in the event of problems.
+    */
+   protected List<Object> executeCommandOnAllNodesMain(RemoteCommand command, boolean synchronous, long timeout,
+      Serializable... args) throws RPCException
+   {
+      return excecuteCommand(members, command, synchronous, timeout, args);
+   }
+
+   /**
+    * {@inheritDoc}
+    * <p>
+    * The command is launched on the coordinator with the default timeout of the service.
+    */
+   public Object executeCommandOnCoordinator(RemoteCommand command, boolean synchronous, Serializable... args)
+      throws RPCException
+   {
+      final long timeout = defaultTimeout;
+      return executeCommandOnCoordinatorMain(command, synchronous, timeout, args);
+   }
+
+   /**
+    * {@inheritDoc}
+    * <p>
+    * The command is always launched in synchronous mode with the given timeout.
+    */
+   public Object executeCommandOnCoordinator(RemoteCommand command, long timeout, Serializable... args)
+      throws RPCException
+   {
+      final boolean synchronous = true;
+      return executeCommandOnCoordinatorMain(command, synchronous, timeout, args);
+   }
+
+   /**
+    * Executes a command on the coordinator only. The command must be registered first
+    * otherwise an {@link RPCException} will be thrown.
+    * <p>
+    * If the failover is allowed and the coordinator has left the cluster, the method waits
+    * for a topology change (at most the retry timeout) and then relaunches the same command
+    * with the same mode, timeout and arguments on the current coordinator.
+    *
+    * @param command The command to execute on the coordinator node
+    * @param synchronous if true, sets group request mode to {@link org.jgroups.blocks.GroupRequest#GET_ALL}, 
+    * and if false sets it to {@link org.jgroups.blocks.GroupRequest#GET_NONE}.
+    * @param timeout a timeout after which to throw a replication exception.
+    * @param args an array of {@link Serializable} objects corresponding to parameters of the command 
+    * to execute remotely
+    * @return the response of the coordinator, <code>null</code> if there is no response
+    * (in asynchronous mode for example).
+    * @throws RPCException in the event of problems.
+    */
+   protected Object executeCommandOnCoordinatorMain(RemoteCommand command, boolean synchronous, long timeout,
+      Serializable... args) throws RPCException
+   {
+      // Keep a reference to the targeted coordinator to be able to detect a change afterwards
+      Address coordinator = this.coordinator;
+      Vector<Address> v = new Vector<Address>(1);
+      v.add(coordinator);
+      List<Object> lResults = excecuteCommand(v, command, synchronous, timeout, args);
+      Object result = lResults == null || lResults.size() == 0 ? null : lResults.get(0);
+      if (allowFailover && result instanceof MemberHasLeftException)
+      {
+         // The failover capabilities have been enabled and the coordinator seems to have left
+         if (coordinator.equals(this.coordinator))
+         {
+            synchronized(topologyChangeLock)
+            {
+               // Check again under the lock as the view could have been updated in the meantime
+               if (coordinator.equals(this.coordinator))
+               {
+                  if (LOG.isTraceEnabled())
+                     LOG.trace("The coordinator did not change yet, we will relaunch the command after " 
+                              + retryTimeout + " ms or once a topology change has been detected");                  
+                  try
+                  {
+                     topologyChangeLock.wait(retryTimeout);
+                  }
+                  catch (InterruptedException e)
+                  {
+                     Thread.currentThread().interrupt();
+                  }                  
+               }
+            }
+         }
+         if (LOG.isTraceEnabled())
+            LOG.trace("The coordinator has changed, we will automatically retry with the new coordinator");                  
+         // We must call the "Main" method here: calling executeCommandOnCoordinator with these
+         // 4 arguments would be resolved to the variant (RemoteCommand, boolean, Serializable...),
+         // so the timeout and the array of arguments would be sent as the arguments of the
+         // command and the default timeout would be used instead of the given one.
+         return executeCommandOnCoordinatorMain(command, synchronous, timeout, args);
+      }
+      else if (result instanceof RPCException)
+      {
+         throw (RPCException)result;
+      }
+      return result;
+   }
+
+   /**
+    * Sends the given command to all the members of the list of destinations and converts
+    * their responses into a list of results. The caller must have the permission
+    * {@link RPCService#ACCESS_RPC_SERVICE_PERMISSION}, the service must be started and the
+    * command must be the instance that has been registered under its id.
+    * @param dests the list of members on which the command needs to be executed
+    * @param command the command to execute
+    * @param synchronous if true, sets group request mode to {@link org.jgroups.blocks.GroupRequest#GET_ALL}, and if false sets 
+    * it to {@link org.jgroups.blocks.GroupRequest#GET_NONE}.
+    * @param timeout a timeout after which to throw a replication exception.
+    * @param args the list of parameters
+    * @return an empty list in asynchronous mode, otherwise one result per destination, in the order
+    * of the destinations. The result is the value returned by the member, a
+    * {@link MemberHasLeftException} if the member has no response or was suspected, or an
+    * {@link RPCException} if no response was received from the member.
+    * @throws RPCException if the service is not started, if the command is unknown or if
+    * the message could not be cast.
+    */
+   protected List<Object> excecuteCommand(final List<Address> dests, RemoteCommand command,
+      final boolean synchronous, final long timeout, Serializable... args) throws RPCException
+   {
+      SecurityManager sm = System.getSecurityManager();
+      if (sm != null)
+      {
+         sm.checkPermission(RPCService.ACCESS_RPC_SERVICE_PERMISSION);
+      }
+      if (state != State.STARTED)
+      {
+         throw new RPCException(
+            "Cannot execute any commands if the service is not started, the current state of the service is " + state);
+      }
+      final String commandId = command.getId();
+      if (commands.get(commandId) != command)
+      {
+         throw new RPCException("Command " + commandId + " unknown, please register your command first");
+      }
+      // A specific target is only set when the command is sent to one single member through
+      // a list that is not the list of all the members
+      Address target = null;
+      if (dests.size() == 1 && dests != members)
+      {
+         target = dests.get(0);
+      }
+      final Message msg = new Message();
+      setObject(msg, new MessageBody(target, commandId, args));
+      PrivilegedAction<RspList> cast = new PrivilegedAction<RspList>()
+      {
+         public RspList run()
+         {
+            try
+            {
+               return castMessage(dests, msg, synchronous, timeout);
+            }
+            catch (Exception e)
+            {
+               LOG.error("Could not cast the message corresponding to the command " + commandId + ".", e);
+            }
+            return null;
+         }
+      };
+      RspList rsps = SecurityHelper.doPrivilegedAction(cast);
+
+      if (LOG.isTraceEnabled())
+      {
+         LOG.trace("responses: " + rsps);
+      }
+      if (rsps == null)
+      {
+         throw new RPCException("Could not get the responses for command " + commandId + ".");
+      }
+      if (!synchronous)
+      {
+         // Nothing is expected in asynchronous mode
+         return Collections.emptyList();
+      }
+      if (LOG.isTraceEnabled())
+      {
+         LOG.trace("(" + getLocalAddress() + "): responses for command " + commandId + ":\n" + rsps);
+      }
+      List<Object> results = new ArrayList<Object>(rsps.size());
+      for (Address dest : dests)
+      {
+         Rsp rsp = rsps.get(dest);
+         final Object outcome;
+         if (rsp == null || (rsp.wasSuspected() && !rsp.wasReceived()))
+         {
+            // The corresponding member has left
+            outcome = new MemberHasLeftException("No response for the member " + dest
+               + ", this member has probably left the cluster.");
+         }
+         else if (!rsp.wasReceived())
+         {
+            outcome = new RPCException("Replication timeout for " + rsp.getSender() + ", rsp=" + rsp);
+         }
+         else
+         {
+            outcome = rsp.getValue();
+            // The application-level exceptions are only traced here, they are returned as results
+            if (outcome instanceof RPCException && LOG.isTraceEnabled())
+            {
+               LOG.trace("Recieved exception'" + outcome + "' from " + rsp.getSender(), (RPCException)outcome);
+            }
+         }
+         results.add(outcome);
+      }
+      return results;
+   }
+
+   /**
+    * {@inheritDoc}
+    * <p>
+    * Waits until the service is fully started, then executes the registered command
+    * corresponding to the received message if the message is accepted by the local node.
+    * 
+    * @return the result of the command, <code>null</code> if the message is not meant to be
+    * executed on the local node, or an {@link RPCException} (returned, not thrown) if the
+    * command is unknown or if its execution failed.
+    */
+   public Object handle(Message msg)
+   {
+      String commandId = null;
+      try
+      {
+         // Ensure that the service is fully started before trying to execute any command
+         startSignal.await();
+         MessageBody body = (MessageBody)msg.getObject();
+         commandId = body.getCommandId();
+         if (!body.accept(getLocalAddress()))
+         {
+            if (LOG.isTraceEnabled())
+               LOG.trace("Command : " + commandId + " needs to be executed on the coordinator " +
+                     "only and the local node is not the coordinator, the command will be ignored");
+            return null;
+         }
+         RemoteCommand command = getCommand(commandId);
+         if (command == null)
+         {
+            return new RPCException("Command " + commandId + " unkown, please register your command first");
+         }
+         Object execResult = command.execute(body.getArgs());
+         if (LOG.isTraceEnabled())
+            LOG.trace("Command : " + commandId + " executed, result is: " + execResult);
+         return execResult;
+      }
+      catch (Throwable x)
+      {
+         if (x instanceof InterruptedException)
+         {
+            // The interruption is reported to the caller as an RPCException, so the interrupt
+            // status is restored in order not to hide it from the calling thread
+            Thread.currentThread().interrupt();
+         }
+         if (LOG.isTraceEnabled())
+            LOG.trace("Problems invoking command.", x);
+         return new RPCException("Cannot execute the command " + (commandId == null ? "" : commandId), x);
+      }
+   }
+
+   /**
+    * {@inheritDoc}
+    * <p>
+    * This implementation does nothing.
+    */
+   public void block()
+   {
+   }
+
+   /**
+    * {@inheritDoc}
+    * <p>
+    * This implementation does nothing, the topology is only updated in
+    * {@link #viewAccepted(View)}.
+    */
+   public void suspect(Address suspectedMbr)
+   {
+   }
+
+   /**
+    * {@inheritDoc}
+    * <p>
+    * Updates the list of members, the coordinator (the first member of the view) and the
+    * coordinator flag of the local node, wakes up all the threads waiting for a topology
+    * change and finally notifies the registered listeners.
+    */
+   public void viewAccepted(View view)
+   {
+      final boolean changed;
+      synchronized (topologyChangeLock)
+      {
+         final List<Address> currentMembers = getMembers(view);
+         this.members = currentMembers;
+         final Address previous = this.coordinator;
+         Address elected = null;
+         if (currentMembers != null && !currentMembers.isEmpty())
+         {
+            elected = currentMembers.get(0);
+         }
+         this.coordinator = elected;
+         this.isCoordinator = elected != null && elected.equals(getLocalAddress());
+         // The very first view is not considered as a change of coordinator
+         changed = previous != null && !previous.equals(elected);
+         // Release all the nodes
+         topologyChangeLock.notifyAll();
+      }
+      // The listeners are notified outside the synchronized block
+      onTopologyChange(changed);
+   }
+
+   /**
+    * Called anytime the topology has changed, this method will notify all the listeners
+    * currently registered
+    * @param coordinatorHasChanged this parameter is set to <code>true</code> if the 
+    * coordinator has changed, <code>false</code> otherwise
+    */
+   private void onTopologyChange(boolean coordinatorHasChanged)
+   {
+      TopologyChangeEvent event = new TopologyChangeEvent(coordinatorHasChanged, isCoordinator);
+      for (TopologyChangeListener listener : listeners)
+      {
+         try
+         {
+            listener.onChange(event);
+         }
+         catch (Exception e)
+         {
+            LOG.warn("An error occurs with the listener of type " + listener.getClass(), e);
+         }
+      }
+   }
+
+   /**
+    * {@inheritDoc}
+    * <p>
+    * The command is added to a copy of the current map of commands, which then replaces
+    * the current map. A command already registered with the same id is replaced.
+    * 
+    * @return the registered command, <code>null</code> if the given command is <code>null</code>
+    * @throws IllegalArgumentException if the id of the command is <code>null</code>
+    */
+   public synchronized RemoteCommand registerCommand(RemoteCommand command)
+   {
+      SecurityManager sm = System.getSecurityManager();
+      if (sm != null)
+      {
+         sm.checkPermission(RPCService.ACCESS_RPC_SERVICE_PERMISSION);
+      }
+      if (command == null)
+      {
+         return null;
+      }
+      final String id = command.getId();
+      if (id == null)
+      {
+         throw new IllegalArgumentException("The command Id cannot be null");
+      }
+      Map<String, RemoteCommand> updated = new HashMap<String, RemoteCommand>(this.commands);
+      RemoteCommand replaced = updated.put(id, command);
+      if (replaced != null && PropertyManager.isDevelopping())
+      {
+         LOG.warn("A command has already been registered with the id " + id
+            + ", this command will be replaced with the new one");
+      }
+      this.commands = Collections.unmodifiableMap(updated);
+      return command;
+   }
+
+   /**
+    * {@inheritDoc}
+    * <p>
+    * Only the exact instance that has been registered can be unregistered, any other
+    * instance is ignored even if it has the same id. Nothing is done if the given command
+    * is <code>null</code>.
+    * 
+    * @throws IllegalArgumentException if the id of the command is <code>null</code>
+    */
+   public synchronized void unregisterCommand(RemoteCommand command)
+   {
+      SecurityManager sm = System.getSecurityManager();
+      if (sm != null)
+      {
+         sm.checkPermission(RPCService.ACCESS_RPC_SERVICE_PERMISSION);
+      }
+      if (command == null)
+      {
+         return;
+      }
+      final String id = command.getId();
+      if (id == null)
+      {
+         throw new IllegalArgumentException("The command Id cannot be null");
+      }
+      final boolean registered = commands.get(id) == command;
+      if (!registered)
+      {
+         // Removing a command that has not been registered is not allowed, the instance to
+         // unregister must be the very same instance as the one currently registered
+         if (PropertyManager.isDevelopping())
+         {
+            LOG.warn("Cannot unregister an unknown RemoteCommand, either the command id " + id
+               + " is unknown or the instance of RemoteCommand to unregister is unknown");
+         }
+         return;
+      }
+      Map<String, RemoteCommand> updated = new HashMap<String, RemoteCommand>(this.commands);
+      updated.remove(id);
+      this.commands = Collections.unmodifiableMap(updated);
+   }
+
+   /**
+    * {@inheritDoc}
+    * 
+    * @throws RPCException if the service is not in the started state
+    */
+   public boolean isCoordinator() throws RPCException
+   {
+      if (state == State.STARTED)
+      {
+         return isCoordinator;
+      }
+      throw new RPCException("Cannot know whether the local node is a coordinator or not if " +
+               "the service is not started, the current state of the service is " + state);
+   }
+
+   /**
+    * {@inheritDoc}
+    * <p>
+    * A <code>null</code> listener is ignored.
+    */
+   public void registerTopologyChangeListener(TopologyChangeListener listener) throws SecurityException
+   {
+      SecurityManager sm = System.getSecurityManager();
+      if (sm != null)
+      {
+         sm.checkPermission(RPCService.ACCESS_RPC_SERVICE_PERMISSION);
+      }
+      if (listener != null)
+      {
+         listeners.add(listener);
+      }
+   }
+
+   /**
+    * {@inheritDoc}
+    * <p>
+    * A <code>null</code> listener is ignored.
+    */
+   public void unregisterTopologyChangeListener(TopologyChangeListener listener) throws SecurityException
+   {
+      SecurityManager sm = System.getSecurityManager();
+      if (sm != null)
+      {
+         sm.checkPermission(RPCService.ACCESS_RPC_SERVICE_PERMISSION);
+      }
+      if (listener != null)
+      {
+         listeners.remove(listener);
+      }
+   }
+
+   /**
+    * Gives the {@link RemoteCommand} corresponding to the given id
+    * @param commandId the command id of the command to retrieve
+    * @return the corresponding {@link RemoteCommand}
+    */
+   protected RemoteCommand getCommand(String commandId)
+   {
+      return commands.get(commandId);
+   }
+
+   /**
+    * {@inheritDoc}
+    * <p>
+    * Creates the channel and the dispatcher, then connects the channel to the cluster.
+    * The state of the service is set to <code>STARTED</code> only if all these steps
+    * succeed. In any case, the threads waiting for the end of the startup in
+    * {@link #handle(Message)} are released.
+    * 
+    * @throws RuntimeException if the channel cannot be initialized
+    */
+   public void start()
+   {
+      SecurityManager security = System.getSecurityManager();
+      if (security != null)
+      {
+         security.checkPermission(RPCService.ACCESS_RPC_SERVICE_PERMISSION);
+      }
+
+      try
+      {
+         SecurityHelper.doPrivilegedExceptionAction(new PrivilegedExceptionAction<Void>()
+         {
+            public Void run() throws Exception
+            {
+               channel = createChannel();
+               dispatcher = new MessageDispatcher(channel, null, AbstractRPCService.this, AbstractRPCService.this);
+               channel.connect(clusterName);
+               return null;
+            }
+         });
+         // The service is considered as started only if the channel could be connected,
+         // otherwise the state would be STARTED for a service that cannot be used
+         this.state = State.STARTED;
+      }
+      catch (PrivilegedActionException pae)
+      {
+         throw new RuntimeException("Cannot initialize the Channel needed for the RPCServiceImpl", pae.getCause());
+      }
+      finally
+      {
+         // Always release the threads waiting for the end of the startup, even in case
+         // of failure, to prevent them from being blocked forever
+         startSignal.countDown();
+      }
+   }
+
+   /**
+    * {@inheritDoc}
+    * <p>
+    * Sets the state of the service to <code>STOPPED</code>, disconnects and closes the
+    * channel if it is still open, then stops the dispatcher.
+    */
+   public void stop()
+   {
+      SecurityManager sm = System.getSecurityManager();
+      if (sm != null)
+      {
+         sm.checkPermission(RPCService.ACCESS_RPC_SERVICE_PERMISSION);
+      }
+
+      this.state = State.STOPPED;
+      this.isCoordinator = false;
+      final boolean channelOpen = channel != null && channel.isOpen();
+      if (channelOpen)
+      {
+         if (LOG.isInfoEnabled())
+         {
+            LOG.info("Disconnecting and closing the Channel");
+         }
+         PrivilegedAction<Void> shutdown = new PrivilegedAction<Void>()
+         {
+            public Void run()
+            {
+               channel.disconnect();
+               channel.close();
+               return null;
+            }
+         };
+         SecurityHelper.doPrivilegedAction(shutdown);
+         channel = null;
+      }
+      if (dispatcher != null)
+      {
+         dispatcher.stop();
+         dispatcher = null;
+      }
+   }
+
+   /**
+    * Gives the value of the default timeout, which is the timeout applied to the
+    * commands launched without explicit timeout
+    * @return the value of the parameter {@link #PARAM_DEFAULT_TIMEOUT} if it has been set,
+    * {@link #DEFAULT_TIMEOUT} otherwise
+    */
+   protected long getDefaultTimeout()
+   {
+      return defaultTimeout;
+   }
+
+   /**
+    * Gives the name of the cluster, which is the name used to connect the channel
+    * @return the name of the cluster
+    */
+   protected String getClusterName()
+   {
+      return clusterName;
+   }
+   
+   /**
+    * Gives the value of the retry timeout, which is the maximum time to wait for a
+    * topology change before relaunching a command on the coordinator in case of failover
+    * @return the value of the parameter {@link #PARAM_RETRY_TIMEOUT} if it has been set,
+    * {@link #DEFAULT_RETRY_TIMEOUT} otherwise
+    */
+   protected long getRetryTimeout()
+   {
+      return retryTimeout;
+   }
+
+   /**
+    * Indicates whether the failover capabilities are enabled or not. They are enabled
+    * by default and can be changed thanks to the parameter {@link #PARAM_ALLOW_FAILOVER}.
+    * @return <code>true</code> if the failover capabilities are allowed, <code>false</code>
+    * otherwise
+    */
+   protected boolean isAllowFailover()
+   {
+      return allowFailover;
+   }
+
+   /**
+    * Returns the channel's own address. The result of calling this method on an unconnected
+    * channel is implementation defined (may return null). Calling this method on a closed
+    * channel returns null. Addresses can be used as destination
+    * in the <code>send()</code> operation.
+    * <p>
+    * In this class, the local address is compared with the address of the coordinator and
+    * is used to know whether an incoming message must be executed locally.
+    * @return The channel's address (opaque) or null if it cannot be found
+    */
+   protected abstract Address getLocalAddress();
+   
+   /**
+    * Cast a message to all the given members
+    * @param dests The members to which the message is to be sent.
+    * @param msg The message to be sent to the members.
+    * @param synchronous Indicates whether the message must be sent in synchronous or asynchronous mode.
+    * @param timeout If 0: wait forever. Otherwise, wait for responses or timeout time.
+    * @return A list of responses. Each response is an <code>Object</code> and associated to its sender.
+    * @throws Exception if any error occurs while casting the message
+    */
+   protected abstract RspList castMessage(List<Address> dests, Message msg, boolean synchronous, long timeout) throws Exception;
+   
+   /**
+    * Create a channel
+    * @return An initialized channel
+    * @throws Exception if any error occurs while creating the channel
+    */
+   protected abstract Channel createChannel() throws Exception;
+   
+   /**
+    * Returns a reference to the ordered list of members of the given view.
+    * Do NOT change this list, otherwise you will invalidate the view.
+    * Make a copy if you have to modify it.
+    *
+    * @return a reference to the ordered list of members in this view
+    */
+   protected abstract List<Address> getMembers(View view);
+   
+   /**
+    * Takes an object and uses Java serialization to generate the byte[] buffer which
+    * is set in the message.
+    */
+   protected abstract void setObject(Message m, Object o);
+   
+   /**
+    * Gives the value of the {@link ValueParam} corresponding to the given key
+    * @param params the list of initial parameters from which we want to extract the {@link ValueParam}
+    * @param parameterKey the name of the {@link ValueParam} that we are looking for
+    * @return the value if it exists, null otherwise
+    */
+   private static String getValueParam(InitParams params, String parameterKey)
+   {
+      try
+      {
+         return params.getValueParam(parameterKey).getValue().trim();
+      }
+      catch (NullPointerException e)
+      {
+         return null;
+      }
+   }
+
+   /**
+    * Gives the {@link URL} corresponding to the location of the JGroups configuration
+    * @param params the initial parameters from which we extract the parameter 
+    * <code>PARAM_JGROUPS_CONFIG</code> 
+    * @param configManager the configuration manager used to get the {@link URL} corresponding
+    * to the path given in the configuration of the RPCServiceImpl
+    * @return The {@link URL} corresponding to the location of the JGroups configuration,
+    * it will throw {@link IllegalArgumentException} otherwise since it is a mandatory configuration.
+    */
+   private static URL getProperties(InitParams params, ConfigurationManager configManager)
+   {
+      String configPath = getValueParam(params, PARAM_JGROUPS_CONFIG);
+      if (configPath == null)
+      {
+         throw new IllegalArgumentException("The parameter '" + PARAM_JGROUPS_CONFIG
+            + "' of RPCServiceImpl is mandatory");
+      }
+      URL properties;
+      try
+      {
+         properties = configManager.getResource(configPath);
+      }
+      catch (Exception e)
+      {
+         throw new IllegalArgumentException("Cannot find the JGroups configuration at " + configPath, e);
+      }
+      if (properties == null)
+      {
+         throw new IllegalArgumentException("Cannot find the JGroups configuration at " + configPath);
+      }
+      return properties;
+   }
+
+   /**
+    * Gives the name of the cluster that will be able to support several portal containers
+    * since the name will be suffixed with "-${container-name}"
+    * @param ctx the context from which we extract the name of the container
+    * @param params the list of initial parameters from which we get the value of the parameter
+    * <code>PARAM_CLUSTER_NAME</code> if it exists otherwise the value will be "RPCService-Cluster"
+    */
+   private static String getClusterName(ExoContainerContext ctx, InitParams params)
+   {
+      String clusterName = getValueParam(params, PARAM_CLUSTER_NAME);
+      if (clusterName == null)
+      {
+         clusterName = CLUSTER_NAME;
+      }
+      return clusterName += "-" + ctx.getName();
+   }
+
+   /**
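+    * This nested class is the body of an RPC message: the id of the command to execute, its arguments and the hash code of the expected destination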
+    */
+   public static class MessageBody implements Externalizable
+   {
+      /**
+       * The Id of the command to execute
+       */
+      private String commandId;
+
+      /**
+       * The list of parameters
+       */
+      private Serializable[] args;
+      
+      /**
+       * The hash code of the expected destination, 0 if the message is for everybody
+       */
+      private int destination;
+
+      public MessageBody()
+      {
+      }
+
+      /**
+       * @param dest The destination of the message
+       * @param commandId the id of the command to execute
+       * @param args the arguments to use
+       */
+      public MessageBody(Address dest, String commandId, Serializable[] args)
+      {
+         this.commandId = commandId;
+         this.args = args;
+         this.destination = dest == null ? 0 : dest.hashCode();
+      }
+
+      public String getCommandId()
+      {
+         return commandId;
+      }
+
+      public Serializable[] getArgs()
+      {
+         return args;
+      }      
+
+      /**
+       * Indicates whether or not the given message body accepts the given address
+       * @param address the address to check
+       * @return <code>true</code> if the message is for everybody or if the given address is the expected address,
+       * <code>false</code> otherwise
+       */
+      public boolean accept(Address address)
+      {
+         return destination == 0 || destination == address.hashCode();
+      }
+
+      /**
+       * {@inheritDoc}
+       */
+      public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException
+      {
+         boolean unicast = in.readBoolean();
+         if (unicast)
+         {
+            this.destination = in.readInt();            
+         }
+         this.commandId = in.readUTF();
+         int size = in.readInt();
+         if (size == -1)
+         {
+            this.args = null;
+         }
+         else
+         {
+            this.args = new Serializable[size];
+            for (int i = 0; i < size; i++)
+            {
+               args[i] = (Serializable)in.readObject();
+            }
+         }
+      }
+
+      /**
+       * {@inheritDoc}
+       */
+      public void writeExternal(ObjectOutput out) throws IOException
+      {
+         boolean unicast = destination != 0;
+         out.writeBoolean(unicast);
+         if (unicast)
+         {
+            out.writeInt(destination);            
+         }         
+         out.writeUTF(commandId);
+         if (args == null)
+         {
+            out.writeInt(-1);
+         }
+         else
+         {
+            out.writeInt(args.length);
+            for (int i = 0; i < args.length; i++)
+            {
+               out.writeObject(args[i]);
+            }
+         }
+      }
+   }
+
+   /**
+    * All the potential states of the {@link RPCServiceImpl}
+    */
+   public enum State
+   {
+      INITIALIZED, STARTED, STOPPED
+   }
+
+   public static class MemberHasLeftException extends RPCException
+   {
+
+      /**
+       * The serial version UID
+       */
+      private static final long serialVersionUID = 3558158913564367637L;
+
+      public MemberHasLeftException(String message)
+      {
+         super(message);
+      }
+   }
+}

Modified: kernel/trunk/exo.kernel.component.common/src/main/java/org/exoplatform/services/rpc/impl/RPCServiceImpl.java
===================================================================
--- kernel/trunk/exo.kernel.component.common/src/main/java/org/exoplatform/services/rpc/impl/RPCServiceImpl.java	2011-12-06 09:36:07 UTC (rev 5269)
+++ kernel/trunk/exo.kernel.component.common/src/main/java/org/exoplatform/services/rpc/impl/RPCServiceImpl.java	2011-12-06 13:40:17 UTC (rev 5270)
@@ -18,1036 +18,78 @@
  */
 package org.exoplatform.services.rpc.impl;
 
-import org.exoplatform.commons.utils.PropertyManager;
-import org.exoplatform.commons.utils.SecurityHelper;
-import org.exoplatform.container.ExoContainer;
 import org.exoplatform.container.ExoContainerContext;
 import org.exoplatform.container.configuration.ConfigurationManager;
 import org.exoplatform.container.xml.InitParams;
-import org.exoplatform.container.xml.ValueParam;
-import org.exoplatform.services.log.ExoLogger;
-import org.exoplatform.services.log.Log;
-import org.exoplatform.services.rpc.RPCException;
-import org.exoplatform.services.rpc.RPCService;
-import org.exoplatform.services.rpc.RemoteCommand;
-import org.exoplatform.services.rpc.TopologyChangeEvent;
-import org.exoplatform.services.rpc.TopologyChangeListener;
 import org.jgroups.Address;
 import org.jgroups.Channel;
-import org.jgroups.ChannelException;
 import org.jgroups.JChannel;
-import org.jgroups.MembershipListener;
 import org.jgroups.Message;
 import org.jgroups.View;
 import org.jgroups.blocks.GroupRequest;
-import org.jgroups.blocks.MessageDispatcher;
-import org.jgroups.blocks.RequestHandler;
-import org.jgroups.conf.ConfiguratorFactory;
-import org.jgroups.conf.ProtocolStackConfigurator;
-import org.jgroups.util.Rsp;
 import org.jgroups.util.RspList;
-import org.picocontainer.Startable;
 
-import java.io.Externalizable;
-import java.io.IOException;
-import java.io.ObjectInput;
-import java.io.ObjectOutput;
 import java.io.Serializable;
-import java.lang.reflect.Method;
-import java.net.URL;
-import java.security.PrivilegedAction;
-import java.security.PrivilegedActionException;
-import java.security.PrivilegedExceptionAction;
-import java.util.ArrayList;
-import java.util.Collections;
-import java.util.HashMap;
 import java.util.List;
-import java.util.Map;
 import java.util.Vector;
-import java.util.concurrent.CopyOnWriteArrayList;
-import java.util.concurrent.CountDownLatch;
 
 /**
- * This class is a basic implementation of the {@link RPCService}, it is mainly based on the
- * {@link MessageDispatcher}. This implementation is not designed to give the best possible
- * performances, it only aims to give a way to communicate with other nodes.
+ * This class is the implementation of the {@link AbstractRPCService} for JGroups 2.
  * 
  * @author <a href="mailto:nicolas.filotto at exoplatform.com">Nicolas Filotto</a>
  * @version $Id$
  */
-public class RPCServiceImpl implements RPCService, Startable, RequestHandler, MembershipListener
+public class RPCServiceImpl extends AbstractRPCService
 {
 
    /**
-    * Connection logger.
+    * {@inheritDoc}
     */
-   private static final Log LOG = ExoLogger.getLogger("exo.kernel.component.common.RPCServiceImpl");
-
-   /**
-    * We use reflection for the Message.setObject method in order to remain backward compatible 
-    * because since JGroups 2.12 the signature has changed the expected parameter is no more a Serializable, 
-    * it is an Object
-    */
-   private static Method MESSAGE_SET_OBJECT_METHOD;
-   
-   static
-   {
-      try
-      {
-         MESSAGE_SET_OBJECT_METHOD = Message.class.getMethod("setObject", Serializable.class);
-      }
-      catch (SecurityException e)
-      {
-         throw e;
-      }
-      catch (NoSuchMethodException e)
-      {
-         // We assume that we use JGroups 2.12 or higher
-         try
-         {
-            MESSAGE_SET_OBJECT_METHOD = Message.class.getMethod("setObject", Object.class);
-         }
-         catch (SecurityException e1)
-         {
-            throw e1;
-         }
-         catch (Exception e1)
-         {
-            throw new RuntimeException("Could not find the right Message.setObject method", e);
-         }
-      }
-   }
-   
-   /**
-    * The name of the parameter for the location of the JGroups configuration.
-    */
-   protected static final String PARAM_JGROUPS_CONFIG = "jgroups-configuration";
-
-   /**
-    * The name of the parameter for the name of the cluster.
-    */
-   protected static final String PARAM_CLUSTER_NAME = "jgroups-cluster-name";
-
-   /**
-    * The name of the parameter for the default timeout
-    */
-   protected static final String PARAM_DEFAULT_TIMEOUT = "jgroups-default-timeout";
-
-   /**
-    * The name of the parameter to allow the failover
-    */
-   protected static final String PARAM_ALLOW_FAILOVER = "allow-failover";
-
-   /**
-    * The name of the parameter for the retry timeout
-    */
-   protected static final String PARAM_RETRY_TIMEOUT = "retry-timeout";
-   
-   /**
-    * The value of the default timeout
-    */
-   protected static final int DEFAULT_TIMEOUT = 0;
-   
-   /**
-    * The value of the default retry timeout
-    */
-   protected static final int DEFAULT_RETRY_TIMEOUT = 20000;
-
-   /**
-    * The default value of the cluster name
-    */
-   protected static final String CLUSTER_NAME = "RPCService-Cluster";
-   
-   /**
-    * The configurator used to create the JGroups Channel
-    */
-   private final ProtocolStackConfigurator configurator;
-
-   /**
-    * The lock used to synchronize all the threads waiting for a topology change.
-    */
-   private final Object topologyChangeLock = new Object();
-   
-   /**
-    * The name of the cluster
-    */
-   private final String clusterName;
-
-   /**
-    * The JGroups Channel used to communicate with other nodes
-    */
-   protected Channel channel;
-
-   /**
-    * The current list of all the members of the cluster
-    */
-   protected volatile Vector<Address> members;
-
-   /**
-    * The address of the current coordinator
-    */
-   protected volatile Address coordinator;
-
-   /**
-    * Indicates whether the current node is the coordinator of the cluster or not
-    */
-   protected volatile boolean isCoordinator;
-   
-   /**
-    * The default value of the timeout
-    */
-   private long defaultTimeout = DEFAULT_TIMEOUT;
-
-   /**
-    * The value of the retry timeout
-    */
-   private long retryTimeout = DEFAULT_RETRY_TIMEOUT;
-   
-   /**
-    * Indicates whether the failover capabilities are enabled
-    */
-   private boolean allowFailover = true;
-   
-   /**
-    * The dispatcher used to launch the command of the cluster nodes
-    */
-   private MessageDispatcher dispatcher;
-
-   /**
-    * The signal that indicates that the service is started, it will be used
-    * to make the application wait until the service is fully started to
-    * ensure that all the commands have been registered before handling
-    * incoming messages.
-    */
-   private final CountDownLatch startSignal = new CountDownLatch(1);
-   
-   /**
-    * All the registered {@link TopologyChangeListener}
-    */
-   private final List<TopologyChangeListener> listeners = new CopyOnWriteArrayList<TopologyChangeListener>();
-
-   /**
-    * Current State of the {@link RPCServiceImpl}
-    */
-   private volatile State state;
-
-   /**
-    * All the commands that have been registered
-    */
-   private volatile Map<String, RemoteCommand> commands =
-      Collections.unmodifiableMap(new HashMap<String, RemoteCommand>());
-
-   /**
-    * The public constructor
-    * @param ctx the {@link ExoContainerContext} from which we will extract the corresponding
-    * {@link ExoContainer}
-    * @param params the list of initial parameters
-    * @param configManager the configuration manager used to get the configuration
-    * of JGroups
-    */
    public RPCServiceImpl(ExoContainerContext ctx, InitParams params, ConfigurationManager configManager)
    {
-      if (params == null)
-      {
-         throw new IllegalArgumentException("The RPCServiceImpl requires some parameters");
-      }
-      final URL properties = getProperties(params, configManager);
-      if (LOG.isInfoEnabled())
-      {
-         LOG.info("The JGroups configuration used for the RPCServiceImpl will be loaded from " + properties);
-      }
-
-      try
-      {
-         this.configurator = SecurityHelper.doPrivilegedExceptionAction(new PrivilegedExceptionAction<ProtocolStackConfigurator>()
-         {
-            public ProtocolStackConfigurator run() throws Exception
-            {
-               return ConfiguratorFactory.getStackConfigurator(properties);
-            }
-         });
-      }
-      catch (PrivilegedActionException pae)
-      {
-         Throwable cause = pae.getCause();
-         if (cause instanceof ChannelException)
-         {
-            throw new RuntimeException("Cannot load the JGroups configuration from " + properties, cause);
-         }
-         else if (cause instanceof RuntimeException)
-         {
-            throw (RuntimeException)cause;
-         }
-         else
-         {
-            throw new RuntimeException(cause);
-         }
-      }
-
-      this.clusterName = getClusterName(ctx, params);
-      if (LOG.isDebugEnabled())
-      {
-         LOG.debug("The cluster name of the RPCServiceImpl has been set to " + clusterName);
-      }
-      String sTimeout = getValueParam(params, PARAM_DEFAULT_TIMEOUT);
-      if (sTimeout != null)
-      {
-         defaultTimeout = Integer.parseInt(sTimeout);
-         if (LOG.isDebugEnabled())
-         {
-            LOG.debug("The default timeout of the RPCServiceImpl has been set to " + defaultTimeout);
-         }
-      }
-      String sAllowFailover = getValueParam(params, PARAM_ALLOW_FAILOVER);
-      if (sAllowFailover != null)
-      {
-         allowFailover = Boolean.valueOf(sAllowFailover);
-         if (LOG.isDebugEnabled())
-         {
-            LOG.debug("The parameter '" + PARAM_ALLOW_FAILOVER + "' of the RPCServiceImpl has been set to " + allowFailover);
-         }
-      }
-      sTimeout = getValueParam(params, PARAM_RETRY_TIMEOUT);
-      if (sTimeout != null)
-      {
-         retryTimeout = Integer.parseInt(sTimeout);
-         if (LOG.isDebugEnabled())
-         {
-            LOG.debug("The retry timeout of the RPCServiceImpl has been set to " + retryTimeout);
-         }
-      }
-      this.state = State.INITIALIZED;
+      super(ctx, params, configManager);
    }
 
    /**
     * {@inheritDoc}
     */
-   public List<Object> executeCommandOnAllNodes(RemoteCommand command, boolean synchronous, Serializable... args)
-      throws RPCException
+   protected Address getLocalAddress()
    {
-      return executeCommandOnAllNodesMain(command, synchronous, defaultTimeout, args);
+      return channel.getLocalAddress();
    }
-
+   
    /**
     * {@inheritDoc}
     */
-   public List<Object> executeCommandOnAllNodes(RemoteCommand command, long timeout, Serializable... args)
-      throws RPCException
+   protected RspList castMessage(List<Address> dests, Message msg, boolean synchronous, long timeout)
    {
-      return executeCommandOnAllNodesMain(command, true, timeout, args);
+      return dispatcher.castMessage(dests instanceof Vector ? (Vector<Address>)dests : new Vector<Address>(dests), msg,
+         synchronous ? GroupRequest.GET_ALL : GroupRequest.GET_NONE, timeout);
    }
-
+   
    /**
-    * Executes a command on all the cluster nodes. This method is equivalent to the other method of the
-    * same type but with the default timeout. The command must be registered first otherwise an 
-    * {@link RPCException} will be thrown.
-    *
-    * @param command The command to execute on each cluster node
-    * @param synchronous if true, sets group request mode to {@link org.jgroups.blocks.GroupRequest#GET_ALL},
-    *  and if false sets it to {@link org.jgroups.blocks.GroupRequest#GET_NONE}.
-    * @param timeout a timeout after which to throw a replication exception.
-    * @param args an array of {@link Serializable} objects corresponding to parameters of the command 
-    * to execute remotely
-    * @return a list of responses from all the members of the cluster. If we met an exception on a given node, 
-    * the RPCException will be the corresponding response of this particular node
-    * @throws RPCException in the event of problems.
-    */
-   protected List<Object> executeCommandOnAllNodesMain(RemoteCommand command, boolean synchronous, long timeout,
-      Serializable... args) throws RPCException
-   {
-      return excecuteCommand(members, command, synchronous, timeout, args);
-   }
-
-   /**
     * {@inheritDoc}
     */
-   public Object executeCommandOnCoordinator(RemoteCommand command, boolean synchronous, Serializable... args)
-      throws RPCException
+   protected Channel createChannel() throws Exception
    {
-      return executeCommandOnCoordinatorMain(command, synchronous, defaultTimeout, args);
+      Channel channel = new JChannel(configurator);
+      channel.setOpt(Channel.AUTO_RECONNECT, true);
+      return channel;
    }
 
    /**
     * {@inheritDoc}
     */
-   public Object executeCommandOnCoordinator(RemoteCommand command, long timeout, Serializable... args)
-      throws RPCException
+   protected List<Address> getMembers(View view)
    {
-      return executeCommandOnCoordinatorMain(command, true, timeout, args);
+      return view.getMembers();
    }
 
    /**
-    * Executes a command on the coordinator only. This method is equivalent to the other method of the
-    * same type but with the default timeout. The command must be registered first otherwise an 
-    * {@link RPCException} will be thrown.
-    *
-    * @param command The command to execute on the coordinator node
-    * @param synchronous if true, sets group request mode to {@link org.jgroups.blocks.GroupRequest#GET_ALL}, 
-    * and if false sets it to {@link org.jgroups.blocks.GroupRequest#GET_NONE}.
-    * @param timeout a timeout after which to throw a replication exception.
-    * @param args an array of {@link Serializable} objects corresponding to parameters of the command 
-    * to execute remotely
-    * @return the response of the coordinator.
-    * @throws RPCException in the event of problems.
-    */
-   protected Object executeCommandOnCoordinatorMain(RemoteCommand command, boolean synchronous, long timeout,
-      Serializable... args) throws RPCException
-   {
-      Address coordinator = this.coordinator;
-      Vector<Address> v = new Vector<Address>(1);
-      v.add(coordinator);
-      List<Object> lResults = excecuteCommand(v, command, synchronous, timeout, args);
-      Object result = lResults == null || lResults.size() == 0 ? null : lResults.get(0);
-      if (allowFailover && result instanceof MemberHasLeftException)
-      {
-         // The failover capabilities have been enabled and the coordinator seems to have left
-         if (coordinator.equals(this.coordinator))
-         {
-            synchronized(topologyChangeLock)
-            {
-               if (coordinator.equals(this.coordinator))
-               {
-                  if (LOG.isTraceEnabled())
-                     LOG.trace("The coordinator did not change yet, we will relaunch the command after " 
-                              + retryTimeout + " ms or once a topology change has been detected");                  
-                  try
-                  {
-                     topologyChangeLock.wait(retryTimeout);
-                  }
-                  catch (InterruptedException e)
-                  {
-                     Thread.currentThread().interrupt();
-                  }                  
-               }
-            }
-         }
-         if (LOG.isTraceEnabled())
-            LOG.trace("The coordinator has changed, we will automatically retry with the new coordinator");                  
-         return executeCommandOnCoordinator(command, synchronous, timeout, args);
-      }
-      else if (result instanceof RPCException)
-      {
-         throw (RPCException)result;
-      }
-      return result;
-   }
-
-   /**
-    * Execute the command on all the nodes corresponding to the list of destinations.
-    * @param dests the list of members on which the command needs to be executed
-    * @param command the command to execute
-    * @param synchronous if true, sets group request mode to {@link org.jgroups.blocks.GroupRequest#GET_ALL}, and if false sets 
-    * it to {@link org.jgroups.blocks.GroupRequest#GET_NONE}.
-    * @param timeout a timeout after which to throw a replication exception.
-    * @param args the list of parameters
-    * @return a list of responses from all the targeted members of the cluster.
-    * @throws RPCException in the event of problems.
-    */
-   protected List<Object> excecuteCommand(final Vector<Address> dests, RemoteCommand command,
-      final boolean synchronous, final long timeout, Serializable... args) throws RPCException
-   {
-      SecurityManager security = System.getSecurityManager();
-      if (security != null)
-      {
-         security.checkPermission(RPCService.ACCESS_RPC_SERVICE_PERMISSION);
-      }
-      if (state != State.STARTED)
-      {
-         throw new RPCException(
-            "Cannot execute any commands if the service is not started, the current state of the service is " + state);
-      }
-      String commandId = command.getId();
-      if (commands.get(commandId) != command)
-      {
-         throw new RPCException("Command " + commandId + " unknown, please register your command first");
-      }
-      final Message msg = new Message();
-      try
-      {
-         MESSAGE_SET_OBJECT_METHOD.invoke(msg, new MessageBody(dests.size() == 1 && dests != members ? dests.get(0) : null, commandId, args));
-      }
-      catch (Exception e)
-      {
-         throw new RPCException("Could not call the method Message.setObject");
-      }
-      RspList rsps = SecurityHelper.doPrivilegedAction(new PrivilegedAction<RspList>()
-      {
-         public RspList run()
-         {
-            return dispatcher.castMessage(dests, msg, synchronous ? GroupRequest.GET_ALL : GroupRequest.GET_NONE,
-               timeout);
-         }
-      });
-
-      if (LOG.isTraceEnabled())
-         LOG.trace("responses: " + rsps);
-      if (rsps == null)
-         throw new RPCException("Could not get the responses for command " + commandId + ".");
-      if (!synchronous)
-         return Collections.emptyList();// async case
-      if (LOG.isTraceEnabled())
-      {
-         LOG.trace("(" + channel.getLocalAddress() + "): responses for command " + commandId + ":\n" + rsps);
-      }
-      List<Object> retval = new ArrayList<Object>(rsps.size());
-      for (Address dest : dests)
-      {
-         Rsp rsp = rsps.get(dest);
-         if (rsp == null || (rsp.wasSuspected() && !rsp.wasReceived()))
-         {
-            // The corresponding member has left
-            retval.add(new MemberHasLeftException("No response for the member " + dest
-               + ", this member has probably left the cluster."));
-         }
-         else if (!rsp.wasReceived())
-         {
-            retval.add(new RPCException("Replication timeout for " + rsp.getSender() + ", rsp=" + rsp));
-         }
-         else
-         {
-            Object value = rsp.getValue();
-            if (value instanceof RPCException)
-            {
-               // if we have any application-level exceptions make sure we throw them!!
-               if (LOG.isTraceEnabled())
-                  LOG.trace("Recieved exception'" + value + "' from " + rsp.getSender(), (RPCException)value);
-            }
-            retval.add(value);
-         }
-      }
-      return retval;
-   }
-
-   /**
     * {@inheritDoc}
     */
-   public Object handle(Message msg)
+   protected void setObject(Message m, Object o)
    {
-      String commandId = null;
-      try
-      {
-         // Ensure that the service is fully started before trying to execute any command
-         startSignal.await();
-         MessageBody body = (MessageBody)msg.getObject();
-         commandId = body.getCommandId();
-         if (!body.accept(channel.getLocalAddress()))
-         {
-            if (LOG.isTraceEnabled())
-               LOG.trace("Command : " + commandId + " needs to be executed on the coordinator " +
-                     "only and the local node is not the coordinator, the command will be ignored");
-            return null;
-         }
-         RemoteCommand command = getCommand(commandId);
-         if (command == null)
-         {
-            return new RPCException("Command " + commandId + " unkown, please register your command first");
-         }
-         Object execResult = command.execute(body.getArgs());
-         if (LOG.isTraceEnabled())
-            LOG.trace("Command : " + commandId + " executed, result is: " + execResult);
-         return execResult;
-      }
-      catch (Throwable x)
-      {
-         if (LOG.isTraceEnabled())
-            LOG.trace("Problems invoking command.", x);
-         return new RPCException("Cannot execute the command " + (commandId == null ? "" : commandId), x);
-      }
+      m.setObject((Serializable)o);
    }
-
-   /**
-    * {@inheritDoc}
-    */
-   public void block()
-   {
-   }
-
-   /**
-    * {@inheritDoc}
-    */
-   public void suspect(Address suspectedMbr)
-   {
-   }
-
-   /**
-    * {@inheritDoc}
-    */
-   public void viewAccepted(View view)
-   {
-      boolean coordinatorHasChanged;
-      synchronized (topologyChangeLock)
-      {
-         this.members = view.getMembers();
-         Address currentCoordinator = coordinator;
-         this.coordinator = members != null && members.size() > 0 ? members.get(0) : null;
-         this.isCoordinator = coordinator != null && coordinator.equals(channel.getLocalAddress());
-         coordinatorHasChanged = currentCoordinator != null && !currentCoordinator.equals(coordinator);
-         // Release all the nodes
-         topologyChangeLock.notifyAll();
-      }
-      onTopologyChange(coordinatorHasChanged);
-   }
-
-   /**
-    * Called anytime the topology has changed, this method will notify all the listeners
-    * currently registered
-    * @param coordinatorHasChanged this parameter is set to <code>true</code> if the 
-    * coordinator has changed, <code>false</code> otherwise
-    */
-   private void onTopologyChange(boolean coordinatorHasChanged)
-   {
-      TopologyChangeEvent event = new TopologyChangeEvent(coordinatorHasChanged, isCoordinator);
-      for (TopologyChangeListener listener : listeners)
-      {
-         try
-         {
-            listener.onChange(event);
-         }
-         catch (Exception e)
-         {
-            LOG.warn("An error occurs with the listener of type " + listener.getClass(), e);
-         }
-      }
-   }
-
-   /**
-    * {@inheritDoc}
-    */
-   public synchronized RemoteCommand registerCommand(RemoteCommand command)
-   {
-      SecurityManager security = System.getSecurityManager();
-      if (security != null)
-      {
-         security.checkPermission(RPCService.ACCESS_RPC_SERVICE_PERMISSION);
-      }
-      if (command != null)
-      {
-         String commandId = command.getId();
-         if (commandId == null)
-         {
-            throw new IllegalArgumentException("The command Id cannot be null");
-         }
-         Map<String, RemoteCommand> tmpCommands = new HashMap<String, RemoteCommand>(this.commands);
-         RemoteCommand oldCommand = tmpCommands.put(commandId, command);
-         if (oldCommand != null && PropertyManager.isDevelopping())
-         {
-            LOG.warn("A command has already been registered with the id " + commandId
-               + ", this command will be replaced with the new one");
-         }
-         this.commands = Collections.unmodifiableMap(tmpCommands);
-         return command;
-      }
-      return null;
-   }
-
-   /**
-    * {@inheritDoc}
-    */
-   public synchronized void unregisterCommand(RemoteCommand command)
-   {
-      SecurityManager security = System.getSecurityManager();
-      if (security != null)
-      {
-         security.checkPermission(RPCService.ACCESS_RPC_SERVICE_PERMISSION);
-      }
-      if (command != null)
-      {
-         String commandId = command.getId();
-         if (commandId == null)
-         {
-            throw new IllegalArgumentException("The command Id cannot be null");
-         }
-         if (commands.get(commandId) != command)
-         {
-            // We prevent to remove any command that has not been registered, thus we expect that
-            // the registered instance is exactly the same instance as the one that we want to
-            // unregister
-            if (PropertyManager.isDevelopping())
-            {
-               LOG.warn("Cannot unregister an unknown RemoteCommand, either the command id " + commandId
-                  + " is unknown or the instance of RemoteCommand to unregister is unknown");
-            }
-            return;
-         }
-         Map<String, RemoteCommand> tmpCommands = new HashMap<String, RemoteCommand>(this.commands);
-         tmpCommands.remove(commandId);
-         this.commands = Collections.unmodifiableMap(tmpCommands);
-      }
-   }
-
-   /**
-    * {@inheritDoc}
-    */
-   public boolean isCoordinator() throws RPCException
-   {
-      if (state != State.STARTED)
-      {
-         throw new RPCException("Cannot know whether the local node is a coordinator or not if " +
-                  "the service is not started, the current state of the service is " + state);
-      }
-      return isCoordinator;
-   }
-
-   /**
-    * {@inheritDoc}
-    */
-   public void registerTopologyChangeListener(TopologyChangeListener listener) throws SecurityException
-   {
-      SecurityManager security = System.getSecurityManager();
-      if (security != null)
-      {
-         security.checkPermission(RPCService.ACCESS_RPC_SERVICE_PERMISSION);
-      }
-      if (listener == null)
-      {
-         return;
-      }
-      listeners.add(listener);   
-   }
-
-   /**
-    * {@inheritDoc}
-    */
-   public void unregisterTopologyChangeListener(TopologyChangeListener listener) throws SecurityException
-   {
-      SecurityManager security = System.getSecurityManager();
-      if (security != null)
-      {
-         security.checkPermission(RPCService.ACCESS_RPC_SERVICE_PERMISSION);
-      }
-      if (listener == null)
-      {
-         return;
-      }
-      listeners.remove(listener);
-   }
-
-   /**
-    * Gives the {@link RemoteCommand} corresponding to the given id
-    * @param commandId the command id of the command to retrieve
-    * @return the corresponding {@link RemoteCommand}
-    */
-   protected RemoteCommand getCommand(String commandId)
-   {
-      return commands.get(commandId);
-   }
-
-   /**
-    * {@inheritDoc}
-    */
-   public void start()
-   {
-      SecurityManager security = System.getSecurityManager();
-      if (security != null)
-      {
-         security.checkPermission(RPCService.ACCESS_RPC_SERVICE_PERMISSION);
-      }
-
-      try
-      {
-         SecurityHelper.doPrivilegedExceptionAction(new PrivilegedExceptionAction<Void>()
-         {
-            public Void run() throws Exception
-            {
-               channel = new JChannel(configurator);
-               channel.setOpt(Channel.AUTO_RECONNECT, true);
-               dispatcher = new MessageDispatcher(channel, null, RPCServiceImpl.this, RPCServiceImpl.this);
-               channel.connect(clusterName);
-               return null;
-            }
-         });
-      }
-      catch (PrivilegedActionException pae)
-      {
-         Throwable cause = pae.getCause();
-         if (cause instanceof ChannelException)
-         {
-            throw new RuntimeException("Cannot initialize the Channel needed for the RPCServiceImpl", cause);
-         }
-         else if (cause instanceof RuntimeException)
-         {
-            throw (RuntimeException)cause;
-         }
-         else
-         {
-            throw new RuntimeException(cause);
-         }
-      }
-      finally
-      {
-         this.state = State.STARTED;
-         startSignal.countDown();
-      }
-   }
-
-   /**
-    * {@inheritDoc}
-    */
-   public void stop()
-   {
-      SecurityManager security = System.getSecurityManager();
-      if (security != null)
-      {
-         security.checkPermission(RPCService.ACCESS_RPC_SERVICE_PERMISSION);
-      }
-
-      this.state = State.STOPPED;
-      this.isCoordinator = false;
-      if (channel != null && channel.isOpen())
-      {
-         if (LOG.isInfoEnabled())
-            LOG.info("Disconnecting and closing the Channel");
-         SecurityHelper.doPrivilegedAction(new PrivilegedAction<Void>()
-         {
-            public Void run()
-            {
-               channel.disconnect();
-               channel.close();
-               return null;
-            }
-         });
-         channel = null;
-      }
-      if (dispatcher != null)
-      {
-         dispatcher.stop();
-         dispatcher = null;
-      }
-   }
-
-   /**
-    * Gives the value of the default timeout
-    * @return the default timeout
-    */
-   protected long getDefaultTimeout()
-   {
-      return defaultTimeout;
-   }
-
-   /**
-    * Gives the name of the cluster
-    * @return the name of the cluster
-    */
-   protected String getClusterName()
-   {
-      return clusterName;
-   }
-   
-   /**
-    * Gives the value of the retry timeout
-    * @return the value of the retry timeout
-    */
-   protected long getRetryTimeout()
-   {
-      return retryTimeout;
-   }
-
-   /**
-    * Indicates whether the failover capabilities are enabled or not
-    * @return <code>true</code> if the failover capabilities are allowed, <code>false</code>
-    * otherwise
-    */
-   protected boolean isAllowFailover()
-   {
-      return allowFailover;
-   }
-
-   /**
-    * Gives the value of the {@link ValueParam} corresponding to the given key
-    * @param params the list of initial parameters from which we want to extract the {@link ValueParam}
-    * @param parameterKey the name of the {@link ValueParam} that we are looking for
-    * @return the value if it exists, null otherwise
-    */
-   private static String getValueParam(InitParams params, String parameterKey)
-   {
-      try
-      {
-         return params.getValueParam(parameterKey).getValue().trim();
-      }
-      catch (NullPointerException e)
-      {
-         return null;
-      }
-   }
-
-   /**
-    * Gives the {@link URL} corresponding to the location of the JGroups configuration
-    * @param params the initial parameters from which we extract the parameter 
-    * <code>PARAM_JGROUPS_CONFIG</code> 
-    * @param configManager the configuration manager used to get the {@link URL} corresponding
-    * to the path given in the configuration of the RPCServiceImpl
-    * @return The {@link URL} corresponding to the location of the JGroups configuration,
-    * it will throw {@link RuntimeException} otherwise since it is a mandatory configuration.
-    */
-   private static URL getProperties(InitParams params, ConfigurationManager configManager)
-   {
-      String configPath = getValueParam(params, PARAM_JGROUPS_CONFIG);
-      if (configPath == null)
-      {
-         throw new IllegalArgumentException("The parameter '" + PARAM_JGROUPS_CONFIG
-            + "' of RPCServiceImpl is mandatory");
-      }
-      URL properties;
-      try
-      {
-         properties = configManager.getResource(configPath);
-      }
-      catch (Exception e)
-      {
-         throw new IllegalArgumentException("Cannot find the JGroups configuration at " + configPath, e);
-      }
-      if (properties == null)
-      {
-         throw new IllegalArgumentException("Cannot find the JGroups configuration at " + configPath);
-      }
-      return properties;
-   }
-
-   /**
-    * Gives the name of the cluster that will be able to support several portal containers
-    * since the name will be post fixed with "-${container-name}"
-    * @param ctx the context from which we extract the name of the container
-    * @param params the list of initial parameters from which we get the value of the parameter
-    * <code>PARAM_CLUSTER_NAME</code> if it exists otherwise the value will be "RPCService-Cluster"
-    */
-   private static String getClusterName(ExoContainerContext ctx, InitParams params)
-   {
-      String clusterName = getValueParam(params, PARAM_CLUSTER_NAME);
-      if (clusterName == null)
-      {
-         clusterName = CLUSTER_NAME;
-      }
-      return clusterName += "-" + ctx.getName();
-   }
-
-   /**
-    * This intern class will be used to 
-    */
-   public static class MessageBody implements Externalizable
-   {
-      /**
-       * The Id of the command to execute
-       */
-      private String commandId;
-
-      /**
-       * The list of parameters
-       */
-      private Serializable[] args;
-      
-      /**
-       * The hash code of the expected destination
-       */
-      private int destination;
-
-      public MessageBody()
-      {
-      }
-
-      /**
-       * @param dest The destination of the message
-       * @param commandId the id of the command to execute
-       * @param args the arguments to use
-       */
-      public MessageBody(Address dest, String commandId, Serializable[] args)
-      {
-         this.commandId = commandId;
-         this.args = args;
-         this.destination = dest == null ? 0 : dest.hashCode();
-      }
-
-      public String getCommandId()
-      {
-         return commandId;
-      }
-
-      public Serializable[] getArgs()
-      {
-         return args;
-      }      
-
-      /**
-       * Indicates whether or not the given message body accepts the given address
-       * @param address the address to check
-       * @return <code>true</code> if the message is for everybody or if the given address is the expected address,
-       * <code>false</code> otherwise
-       */
-      public boolean accept(Address address)
-      {
-         return destination == 0 || destination == address.hashCode();
-      }
-
-      /**
-       * {@inheritDoc}
-       */
-      public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException
-      {
-         boolean unicast = in.readBoolean();
-         if (unicast)
-         {
-            this.destination = in.readInt();            
-         }
-         this.commandId = in.readUTF();
-         int size = in.readInt();
-         if (size == -1)
-         {
-            this.args = null;
-         }
-         else
-         {
-            this.args = new Serializable[size];
-            for (int i = 0; i < size; i++)
-            {
-               args[i] = (Serializable)in.readObject();
-            }
-         }
-      }
-
-      /**
-       * {@inheritDoc}
-       */
-      public void writeExternal(ObjectOutput out) throws IOException
-      {
-         boolean unicast = destination != 0;
-         out.writeBoolean(unicast);
-         if (unicast)
-         {
-            out.writeInt(destination);            
-         }         
-         out.writeUTF(commandId);
-         if (args == null)
-         {
-            out.writeInt(-1);
-         }
-         else
-         {
-            out.writeInt(args.length);
-            for (int i = 0; i < args.length; i++)
-            {
-               out.writeObject(args[i]);
-            }
-         }
-      }
-   }
-
-   /**
-    * All the potential states of the {@link RPCServiceImpl}
-    */
-   public enum State {
-      INITIALIZED, STARTED, STOPPED
-   }
-
-   public static class MemberHasLeftException extends RPCException
-   {
-
-      /**
-       * The serial version UID
-       */
-      private static final long serialVersionUID = 3558158913564367637L;
-
-      public MemberHasLeftException(String message)
-      {
-         super(message);
-      }
-   }
 }

Modified: kernel/trunk/exo.kernel.component.common/src/test/java/org/exoplatform/services/rpc/impl/TestRPCServiceImpl.java
===================================================================
--- kernel/trunk/exo.kernel.component.common/src/test/java/org/exoplatform/services/rpc/impl/TestRPCServiceImpl.java	2011-12-06 09:36:07 UTC (rev 5269)
+++ kernel/trunk/exo.kernel.component.common/src/test/java/org/exoplatform/services/rpc/impl/TestRPCServiceImpl.java	2011-12-06 13:40:17 UTC (rev 5270)
@@ -27,7 +27,7 @@
 import org.exoplatform.services.rpc.SingleMethodCallCommand;
 import org.exoplatform.services.rpc.TopologyChangeEvent;
 import org.exoplatform.services.rpc.TopologyChangeListener;
-import org.exoplatform.services.rpc.impl.RPCServiceImpl.MemberHasLeftException;
+import org.exoplatform.services.rpc.impl.AbstractRPCService.MemberHasLeftException;
 import org.exoplatform.test.BasicTestCase;
 import org.jgroups.Address;
 
@@ -35,7 +35,6 @@
 import java.util.ArrayList;
 import java.util.Collections;
 import java.util.List;
-import java.util.Vector;
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.atomic.AtomicReference;
 
@@ -805,8 +804,8 @@
          o = service1.executeCommandOnCoordinator(LongTaskOnNode2, 1000);
          assertEquals("OK", o);
          
-         Vector<Address> allMembers = service1.members;
-         Vector<Address> coordinatorOnly = new Vector<Address>(1);
+         List<Address> allMembers = service1.members;
+         List<Address> coordinatorOnly = new ArrayList<Address>(1);
          coordinatorOnly.add(service1.coordinator);
          
          final RPCServiceImpl service = service2;

Added: kernel/trunk/exo.kernel.component.ext.rpc.impl.jgroups.v3/pom.xml
===================================================================
--- kernel/trunk/exo.kernel.component.ext.rpc.impl.jgroups.v3/pom.xml	                        (rev 0)
+++ kernel/trunk/exo.kernel.component.ext.rpc.impl.jgroups.v3/pom.xml	2011-12-06 13:40:17 UTC (rev 5270)
@@ -0,0 +1,115 @@
+<!--
+
+    Copyright (C) 2009 eXo Platform SAS.
+
+    This is free software; you can redistribute it and/or modify it
+    under the terms of the GNU Lesser General Public License as
+    published by the Free Software Foundation; either version 2.1 of
+    the License, or (at your option) any later version.
+
+    This software is distributed in the hope that it will be useful,
+    but WITHOUT ANY WARRANTY; without even the implied warranty of
+    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+    Lesser General Public License for more details.
+
+    You should have received a copy of the GNU Lesser General Public
+    License along with this software; if not, write to the Free
+    Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
+    02110-1301 USA, or see the FSF site: http://www.fsf.org.
+
+-->
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
+   <modelVersion>4.0.0</modelVersion>
+   <parent>
+      <groupId>org.exoplatform.kernel</groupId>
+      <artifactId>kernel-parent</artifactId>
+      <version>2.3.5-GA-SNAPSHOT</version>
+   </parent>
+   <artifactId>exo.kernel.component.ext.rpc.impl.jgroups.v3</artifactId>
+   <name>eXo Kernel :: RPC Service Extension :: JGroups 3 Implementation</name>
+   <description>The JGroups 3 implementation of the RPC service</description>
+   <dependencies>
+      <dependency>
+         <groupId>org.exoplatform.kernel</groupId>
+         <artifactId>exo.kernel.commons.test</artifactId>
+      </dependency>        
+      <dependency>
+         <groupId>org.exoplatform.kernel</groupId>
+         <artifactId>exo.kernel.component.common</artifactId>
+      </dependency>
+      <dependency>
+         <groupId>org.jgroups</groupId>
+         <artifactId>jgroups</artifactId>
+         <version>3.0.0.Final</version>
+      </dependency>
+      <dependency>
+         <groupId>org.slf4j</groupId>
+         <artifactId>slf4j-log4j12</artifactId>
+      </dependency>                   
+   </dependencies>
+   <build>
+      <plugins>
+         <plugin>
+            <groupId>org.apache.maven.plugins</groupId>
+            <artifactId>maven-surefire-plugin</artifactId>
+            <configuration>
+               <argLine>${env.MAVEN_OPTS} -Djava.security.manager=org.exoplatform.commons.test.TestSecurityManager -Djava.security.policy=${project.build.directory}/test-classes/test.policy</argLine>
+                <systemProperties>
+                  <!-- This system property is needed because of incompatibilities between IPv6 and
+                  the JVMs shipped with some Linux distributions such as Ubuntu and Fedora -->
+                  <property>
+                     <name>java.net.preferIPv4Stack</name>
+                     <value>true</value>
+                  </property>
+                  <!-- Bind JGroups to the loopback address so that the tests are not blocked by a firewall -->
+                  <property>
+                     <name>jgroups.bind_addr</name>
+                     <value>127.0.0.1</value>
+                  </property>
+                  <property>
+                     <name>jgroups.stack</name>
+                     <value>udp</value>
+                  </property>
+               </systemProperties>
+            </configuration>
+         </plugin>
+         <plugin>
+            <artifactId>maven-antrun-plugin</artifactId>
+            <executions>
+               <execution>
+                  <id>prepare-test-policy</id>
+                  <phase>process-test-resources</phase>
+                  <configuration>
+                     <tasks>
+                        <echo>Creating Access Policy for tests</echo>
+                        <makeurl file="${settings.localRepository}" property="localRepositoryURL" />
+                        <makeurl file="${project.build.outputDirectory}" property="outputDirectoryURL" />
+                        <makeurl file="${project.build.testOutputDirectory}" property="testOutputDirectoryURL" />
+                        <copy todir="${project.build.testOutputDirectory}" overwrite="true">
+                           <fileset dir="${project.basedir}/src/test/resources/">
+                              <include name="test.policy" />
+                           </fileset>
+                           <filterset>
+                              <filter token="MAVEN_REPO" value="${localRepositoryURL}" />
+                              <filter token="MAIN_CLASSES" value="${outputDirectoryURL}" />
+                              <filter token="TEST_CLASSES" value="${testOutputDirectoryURL}" />
+                           </filterset>
+                        </copy>
+                     </tasks>                     
+                  </configuration>
+                  <goals>
+                     <goal>run</goal>
+                  </goals>
+               </execution>
+            </executions>
+            <dependencies>
+               <dependency>
+                  <groupId>ant</groupId>
+                  <artifactId>ant-optional</artifactId>
+                  <version>1.5.3-1</version>
+               </dependency>
+            </dependencies>
+         </plugin>
+      </plugins>
+   </build>
+</project>

Added: kernel/trunk/exo.kernel.component.ext.rpc.impl.jgroups.v3/src/main/java/org/exoplatform/services/rpc/jgv3/RPCServiceImpl.java
===================================================================
--- kernel/trunk/exo.kernel.component.ext.rpc.impl.jgroups.v3/src/main/java/org/exoplatform/services/rpc/jgv3/RPCServiceImpl.java	                        (rev 0)
+++ kernel/trunk/exo.kernel.component.ext.rpc.impl.jgroups.v3/src/main/java/org/exoplatform/services/rpc/jgv3/RPCServiceImpl.java	2011-12-06 13:40:17 UTC (rev 5270)
@@ -0,0 +1,99 @@
+/*
+ * Copyright (C) 2010 eXo Platform SAS.
+ *
+ * This is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU Lesser General Public License as
+ * published by the Free Software Foundation; either version 2.1 of
+ * the License, or (at your option) any later version.
+ *
+ * This software is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this software; if not, write to the Free
+ * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
+ * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
+ */
+package org.exoplatform.services.rpc.jgv3;
+
+import org.exoplatform.container.ExoContainerContext;
+import org.exoplatform.container.configuration.ConfigurationManager;
+import org.exoplatform.container.xml.InitParams;
+import org.exoplatform.services.rpc.impl.AbstractRPCService;
+import org.jgroups.Address;
+import org.jgroups.Channel;
+import org.jgroups.JChannel;
+import org.jgroups.Message;
+import org.jgroups.View;
+import org.jgroups.blocks.RequestOptions;
+import org.jgroups.blocks.ResponseMode;
+import org.jgroups.util.RspList;
+
+import java.util.List;
+
+/**
+ * This class is the implementation of the {@link AbstractRPCService} for JGroups 3.
+ * 
+ * @author <a href="mailto:nicolas.filotto at exoplatform.com">Nicolas Filotto</a>
+ * @version $Id$
+ */
+public class RPCServiceImpl extends AbstractRPCService
+{
+
+   /**
+    * Creates the JGroups 3 flavor of the service by delegating to the {@link AbstractRPCService} constructor.
+    */
+   public RPCServiceImpl(ExoContainerContext ctx, InitParams params, ConfigurationManager configManager)
+   {
+      super(ctx, params, configManager);
+   }
+
+   /**
+    * {@inheritDoc} This implementation returns {@code channel.getAddress()}.
+    */
+   protected Address getLocalAddress()
+   {
+      return channel.getAddress();
+   }
+   
+   /**
+    * {@inheritDoc} Uses {@code ResponseMode.GET_ALL} for synchronous calls, {@code ResponseMode.GET_NONE} otherwise.
+    */
+   protected RspList<Object> castMessage(List<Address> dests, Message msg, boolean synchronous, long timeout) throws Exception
+   {
+      return dispatcher.castMessage(dests, msg, new RequestOptions(synchronous ? ResponseMode.GET_ALL : ResponseMode.GET_NONE, timeout));
+   }
+   
+   /**
+    * {@inheritDoc} This implementation creates a new {@link JChannel} from the configurator.
+    */
+   protected Channel createChannel() throws Exception
+   {
+      return new JChannel(configurator);
+   }
+
+   /**
+    * {@inheritDoc} This implementation intentionally does nothing.
+    */
+   public void unblock()
+   {
+   }
+
+   /**
+    * {@inheritDoc} This implementation returns the result of {@code view.getMembers()} as is.
+    */
+   protected List<Address> getMembers(View view)
+   {
+      return view.getMembers();
+   }
+
+   /**
+    * {@inheritDoc} This implementation delegates to {@code m.setObject(o)}.
+    */
+   protected void setObject(Message m, Object o)
+   {
+      m.setObject(o);
+   }
+}

Added: kernel/trunk/exo.kernel.component.ext.rpc.impl.jgroups.v3/src/test/java/org/exoplatform/services/rpc/impl/TestRPCServiceImpl.java
===================================================================
--- kernel/trunk/exo.kernel.component.ext.rpc.impl.jgroups.v3/src/test/java/org/exoplatform/services/rpc/impl/TestRPCServiceImpl.java	                        (rev 0)
+++ kernel/trunk/exo.kernel.component.ext.rpc.impl.jgroups.v3/src/test/java/org/exoplatform/services/rpc/impl/TestRPCServiceImpl.java	2011-12-06 13:40:17 UTC (rev 5270)
@@ -0,0 +1,1281 @@
+/*
+ * Copyright (C) 2010 eXo Platform SAS.
+ *
+ * This is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU Lesser General Public License as
+ * published by the Free Software Foundation; either version 2.1 of
+ * the License, or (at your option) any later version.
+ *
+ * This software is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this software; if not, write to the Free
+ * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
+ * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
+ */
+package org.exoplatform.services.rpc.impl;
+
+import org.exoplatform.container.PortalContainer;
+import org.exoplatform.container.configuration.ConfigurationManager;
+import org.exoplatform.container.xml.InitParams;
+import org.exoplatform.container.xml.ValueParam;
+import org.exoplatform.services.rpc.RPCException;
+import org.exoplatform.services.rpc.RemoteCommand;
+import org.exoplatform.services.rpc.SingleMethodCallCommand;
+import org.exoplatform.services.rpc.TopologyChangeEvent;
+import org.exoplatform.services.rpc.TopologyChangeListener;
+import org.exoplatform.services.rpc.impl.AbstractRPCService.MemberHasLeftException;
+import org.exoplatform.services.rpc.jgv3.RPCServiceImpl;
+import org.exoplatform.test.BasicTestCase;
+import org.jgroups.Address;
+
+import java.io.Serializable;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.atomic.AtomicReference;
+
+/**
+ * This is the unit test class for the service {@link RPCServiceImpl}
+ * 
+ * @author <a href="mailto:nicolas.filotto at exoplatform.com">Nicolas Filotto</a>
+ * @version $Id$
+ *
+ */
+public class TestRPCServiceImpl extends BasicTestCase
+{
+   private PortalContainer container;
+   private ConfigurationManager configManager;
+   
+   public void setUp() throws Exception
+   {
+      container = PortalContainer.getInstance();
+      configManager = (ConfigurationManager)container.getComponentInstanceOfType(ConfigurationManager.class);
+   }
+   
+   public void testParameters()
+   {
+      InitParams params = null;
+      try
+      {
+         new RPCServiceImpl(container.getContext(), params, configManager);
+         fail("We expect a IllegalArgumentException since the jgroups config cannot be found");
+      }
+      catch (IllegalArgumentException e)
+      {
+         // OK
+      }
+      params = new InitParams();
+      try
+      {
+         new RPCServiceImpl(container.getContext(), params, configManager);
+         fail("We expect a IllegalArgumentException since the jgroups config cannot be found");
+      }
+      catch (IllegalArgumentException e)
+      {
+         // OK
+      }
+      ValueParam paramConf = new ValueParam();
+      paramConf.setName(RPCServiceImpl.PARAM_JGROUPS_CONFIG);
+      params.addParameter(paramConf);
+      try
+      {
+         new RPCServiceImpl(container.getContext(), params, configManager);
+         fail("We expect a IllegalArgumentException since the jgroups config cannot be found");
+      }
+      catch (IllegalArgumentException e)
+      {
+         // OK
+      }
+      paramConf.setValue("fakePath");
+      try
+      {
+         new RPCServiceImpl(container.getContext(), params, configManager);
+         fail("We expect a IllegalArgumentException since the jgroups config cannot be found");
+      }
+      catch (IllegalArgumentException e)
+      {
+         // OK
+      }
+      paramConf.setValue("jar:/conf/portal/udp.xml");
+      RPCServiceImpl service = null;
+      try
+      {
+         service = new RPCServiceImpl(container.getContext(), params, configManager);
+         assertEquals(RPCServiceImpl.DEFAULT_TIMEOUT, service.getDefaultTimeout());
+         assertEquals(RPCServiceImpl.DEFAULT_RETRY_TIMEOUT, service.getRetryTimeout());
+         assertEquals(true, service.isAllowFailover());
+         assertEquals(RPCServiceImpl.CLUSTER_NAME + "-" + container.getContext().getName(), service.getClusterName());
+      }
+      finally
+      {
+         if (service != null)
+         {
+            service.stop();            
+         }
+      }
+      ValueParam paramTimeout = new ValueParam();
+      paramTimeout.setName(RPCServiceImpl.PARAM_DEFAULT_TIMEOUT);
+      paramTimeout.setValue("fakeValue");
+      params.addParameter(paramTimeout);
+      try
+      {
+         new RPCServiceImpl(container.getContext(), params, configManager);
+         fail("We expect a NumberFormatException since the timeout is not properly set");
+      }
+      catch (NumberFormatException e)
+      {
+         // OK
+      }
+      paramTimeout.setValue("60");
+      try
+      {
+         service = new RPCServiceImpl(container.getContext(), params, configManager);
+         assertEquals(60, service.getDefaultTimeout());
+         assertEquals(RPCServiceImpl.DEFAULT_RETRY_TIMEOUT, service.getRetryTimeout());
+         assertEquals(true, service.isAllowFailover());
+         assertEquals(RPCServiceImpl.CLUSTER_NAME + "-" + container.getContext().getName(), service.getClusterName());
+      }
+      finally
+      {
+         if (service != null)
+         {
+            service.stop();            
+         }
+      }
+      ValueParam paramRetryTimeout = new ValueParam();
+      paramRetryTimeout.setName(RPCServiceImpl.PARAM_RETRY_TIMEOUT);
+      paramRetryTimeout.setValue("fakeValue");
+      params.addParameter(paramRetryTimeout);
+      try
+      {
+         new RPCServiceImpl(container.getContext(), params, configManager);
+         fail("We expect a NumberFormatException since the retry timeout is not properly set");
+      }
+      catch (NumberFormatException e)
+      {
+         // OK
+      }      
+      paramRetryTimeout.setValue("60");
+      try
+      {
+         service = new RPCServiceImpl(container.getContext(), params, configManager);
+         assertEquals(60, service.getDefaultTimeout());
+         assertEquals(60, service.getRetryTimeout());
+         assertEquals(true, service.isAllowFailover());
+         assertEquals(RPCServiceImpl.CLUSTER_NAME + "-" + container.getContext().getName(), service.getClusterName());
+      }
+      finally
+      {
+         if (service != null)
+         {
+            service.stop();            
+         }
+      }
+      ValueParam paramAllowFailover = new ValueParam();
+      paramAllowFailover.setName(RPCServiceImpl.PARAM_ALLOW_FAILOVER);
+      paramAllowFailover.setValue("fakeValue");
+      params.addParameter(paramAllowFailover);
+      try
+      {
+         service = new RPCServiceImpl(container.getContext(), params, configManager);
+         assertEquals(60, service.getDefaultTimeout());
+         assertEquals(60, service.getRetryTimeout());
+         assertEquals(false, service.isAllowFailover());
+         assertEquals(RPCServiceImpl.CLUSTER_NAME + "-" + container.getContext().getName(), service.getClusterName());
+      }
+      finally
+      {
+         if (service != null)
+         {
+            service.stop();            
+         }
+      }
+      paramAllowFailover.setValue("TRUE");
+      try
+      {
+         service = new RPCServiceImpl(container.getContext(), params, configManager);
+         assertEquals(60, service.getDefaultTimeout());
+         assertEquals(60, service.getRetryTimeout());
+         assertEquals(true, service.isAllowFailover());
+         assertEquals(RPCServiceImpl.CLUSTER_NAME + "-" + container.getContext().getName(), service.getClusterName());
+      }
+      finally
+      {
+         if (service != null)
+         {
+            service.stop();            
+         }
+      }
+      
+      ValueParam paramClusterName = new ValueParam();      
+      paramClusterName.setName(RPCServiceImpl.PARAM_CLUSTER_NAME);
+      paramClusterName.setValue("MyName");
+      params.addParameter(paramClusterName);
+      try
+      {
+         service = new RPCServiceImpl(container.getContext(), params, configManager);
+         assertEquals(60, service.getDefaultTimeout());
+         assertEquals(paramClusterName.getValue() + "-" + container.getContext().getName(), service.getClusterName());
+      }
+      finally
+      {
+         if (service != null)
+         {
+            service.stop();            
+         }
+      }
+   }
+   /**
+    * Verifies that the service only accepts calls while it is started.
+    * <p>
+    * Before {@code start()}, the four {@code executeCommandOn*} calls used here and
+    * {@code isCoordinator()} must each throw an {@link RPCException}. Once started, the
+    * node must report itself as coordinator and the same four calls must complete without
+    * throwing. After {@code stop()} the four calls must throw an {@link RPCException} again.
+    */
+   public void testStates() throws Exception
+   {
+      InitParams params = new InitParams();
+      ValueParam paramConf = new ValueParam();
+      paramConf.setName(RPCServiceImpl.PARAM_JGROUPS_CONFIG);
+      paramConf.setValue("jar:/conf/portal/udp.xml");      
+      params.addParameter(paramConf);
+      RPCServiceImpl service = null;
+      RemoteCommand foo  = new RemoteCommand()
+      {
+         
+         public String getId()
+         {
+            return "foo";
+         }
+         
+         public String execute(Serializable[] args) throws Throwable
+         {
+            return null;
+         }
+      };
+      try
+      {
+         service = new RPCServiceImpl(container.getContext(), params, configManager);
+         // The service is created but not started yet: after registering foo, every call below must be rejected.
+         service.registerCommand(foo);
+         try
+         {
+            service.executeCommandOnAllNodes(foo, true);
+            fail("We expect a RPCException since the current state is not the expected one");
+         }
+         catch (RPCException e)
+         {
+            // OK
+         }
+         try
+         {
+            service.executeCommandOnAllNodes(foo, 10);
+            fail("We expect a RPCException since the current state is not the expected one");
+         }
+         catch (RPCException e)
+         {
+            // OK
+         }
+         try
+         {
+            service.executeCommandOnCoordinator(foo, true);
+            fail("We expect a RPCException since the current state is not the expected one");
+         }
+         catch (RPCException e)
+         {
+            // OK
+         }
+         try
+         {
+            service.executeCommandOnCoordinator(foo, 10);
+            fail("We expect a RPCException since the current state is not the expected one");
+         }
+         catch (RPCException e)
+         {
+            // OK
+         }
+         try
+         {
+            service.isCoordinator();
+            fail("We expect a RPCException since the current state is not the expected one");
+         }
+         catch (RPCException e)
+         {
+            // OK
+         }
+         service.start(); // from here on the same calls are expected to succeed
+         assertEquals(true, service.isCoordinator());
+         service.executeCommandOnAllNodes(foo, true);
+         service.executeCommandOnAllNodes(foo, 10);
+         service.executeCommandOnCoordinator(foo, true);
+         service.executeCommandOnCoordinator(foo, 10);
+      }
+      finally
+      {
+         if (service != null)
+         {
+            service.stop();            
+         }
+      }
+      try
+      {
+         service.executeCommandOnAllNodes(foo, true); // the service has been stopped by the finally block above
+         fail("We expect a RPCException since the current state is not the expected one");
+      }
+      catch (RPCException e)
+      {
+         // OK
+      }
+      try
+      {
+         service.executeCommandOnAllNodes(foo, 10);
+         fail("We expect a RPCException since the current state is not the expected one");
+      }
+      catch (RPCException e)
+      {
+         // OK
+      }
+      try
+      {
+         service.executeCommandOnCoordinator(foo, true);
+         fail("We expect a RPCException since the current state is not the expected one");
+      }
+      catch (RPCException e)
+      {
+         // OK
+      }
+      try
+      {
+         service.executeCommandOnCoordinator(foo, 10);
+         fail("We expect a RPCException since the current state is not the expected one");
+      }
+      catch (RPCException e)
+      {
+         // OK
+      }      
+   }
+   /**
+    * Exercises command execution on a single started node.
+    * <ul>
+    * <li>A command that was never registered, or an unregistered instance that merely reuses
+    * the id of a registered one, must be rejected with an {@link RPCException}.</li>
+    * <li>A command throwing an {@code Exception} or an {@code Error} must surface as an
+    * {@link RPCException}: as the single list element for {@code executeCommandOnAllNodes}
+    * and as a thrown exception for {@code executeCommandOnCoordinator}.</li>
+    * <li>A command sleeping for 2000 ms must succeed when called with {@code true}, fail with
+    * an {@link RPCException} when called with {@code 1000}, and yield an empty list /
+    * {@code null} when called with {@code false}.</li>
+    * <li>{@code String} and {@code null} return values must come back unchanged.</li>
+    * </ul>
+    */
+   public void testCommands() throws Exception
+   {
+      InitParams params = new InitParams();
+      ValueParam paramConf = new ValueParam();
+      paramConf.setName(RPCServiceImpl.PARAM_JGROUPS_CONFIG);
+      paramConf.setValue("jar:/conf/portal/udp.xml");      
+      params.addParameter(paramConf);
+      RPCServiceImpl service = null;
+      try
+      {
+         service = new RPCServiceImpl(container.getContext(), params, configManager);
+         RemoteCommand fake = new RemoteCommand() // never registered on the service
+         {
+            
+            public String getId()
+            {
+               return "fake";
+            }
+            
+            public String execute(Serializable[] args) throws Throwable
+            {
+               return null;
+            }
+         };
+         RemoteCommand fake2 = new RemoteCommand()
+         {
+            
+            public String getId()
+            {
+               return "fake2";
+            }
+            
+            public String execute(Serializable[] args) throws Throwable
+            {
+               return null;
+            }
+         };
+         RemoteCommand fake2_Unregistered = new RemoteCommand() // same id as fake2, but this instance is never registered
+         {
+            
+            public String getId()
+            {
+               return "fake2";
+            }
+            
+            public String execute(Serializable[] args) throws Throwable
+            {
+               return null;
+            }
+         };         
+         service.registerCommand(fake2);
+         RemoteCommand Exception = new RemoteCommand()
+         {
+            
+            public String getId()
+            {
+               return "Exception";
+            }
+            
+            public String execute(Serializable[] args) throws Throwable
+            {
+               throw new Exception("MyException");
+            }
+         };
+         service.registerCommand(Exception);
+         RemoteCommand Error = new RemoteCommand()
+         {
+            
+            public String getId()
+            {
+               return "Error";
+            }
+            
+            public String execute(Serializable[] args) throws Throwable
+            {
+               throw new Error("MyError");
+            }
+         } ;
+         service.registerCommand(Error);
+         RemoteCommand StringValue = new RemoteCommand()
+         {
+            
+            public String getId()
+            {
+               return "StringValue";
+            }
+            
+            public String execute(Serializable[] args) throws Throwable
+            {
+               return "OK";
+            }
+         };         
+         service.registerCommand(StringValue);
+         RemoteCommand NullValue = new RemoteCommand()
+         {
+            
+            public String getId()
+            {
+               return "NullValue";
+            }
+            
+            public String execute(Serializable[] args) throws Throwable
+            {
+               return null;
+            }
+         };         
+         service.registerCommand(NullValue);
+         RemoteCommand LongTask = new RemoteCommand()
+         {
+            
+            public String getId()
+            {
+               return "LongTask";
+            }
+            
+            public String execute(Serializable[] args) throws Throwable
+            {
+               Thread.sleep(2000); // outlasts the 1000 passed as timeout below (assuming that value is in ms too)
+               return null;
+            }
+         };         
+         service.registerCommand(LongTask);         
+         service.start();
+         try
+         {
+            service.executeCommandOnAllNodes(fake, true);
+            fail("We expect a RPCException since the command is unknown");
+         }
+         catch (RPCException e)
+         {
+            // OK
+         }
+         try
+         {
+            service.executeCommandOnCoordinator(fake, true);
+            fail("We expect a RPCException since the command is unknown");
+         }
+         catch (RPCException e)
+         {
+            // OK
+         }
+         try
+         {
+            service.executeCommandOnAllNodes(fake2_Unregistered, true);
+            fail("We expect a RPCException since the command is unknown");
+         }
+         catch (RPCException e)
+         {
+            // OK
+         }
+         try
+         {
+            service.executeCommandOnCoordinator(fake2_Unregistered, true);
+            fail("We expect a RPCException since the command is unknown");
+         }
+         catch (RPCException e)
+         {
+            // OK
+         }         
+         List<Object> result;
+         result = service.executeCommandOnAllNodes(Exception, true);
+         assertTrue(result != null && result.size() == 1);
+         assertTrue("We expect a RPCException since one node could not execute the command", result.get(0) instanceof RPCException);
+         try
+         {
+            service.executeCommandOnCoordinator(Exception, true);
+            fail("We expect a RPCException since one node could not execute the command");
+         }
+         catch (RPCException e)
+         {
+            // OK
+         }
+         result = service.executeCommandOnAllNodes(Error, true);
+         assertTrue(result != null && result.size() == 1);
+         assertTrue("We expect a RPCException since one node could not execute the command", result.get(0) instanceof RPCException);
+         try
+         {
+            service.executeCommandOnCoordinator(Error, true);
+            fail("We expect a RPCException since one node could not execute the command");
+         }
+         catch (RPCException e)
+         {
+            // OK
+         }
+         result = service.executeCommandOnAllNodes(LongTask, true);
+         assertNotNull(result);
+         assertTrue(result.size() == 1);
+         assertNull(result.get(0));
+         Object o = service.executeCommandOnCoordinator(LongTask, true);
+         assertNull(o);
+         result = service.executeCommandOnAllNodes(LongTask, 1000);
+         assertNotNull(result);
+         assertTrue(result.size() == 1);
+         assertTrue("We expect an RPCException due to a Replication Timeout", result.get(0) instanceof RPCException);
+         try
+         {
+            service.executeCommandOnCoordinator(LongTask, 1000);
+            fail("We expect an RPCException due to a Replication Timeout");
+         }
+         catch (RPCException e)
+         {
+            // OK
+         }
+         result = service.executeCommandOnAllNodes(LongTask, false);
+         assertNotNull(result);
+         assertTrue(result.isEmpty());
+         assertNull(service.executeCommandOnCoordinator(LongTask, false));
+         // Ordinary return values: a String and null must come back as returned by the command.
+         result = service.executeCommandOnAllNodes(StringValue, true);
+         assertNotNull(result);
+         assertTrue(result.size() == 1);
+         assertEquals("OK", result.get(0));
+         o = service.executeCommandOnCoordinator(StringValue, true);
+         assertEquals("OK", o);
+         result = service.executeCommandOnAllNodes(NullValue, true);
+         assertNotNull(result);
+         assertTrue(result.size() == 1);
+         assertNull(result.get(0));
+         o = service.executeCommandOnCoordinator(NullValue, true);
+         assertNull(o);
+      }
+      finally
+      {
+         if (service != null)
+         {
+            service.stop();            
+         }
+      }        
+   }
+   /**
+    * Runs two services built from the same parameters and checks their behaviour as a
+    * two-node group.
+    * <ul>
+    * <li>Topology listeners: after {@code service1.start()} listener1 reports the coordinator
+    * role with a count of 1; after {@code service2.start()} the counts are 2 and 1 and neither
+    * listener reports a coordinator change.</li>
+    * <li>Commands that are unknown, throwing or slow on service2 only: the list returned by
+    * {@code executeCommandOnAllNodes} holds {@code "OK"} at index 0 and an
+    * {@link RPCException} at index 1, while {@code executeCommandOnCoordinator} returns
+    * {@code "OK"}.</li>
+    * <li>Fail-over: a background thread calls {@code LongTask} on the coordinator through
+    * service2 while service1 is stopped, and expects {@code "NewCoordinator"}, the value
+    * returned by the instance registered on service2.</li>
+    * <li>After service1 has left, {@code excecuteCommand} against the previously known
+    * addresses must report a {@link MemberHasLeftException} for the departed member, and
+    * service2 must be the coordinator.</li>
+    * </ul>
+    * NOTE(review): {@code MyListener} is declared outside this excerpt; its fields are assumed
+    * to reflect the latest topology-change notification - confirm against its definition.
+    */
+   public void testSeveralNodes() throws Exception
+   {
+      InitParams params = new InitParams();
+      ValueParam paramConf = new ValueParam();
+      paramConf.setName(RPCServiceImpl.PARAM_JGROUPS_CONFIG);
+      paramConf.setValue("jar:/conf/portal/udp.xml");      
+      params.addParameter(paramConf);
+      RPCServiceImpl service1 = null, service2 = null;      
+      try
+      {
+         service1 = new RPCServiceImpl(container.getContext(), params, configManager);
+         service2 = new RPCServiceImpl(container.getContext(), params, configManager);
+         RemoteCommand CmdUnknownOnNode2 = new RemoteCommand() // registered on service1 only
+         {
+            
+            public String getId()
+            {
+               return "CmdUnknownOnNode2";
+            }
+            
+            public String execute(Serializable[] args) throws Throwable
+            {
+               return "OK";
+            }
+         };
+         service1.registerCommand(CmdUnknownOnNode2);
+         RemoteCommand ExceptionOnNode2 = new RemoteCommand()
+         {
+            
+            public String getId()
+            {
+               return "ExceptionOnNode2";
+            }
+            
+            public String execute(Serializable[] args) throws Throwable
+            {
+               return "OK";
+            }
+         };         
+         service1.registerCommand(ExceptionOnNode2);
+         RemoteCommand ErrorOnNode2 = new RemoteCommand()
+         {
+            
+            public String getId()
+            {
+               return "ErrorOnNode2";
+            }
+            
+            public String execute(Serializable[] args) throws Throwable
+            {
+               return "OK";
+            }
+         };           
+         service1.registerCommand(ErrorOnNode2);
+        
+         RemoteCommand LongTaskOnNode2 = new RemoteCommand()
+         {
+            
+            public String getId()
+            {
+               return "LongTaskOnNode2";
+            }
+            
+            public String execute(Serializable[] args) throws Throwable
+            {
+               return "OK";
+            }
+         };           
+         service1.registerCommand(LongTaskOnNode2); 
+         service1.registerCommand(new RemoteCommand()
+         {
+            
+            public String getId()
+            {
+               return "LongTask";
+            }
+            
+            public String execute(Serializable[] args) throws Throwable
+            {
+               Thread.sleep(3000);
+               return "OldCoordinator";
+            }
+         }); 
+         service1.registerCommand(new RemoteCommand()
+         {
+            
+            public String getId()
+            {
+               return "OK";
+            }
+            
+            public String execute(Serializable[] args) throws Throwable
+            {
+               return "OK";
+            }
+         });         
+         service2.registerCommand(new RemoteCommand() // same ids as on service1, but failing or slow on this node
+         {
+            
+            public String getId()
+            {
+               return "ExceptionOnNode2";
+            }
+            
+            public String execute(Serializable[] args) throws Throwable
+            {
+               throw new Exception("MyException");
+            }
+         });
+         service2.registerCommand(new RemoteCommand()
+         {
+            
+            public String getId()
+            {
+               return "ErrorOnNode2";
+            }
+            
+            public String execute(Serializable[] args) throws Throwable
+            {
+               throw new Error("MyError");
+            }
+         });
+         service2.registerCommand(new RemoteCommand()
+         {
+            
+            public String getId()
+            {
+               return "LongTaskOnNode2";
+            }
+            
+            public String execute(Serializable[] args) throws Throwable
+            {
+               Thread.sleep(2000);
+               return null;
+            }
+         });
+         RemoteCommand OK = new RemoteCommand()
+         {
+            
+            public String getId()
+            {
+               return "OK";
+            }
+            
+            public String execute(Serializable[] args) throws Throwable
+            {
+               return "OK";
+            }
+         };
+         service2.registerCommand(OK);
+         final RemoteCommand LongTask = new RemoteCommand()
+         {
+            
+            public String getId()
+            {
+               return "LongTask";
+            }
+            
+            public String execute(Serializable[] args) throws Throwable
+            {
+               return "NewCoordinator";
+            }
+         };
+         service2.registerCommand(LongTask);
+         MyListener listener1 = new MyListener();
+         service1.registerTopologyChangeListener(listener1);
+         MyListener listener2 = new MyListener();
+         service2.registerTopologyChangeListener(listener2);
+         assertFalse(listener1.coordinatorHasChanged);
+         assertFalse(listener1.isCoordinator);
+         assertEquals(0, listener1.count);
+         assertFalse(listener2.coordinatorHasChanged);
+         assertFalse(listener2.isCoordinator);
+         assertEquals(0, listener2.count);
+         service1.start();
+         assertFalse(listener1.coordinatorHasChanged);
+         assertTrue(listener1.isCoordinator);
+         assertEquals(1, listener1.count);
+         assertFalse(listener2.coordinatorHasChanged);
+         assertFalse(listener2.isCoordinator);
+         assertEquals(0, listener2.count);
+         service2.start();
+         assertFalse(listener1.coordinatorHasChanged);
+         assertTrue(listener1.isCoordinator);
+         assertEquals(2, listener1.count);
+         assertFalse(listener2.coordinatorHasChanged);
+         assertFalse(listener2.isCoordinator);
+         assertEquals(1, listener2.count);
+         assertEquals(true, service1.isCoordinator());
+         assertEquals(false, service2.isCoordinator());
+         List<Object> result;
+         Object o;
+         result = service1.executeCommandOnAllNodes(CmdUnknownOnNode2, true);
+         assertTrue(result != null && result.size() == 2);
+         assertEquals("OK", result.get(0));
+         assertTrue("We expect a RPCException since the command is unknown on node 2", result.get(1) instanceof RPCException);
+         o = service1.executeCommandOnCoordinator(CmdUnknownOnNode2, true);
+         assertEquals("OK", o);
+
+         result = service1.executeCommandOnAllNodes(ExceptionOnNode2, true);
+         assertTrue(result != null && result.size() == 2);
+         assertEquals("OK", result.get(0));
+         assertTrue("We expect a RPCException since the command fails on node 2", result.get(1) instanceof RPCException);
+         o = service1.executeCommandOnCoordinator(ExceptionOnNode2, true);
+         assertEquals("OK", o);
+
+         result = service1.executeCommandOnAllNodes(ErrorOnNode2, true);
+         assertTrue(result != null && result.size() == 2);
+         assertEquals("OK", result.get(0));
+         assertTrue("We expect a RPCException since the command fails on node 2", result.get(1) instanceof RPCException);         
+         o = service1.executeCommandOnCoordinator(ErrorOnNode2, true);
+         assertEquals("OK", o);
+         
+         result = service1.executeCommandOnAllNodes(LongTaskOnNode2, 1000);
+         assertNotNull(result);
+         assertTrue(result.size() == 2);
+         assertEquals("OK", result.get(0));
+         assertTrue("We expect an RPCException due to a Replication Timeout", result.get(1) instanceof RPCException);
+         o = service1.executeCommandOnCoordinator(LongTaskOnNode2, 1000);
+         assertEquals("OK", o);
+         // Remember the members and the coordinator as seen by service1 before it is stopped; both are reused below.
+         List<Address> allMembers = service1.members;
+         List<Address> coordinatorOnly = new ArrayList<Address>(1);
+         coordinatorOnly.add(service1.coordinator);
+         // Call the coordinator through service2 on a separate thread while service1 (the coordinator) is being stopped.
+         final RPCServiceImpl service = service2;
+         final AtomicReference<Throwable> error = new AtomicReference<Throwable>();
+         final CountDownLatch doneSignal = new CountDownLatch(1);
+         Thread t = new Thread()
+         {
+            @Override
+            public void run()
+            {
+               try
+               {
+                  Object o = service.executeCommandOnCoordinator(LongTask, true);
+                  assertEquals("NewCoordinator", o);
+               }
+               catch (Throwable e)
+               {
+                  error.set(e); // handed back to the test thread, which asserts on it after the latch opens
+                  e.printStackTrace();
+               }
+               finally
+               {
+                  doneSignal.countDown();
+               }
+            }           
+         };
+         t.start();
+         service1.stop(); // the coordinator leaves while the background call may still be in flight
+         listener2.waitTopologyChange();
+         assertFalse(listener1.coordinatorHasChanged);
+         assertTrue(listener1.isCoordinator);
+         assertEquals(2, listener1.count);
+         assertTrue(listener2.coordinatorHasChanged);
+         assertTrue(listener2.isCoordinator);
+         assertEquals(2, listener2.count);         
+         doneSignal.await(); // blocks until the background call has finished (no timeout)
+         assertNull(error.get() != null ? error.get().getMessage() : "", error.get());
+         result = service2.excecuteCommand(allMembers, OK, true, 0);
+         assertNotNull(result);
+         assertTrue(result.size() == 2);
+         assertTrue("We expect an RPCException due to a member that has left", result.get(0) instanceof MemberHasLeftException);
+         assertEquals("OK", result.get(1));
+         result = service2.excecuteCommand(coordinatorOnly, OK, true, 0);
+         assertNotNull(result);
+         assertTrue(result.size() == 1);
+         assertTrue("We expect an RPCException due to a member that has left", result.get(0) instanceof MemberHasLeftException);
+         try
+         {
+            service1.isCoordinator();
+            fail("We expect a RPCException since the current state is not the expected one");
+         }
+         catch (RPCException e)
+         {
+            // OK
+         }
+         assertEquals(true, service2.isCoordinator());         
+      }
+      finally
+      {
+         if (service1 != null)
+         {
+            service1.stop();            
+         }
+         if (service2 != null)
+         {
+            service2.stop();            
+         }
+      }        
+   }
+   /**
+    * Covers {@code SingleMethodCallCommand}.
+    * <p>
+    * First the constructor checks: a {@code null} target or a {@code null} method name must
+    * raise an {@link IllegalArgumentException}, an unknown method name a
+    * {@link NoSuchMethodException}, and a private method an {@link IllegalArgumentException}.
+    * <p>
+    * Then one command per {@code MyService} method/overload is registered on a single started
+    * node and each is executed both on the coordinator and on all nodes, comparing the result
+    * with the value the wrapped method returns for the given arguments. A call whose argument
+    * list does not match the wrapped method must end in an {@link RPCException}.
+    */
+   public void testSingleMethodCallCommand() throws Exception
+   {
+      try
+      {
+         new SingleMethodCallCommand(null, null);
+         fail("we expect an IllegalArgumentException");
+      }
+      catch (IllegalArgumentException e)
+      {
+         // OK
+      }
+      MyService myService = new MyService();
+      try
+      {
+         new SingleMethodCallCommand(myService, null);
+         fail("we expect an IllegalArgumentException");
+      }
+      catch (IllegalArgumentException e)
+      {
+         // OK
+      }
+      try
+      {
+         new SingleMethodCallCommand(myService, "foo");
+         fail("we expect an NoSuchMethodException");
+      }
+      catch (NoSuchMethodException e)
+      {
+         // OK
+      }
+      try
+      {
+         new SingleMethodCallCommand(myService, "getPrivateName");
+         fail("we expect an IllegalArgumentException since only the public methods are allowed");
+      }
+      catch (IllegalArgumentException e)
+      {
+         // OK
+      }
+      InitParams params = new InitParams();
+      ValueParam paramConf = new ValueParam();
+      paramConf.setName(RPCServiceImpl.PARAM_JGROUPS_CONFIG);
+      paramConf.setValue("jar:/conf/portal/udp.xml");      
+      params.addParameter(paramConf);
+      RPCServiceImpl service = null;
+      try
+      {
+         service = new RPCServiceImpl(container.getContext(), params, configManager);
+         RemoteCommand getName = service.registerCommand(new SingleMethodCallCommand(myService, "getName"));
+         RemoteCommand add = service.registerCommand(new SingleMethodCallCommand(myService, "add", int.class));
+         RemoteCommand evaluate1 = service.registerCommand(new SingleMethodCallCommand(myService, "evaluate", int[].class));
+         RemoteCommand evaluate2 = service.registerCommand(new SingleMethodCallCommand(myService, "evaluate", List.class));
+         RemoteCommand total1 = service.registerCommand(new SingleMethodCallCommand(myService, "total", int.class));
+         RemoteCommand total2 = service.registerCommand(new SingleMethodCallCommand(myService, "total", int.class, int.class));
+         RemoteCommand total3 = service.registerCommand(new SingleMethodCallCommand(myService, "total", int[].class));
+         RemoteCommand total4 = service.registerCommand(new SingleMethodCallCommand(myService, "total", String.class, long.class, int[].class));
+         RemoteCommand testTypes1 = service.registerCommand(new SingleMethodCallCommand(myService, "testTypes", String[].class));
+         RemoteCommand testTypes2 = service.registerCommand(new SingleMethodCallCommand(myService, "testTypes", int[].class));
+         RemoteCommand testTypes3 = service.registerCommand(new SingleMethodCallCommand(myService, "testTypes", long[].class));
+         RemoteCommand testTypes4 = service.registerCommand(new SingleMethodCallCommand(myService, "testTypes", byte[].class));
+         RemoteCommand testTypes5 = service.registerCommand(new SingleMethodCallCommand(myService, "testTypes", short[].class));
+         RemoteCommand testTypes6 = service.registerCommand(new SingleMethodCallCommand(myService, "testTypes", char[].class));
+         RemoteCommand testTypes7 = service.registerCommand(new SingleMethodCallCommand(myService, "testTypes", double[].class));
+         RemoteCommand testTypes8 = service.registerCommand(new SingleMethodCallCommand(myService, "testTypes", float[].class));
+         RemoteCommand testTypes9 = service.registerCommand(new SingleMethodCallCommand(myService, "testTypes", boolean[].class));
+
+         service.start();
+         List<Object> result;
+         
+         assertEquals("name", service.executeCommandOnCoordinator(getName, true));
+         result = service.executeCommandOnAllNodes(getName, true);
+         assertTrue(result != null && result.size() == 1);
+         assertEquals("name", result.get(0));
+         
+         assertEquals(10, service.executeCommandOnCoordinator(add, true, 10));
+         result = service.executeCommandOnAllNodes(add, true, 10);
+         assertTrue(result != null && result.size() == 1);
+         assertEquals(20, result.get(0)); // second add(10) on the same MyService instance, whose total accumulates: 10 + 10
+         
+         assertEquals(100, service.executeCommandOnCoordinator(evaluate1, true, new int[]{10, 10, 10, 30, 40}));
+         result = service.executeCommandOnAllNodes(evaluate1, true, new int[]{10, 10, 10, 30, 40});
+         assertTrue(result != null && result.size() == 1);
+         assertEquals(100, result.get(0));
+         
+         List<Integer> values = new ArrayList<Integer>();
+         values.add(10);
+         values.add(10);
+         values.add(10);
+         values.add(30);
+         values.add(40);
+         assertEquals(100, service.executeCommandOnCoordinator(evaluate2, true, (Serializable)values));
+         result = service.executeCommandOnAllNodes(evaluate2, true, (Serializable)values);
+         assertTrue(result != null && result.size() == 1);
+         assertEquals(100, result.get(0));
+         
+         assertEquals(10, service.executeCommandOnCoordinator(total1, true, 10));
+         result = service.executeCommandOnAllNodes(total1, true, 10);
+         assertTrue(result != null && result.size() == 1);
+         assertEquals(10, result.get(0));
+         
+         assertEquals(20, service.executeCommandOnCoordinator(total2, true, 10, 10));
+         result = service.executeCommandOnAllNodes(total2, true, 10, 10);
+         assertTrue(result != null && result.size() == 1);
+         assertEquals(20, result.get(0));
+         
+         assertEquals(100, service.executeCommandOnCoordinator(total3, true, new int[]{10, 10, 10, 30, 40}));
+         result = service.executeCommandOnAllNodes(total3, true, new int[]{10, 10, 10, 30, 40});
+         assertTrue(result != null && result.size() == 1);
+         assertEquals(100, result.get(0));
+         
+         assertEquals(100, service.executeCommandOnCoordinator(total4, true, "foo", 50, new int[]{10, 10, 10, 30, 40}));
+         result = service.executeCommandOnAllNodes(total4, true, "foo", 50, new int[]{10, 10, 10, 30, 40});
+         assertTrue(result != null && result.size() == 1);
+         assertEquals(100, result.get(0));
+         
+         assertEquals(0, service.executeCommandOnCoordinator(total4, true, "foo", 50, null));
+         result = service.executeCommandOnAllNodes(total4, true, "foo", 50, null);
+         assertTrue(result != null && result.size() == 1);
+         assertEquals(0, result.get(0));
+         // Only two arguments for a command built with three parameter types: the call must be rejected.
+         try
+         {
+            service.executeCommandOnCoordinator(total4, true, "foo", 50);
+            fail("We expect a RPCException since the list of arguments mismatch with what is expected");
+         }
+         catch (RPCException e)
+         {
+            // OK
+         }         
+         result = service.executeCommandOnAllNodes(total4, true, "foo", 50);
+         assertTrue(result != null && result.size() == 1);
+         assertTrue("We expect a RPCException since the list of arguments mismatch with what is expected", result.get(0) instanceof RPCException);
+         
+         assertEquals("foo", service.executeCommandOnCoordinator(testTypes1, true, (Serializable)new String[]{"foo"}));
+         result = service.executeCommandOnAllNodes(testTypes1, true, (Serializable)new String[]{"foo"});
+         assertTrue(result != null && result.size() == 1);
+         assertEquals("foo", result.get(0));
+         
+         assertEquals(10, service.executeCommandOnCoordinator(testTypes2, true, new int[]{10}));
+         result = service.executeCommandOnAllNodes(testTypes2, true, new int[]{10});
+         assertTrue(result != null && result.size() == 1);
+         assertEquals(10, result.get(0));
+         
+         assertEquals(11L, service.executeCommandOnCoordinator(testTypes3, true, new long[]{10}));
+         result = service.executeCommandOnAllNodes(testTypes3, true, new long[]{10});
+         assertTrue(result != null && result.size() == 1);
+         assertEquals(11L, result.get(0));
+         
+         assertEquals((byte)12, service.executeCommandOnCoordinator(testTypes4, true, new byte[]{10}));
+         result = service.executeCommandOnAllNodes(testTypes4, true, new byte[]{10});
+         assertTrue(result != null && result.size() == 1);
+         assertEquals((byte)12, result.get(0));
+         
+         assertEquals((short)13, service.executeCommandOnCoordinator(testTypes5, true, new short[]{10}));
+         result = service.executeCommandOnAllNodes(testTypes5, true, new short[]{10});
+         assertTrue(result != null && result.size() == 1);
+         assertEquals((short)13, result.get(0));
+         
+         assertEquals('a', service.executeCommandOnCoordinator(testTypes6, true, new char[]{'a'}));
+         result = service.executeCommandOnAllNodes(testTypes6, true, new char[]{'a'});
+         assertTrue(result != null && result.size() == 1);
+         assertEquals('a', result.get(0));
+         
+         assertEquals(10.5, service.executeCommandOnCoordinator(testTypes7, true, new double[]{10}));
+         result = service.executeCommandOnAllNodes(testTypes7, true, new double[]{10});
+         assertTrue(result != null && result.size() == 1);
+         assertEquals(10.5, result.get(0));
+         
+         assertEquals((float)11.5, service.executeCommandOnCoordinator(testTypes8, true, new float[]{10}));
+         result = service.executeCommandOnAllNodes(testTypes8, true, new float[]{10});
+         assertTrue(result != null && result.size() == 1);
+         assertEquals((float)11.5, result.get(0));
+         
+         assertEquals(true, service.executeCommandOnCoordinator(testTypes9, true, new boolean[]{true}));
+         result = service.executeCommandOnAllNodes(testTypes9, true, new boolean[]{true});
+         assertTrue(result != null && result.size() == 1);
+         assertEquals(true, result.get(0));
+         
+      }
+      finally
+      {
+         if (service != null)
+         {
+            service.stop();            
+         }
+      }         
+   }
+   
+   /**
+    * Plain target object whose methods are wrapped into {@code SingleMethodCallCommand}
+    * instances by {@code testSingleMethodCallCommand}.
+    * <p>
+    * It deliberately offers several overloads of the same method name (fixed arity, variable
+    * arity, and one variable-arity overload per primitive array type plus {@code String}) and
+    * one private method, so that method lookup by name and parameter types can be exercised.
+    * <p>
+    * Every summing method treats a {@code null} array or list as empty and returns {@code 0}.
+    * The {@code testTypes} overloads read the first element and therefore require a non-empty
+    * argument.
+    */
+   public static class MyService
+   {
+      /** Running total accumulated by {@link #add(int)}. */
+      private int value = 0;
+
+      /**
+       * Adds {@code i} to the running total kept by this instance.
+       *
+       * @param i the amount to add
+       * @return the running total after the addition
+       */
+      public int add(int i)
+      {
+         value += i;
+         return value;
+      }
+
+      /**
+       * Private twin of {@link #getName()}; the test uses it to check that non-public methods
+       * are refused.
+       *
+       * @return the constant {@code "name"}
+       */
+      @SuppressWarnings("unused")
+      private String getPrivateName()
+      {
+         return "name";
+      }
+
+      /**
+       * @return the constant {@code "name"}
+       */
+      public String getName()
+      {
+         return "name";
+      }
+
+      /**
+       * @param i any value
+       * @return {@code i} unchanged
+       */
+      public int total(int i)
+      {
+         return i;
+      }
+
+      /**
+       * @param i1 first operand
+       * @param i2 second operand
+       * @return {@code i1 + i2}
+       */
+      public int total(int i1, int i2)
+      {
+         return i1 + i2;
+      }
+
+      /**
+       * @param values the values to add up, may be {@code null}
+       * @return the sum of {@code values}, or {@code 0} if it is {@code null} or empty
+       */
+      public int total(int... values)
+      {
+         return sum(values);
+      }
+
+      /**
+       * @param s ignored
+       * @param l ignored
+       * @param values the values to add up, may be {@code null}
+       * @return the sum of {@code values}, or {@code 0} if it is {@code null} or empty
+       */
+      public int total(String s, long l, int... values)
+      {
+         return sum(values);
+      }
+
+      /**
+       * @param values the values to add up, may be {@code null}
+       * @return the sum of {@code values}, or {@code 0} if it is {@code null} or empty
+       */
+      public int evaluate(int[] values)
+      {
+         return sum(values);
+      }
+
+      /**
+       * @param values the values to add up, may be {@code null}; elements must not be {@code null}
+       * @return the sum of {@code values}, or {@code 0} if it is {@code null} or empty
+       */
+      public int evaluate(List<Integer> values)
+      {
+         int total = 0;
+         if (values != null)
+         {
+            for (int i : values)
+            {
+               total += i;
+            }
+         }
+         return total;
+      }
+
+      /**
+       * @param values a non-empty array
+       * @return the first element
+       */
+      public String testTypes(String... values)
+      {
+         return values[0];
+      }
+
+      /**
+       * @param values a non-empty array
+       * @return the first element
+       */
+      public boolean testTypes(boolean... values)
+      {
+         return values[0];
+      }
+
+      /**
+       * @param values a non-empty array
+       * @return the first element
+       */
+      public char testTypes(char... values)
+      {
+         return values[0];
+      }
+
+      /**
+       * @param values a non-empty array
+       * @return the first element plus {@code 0.5}
+       */
+      public double testTypes(double... values)
+      {
+         return values[0] + 0.5;
+      }
+
+      /**
+       * @param values a non-empty array
+       * @return the first element plus {@code 1.5}, narrowed to {@code float}
+       */
+      public float testTypes(float... values)
+      {
+         return (float)(values[0] + 1.5);
+      }
+
+      /**
+       * @param values a non-empty array
+       * @return the first element
+       */
+      public int testTypes(int... values)
+      {
+         return values[0];
+      }
+
+      /**
+       * @param values a non-empty array
+       * @return the first element plus {@code 1}
+       */
+      public long testTypes(long... values)
+      {
+         return values[0] + 1;
+      }
+
+      /**
+       * @param values a non-empty array
+       * @return the first element plus {@code 2}, narrowed to {@code byte}
+       */
+      public byte testTypes(byte... values)
+      {
+         return (byte)(values[0] + 2);
+      }
+
+      /**
+       * @param values a non-empty array
+       * @return the first element plus {@code 3}, narrowed to {@code short}
+       */
+      public short testTypes(short... values)
+      {
+         return (short)(values[0] + 3);
+      }
+
+      /**
+       * Shared summation used by the {@code total} and {@code evaluate} array overloads so
+       * that they all handle a {@code null} array the same way.
+       */
+      private static int sum(int[] values)
+      {
+         int total = 0;
+         if (values != null)
+         {
+            for (int i : values)
+            {
+               total += i;
+            }
+         }
+         return total;
+      }
+   }
+   
+   /**
+    * Starts two RPC services built from the same JGroups configuration, each of them
+    * registering its own command under the id "CoordinatorExecutedCommand", then asks
+    * the first service to execute the command on the coordinator. The test expects the
+    * result produced by the command of the first service and expects that, over both
+    * services, the command has been executed exactly once.
+    * NOTE(review): the expected value "service 1" presumably relies on the first started
+    * service being elected coordinator - confirm against RPCServiceImpl.
+    *
+    * @throws Exception if a service cannot be created or started, or if the command fails
+    */
+   public void testExecOnCoordinator() throws Exception
+   {
+      InitParams params = new InitParams();
+      ValueParam paramConf = new ValueParam();
+      paramConf.setName(RPCServiceImpl.PARAM_JGROUPS_CONFIG);
+      paramConf.setValue("jar:/conf/portal/udp.xml");
+      params.addParameter(paramConf);
+
+      // Shared between both commands, one entry is added per execution whatever the service
+      final List<Boolean> calledCommands = Collections.synchronizedList(new ArrayList<Boolean>());
+
+      RPCServiceImpl service1 = null;
+      RPCServiceImpl service2 = null;
+      try
+      {
+         service1 = new RPCServiceImpl(container.getContext(), params, configManager);
+         RemoteCommand service1Cmd = new RemoteCommand()
+         {
+            public String getId()
+            {
+               return "CoordinatorExecutedCommand";
+            }
+
+            public String execute(Serializable[] args) throws Throwable
+            {
+               calledCommands.add(Boolean.TRUE);
+               return "service 1";
+            }
+         };
+         service1.registerCommand(service1Cmd);
+
+         service2 = new RPCServiceImpl(container.getContext(), params, configManager);
+         // Same id as the command of service1 but a different result to identify the executor
+         RemoteCommand service2Cmd = new RemoteCommand()
+         {
+            public String getId()
+            {
+               return "CoordinatorExecutedCommand";
+            }
+
+            public String execute(Serializable[] args) throws Throwable
+            {
+               calledCommands.add(Boolean.TRUE);
+               return "service 2";
+            }
+         };
+         service2.registerCommand(service2Cmd);
+         // starting services
+         service1.start();
+         service2.start();
+
+         Object o = service1.executeCommandOnCoordinator(service1Cmd, true);
+         assertEquals("service 1", o);
+
+         // it should be executed once
+         assertEquals(1, calledCommands.size());
+      }
+      finally
+      {
+         // Nested try/finally: a failure while stopping service1 must not prevent
+         // service2 from being stopped, otherwise it would leak into the next tests
+         try
+         {
+            if (service1 != null)
+            {
+               service1.stop();
+            }
+         }
+         finally
+         {
+            if (service2 != null)
+            {
+               service2.stop();
+            }
+         }
+      }
+   }
+   
+   /**
+    * Topology change listener that keeps the flags carried by the last received event,
+    * counts the received events and allows a caller to block until the latch, created
+    * with an initial count of 2, has been fully counted down.
+    */
+   private static class MyListener implements TopologyChangeListener
+   {
+
+      /** Counted down once per received event. */
+      private CountDownLatch lock = new CountDownLatch(2);
+
+      /** Total number of events received so far. */
+      private int count;
+      /** Value of {@link TopologyChangeEvent#isCoordinator()} for the last received event. */
+      private boolean isCoordinator;
+      /** Value of {@link TopologyChangeEvent#isCoordinatorHasChanged()} for the last received event. */
+      private boolean coordinatorHasChanged;
+
+      /**
+       * Records the content of the event then counts the latch down.
+       *
+       * @see org.exoplatform.services.rpc.TopologyChangeListener#onChange(org.exoplatform.services.rpc.TopologyChangeEvent)
+       */
+      public void onChange(TopologyChangeEvent event)
+      {
+         count = count + 1;
+         isCoordinator = event.isCoordinator();
+         coordinatorHasChanged = event.isCoordinatorHasChanged();
+
+         // Kept last so that the state above is written before a waiting thread is released
+         lock.countDown();
+      }
+
+      /**
+       * Blocks the calling thread until the latch reaches zero.
+       *
+       * @throws InterruptedException if the calling thread is interrupted while waiting
+       */
+      public void waitTopologyChange() throws InterruptedException
+      {
+         lock.await();
+      }
+   }
+}

Added: kernel/trunk/exo.kernel.component.ext.rpc.impl.jgroups.v3/src/test/resources/conf/portal/udp.xml
===================================================================
--- kernel/trunk/exo.kernel.component.ext.rpc.impl.jgroups.v3/src/test/resources/conf/portal/udp.xml	                        (rev 0)
+++ kernel/trunk/exo.kernel.component.ext.rpc.impl.jgroups.v3/src/test/resources/conf/portal/udp.xml	2011-12-06 13:40:17 UTC (rev 5270)
@@ -0,0 +1,67 @@
+<config xmlns="urn:org:jgroups"
+        xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+        xsi:schemaLocation="urn:org:jgroups http://www.jgroups.org/schema/JGroups-3.0.xsd">
+    <UDP
+         mcast_addr="${jgroups.udp.mcast_addr:228.10.10.10}"
+         mcast_port="${jgroups.udp.mcast_port:45588}"
+         tos="8"
+         ucast_recv_buf_size="20M"
+         ucast_send_buf_size="640K"
+         mcast_recv_buf_size="25M"
+         mcast_send_buf_size="640K"
+         loopback="true"
+         discard_incompatible_packets="true"
+         max_bundle_size="64K"
+         max_bundle_timeout="30"
+         ip_ttl="${jgroups.udp.ip_ttl:8}"
+         enable_bundling="true"
+         enable_diagnostics="true"
+         thread_naming_pattern="cl"
+
+         timer_type="new"
+         timer.min_threads="4"
+         timer.max_threads="10"
+         timer.keep_alive_time="3000"
+         timer.queue_max_size="500"
+
+         thread_pool.enabled="true"
+         thread_pool.min_threads="2"
+         thread_pool.max_threads="8"
+         thread_pool.keep_alive_time="5000"
+         thread_pool.queue_enabled="true"
+         thread_pool.queue_max_size="10000"
+         thread_pool.rejection_policy="discard"
+
+         oob_thread_pool.enabled="true"
+         oob_thread_pool.min_threads="1"
+         oob_thread_pool.max_threads="8"
+         oob_thread_pool.keep_alive_time="5000"
+         oob_thread_pool.queue_enabled="false"
+         oob_thread_pool.queue_max_size="100"
+         oob_thread_pool.rejection_policy="Run"/>
+
+    <PING timeout="2000"
+            num_initial_members="3"/>
+    <MERGE2 max_interval="30000"
+            min_interval="10000"/>
+    <FD_SOCK/>
+    <FD_ALL/>
+    <VERIFY_SUSPECT timeout="1500"  />
+    <BARRIER />
+    <pbcast.NAKACK exponential_backoff="300"
+                   xmit_stagger_timeout="200"
+                   use_mcast_xmit="false"
+                   discard_delivered_msgs="true"/>
+    <UNICAST />
+    <pbcast.STABLE stability_delay="1000" desired_avg_gossip="50000"
+                   max_bytes="4M"/>
+    <pbcast.GMS print_local_addr="true" join_timeout="3000"
+                view_bundling="true"/>
+    <UFC max_credits="2M"
+         min_threshold="0.4"/>
+    <MFC max_credits="2M"
+         min_threshold="0.4"/>
+    <FRAG2 frag_size="60K"  />
+    <pbcast.STATE_TRANSFER />
+    <!-- pbcast.FLUSH  /-->
+</config>
\ No newline at end of file

Added: kernel/trunk/exo.kernel.component.ext.rpc.impl.jgroups.v3/src/test/resources/test.policy
===================================================================
--- kernel/trunk/exo.kernel.component.ext.rpc.impl.jgroups.v3/src/test/resources/test.policy	                        (rev 0)
+++ kernel/trunk/exo.kernel.component.ext.rpc.impl.jgroups.v3/src/test/resources/test.policy	2011-12-06 13:40:17 UTC (rev 5270)
@@ -0,0 +1,35 @@
+grant codeBase "@MAVEN_REPO@-"{
+   permission java.security.AllPermission;
+};
+
+grant codeBase "@MAIN_CLASSES@-"{
+   permission java.security.AllPermission;
+};
+
+grant codeBase "@TEST_CLASSES@-"{
+   permission java.lang.RuntimePermission "accessRPCService";
+};
+
+grant codeBase "@TEST_CLASSES@-"{
+   permission java.lang.RuntimePermission "manageContainer";
+};
+
+grant codeBase "@MAIN_CLASSES@../../../exo.kernel.component.common/-"{
+   permission java.security.AllPermission;
+};
+
+grant codeBase "@MAIN_CLASSES@../../../exo.kernel.commons.test/-"{
+   permission java.security.AllPermission;
+};
+
+grant codeBase "@MAIN_CLASSES@../../../exo.kernel.commons/-"{
+   permission java.security.AllPermission;
+};
+
+grant codeBase "@MAIN_CLASSES@../../../exo.kernel.container/-"{
+   permission java.security.AllPermission;
+};
+
+
+
+

Modified: kernel/trunk/pom.xml
===================================================================
--- kernel/trunk/pom.xml	2011-12-06 09:36:07 UTC (rev 5269)
+++ kernel/trunk/pom.xml	2011-12-06 13:40:17 UTC (rev 5270)
@@ -53,6 +53,7 @@
       <module>exo.kernel.commons</module>
       <module>exo.kernel.commons.test</module>
       <module>exo.kernel.component.common</module>
+      <module>exo.kernel.component.ext.rpc.impl.jgroups.v3</module>
       <module>exo.kernel.component.cache</module>
       <module>exo.kernel.component.ext.cache.impl.jboss.v3</module>
       <module>exo.kernel.component.ext.cache.impl.infinispan.v5</module>
@@ -86,7 +87,7 @@
          </dependency>
          <dependency>
             <groupId>org.exoplatform.kernel</groupId>
-            <artifactId>exo.kernel.component.remote</artifactId>
+            <artifactId>exo.kernel.component.common</artifactId>
             <version>${project.version}</version>
          </dependency>
          <dependency>



More information about the exo-jcr-commits mailing list