[exo-jcr-commits] exo-jcr SVN: r3252 - in kernel/trunk/exo.kernel.component.common: src/main/java/org/exoplatform/services and 6 other directories.
do-not-reply at jboss.org
do-not-reply at jboss.org
Wed Oct 6 10:10:45 EDT 2010
Author: nfilotto
Date: 2010-10-06 10:10:44 -0400 (Wed, 06 Oct 2010)
New Revision: 3252
Added:
kernel/trunk/exo.kernel.component.common/src/main/java/org/exoplatform/services/rpc/
kernel/trunk/exo.kernel.component.common/src/main/java/org/exoplatform/services/rpc/RPCException.java
kernel/trunk/exo.kernel.component.common/src/main/java/org/exoplatform/services/rpc/RPCService.java
kernel/trunk/exo.kernel.component.common/src/main/java/org/exoplatform/services/rpc/RemoteCommand.java
kernel/trunk/exo.kernel.component.common/src/main/java/org/exoplatform/services/rpc/SingleMethodCallCommand.java
kernel/trunk/exo.kernel.component.common/src/main/java/org/exoplatform/services/rpc/impl/
kernel/trunk/exo.kernel.component.common/src/main/java/org/exoplatform/services/rpc/impl/RPCServiceImpl.java
kernel/trunk/exo.kernel.component.common/src/test/java/org/exoplatform/services/rpc/
kernel/trunk/exo.kernel.component.common/src/test/java/org/exoplatform/services/rpc/impl/
kernel/trunk/exo.kernel.component.common/src/test/java/org/exoplatform/services/rpc/impl/TestRPCServiceImpl.java
kernel/trunk/exo.kernel.component.common/src/test/resources/conf/portal/udp.xml
Modified:
kernel/trunk/exo.kernel.component.common/pom.xml
Log:
EXOJCR-967: Implementation of the RPCService
Modified: kernel/trunk/exo.kernel.component.common/pom.xml
===================================================================
--- kernel/trunk/exo.kernel.component.common/pom.xml 2010-10-06 09:31:29 UTC (rev 3251)
+++ kernel/trunk/exo.kernel.component.common/pom.xml 2010-10-06 14:10:44 UTC (rev 3252)
@@ -93,6 +93,10 @@
<groupId>javax.transaction</groupId>
<artifactId>jta</artifactId>
</dependency>
+ <dependency>
+ <groupId>jgroups</groupId>
+ <artifactId>jgroups</artifactId>
+ </dependency>
</dependencies>
<build>
<plugins>
Added: kernel/trunk/exo.kernel.component.common/src/main/java/org/exoplatform/services/rpc/RPCException.java
===================================================================
--- kernel/trunk/exo.kernel.component.common/src/main/java/org/exoplatform/services/rpc/RPCException.java (rev 0)
+++ kernel/trunk/exo.kernel.component.common/src/main/java/org/exoplatform/services/rpc/RPCException.java 2010-10-06 14:10:44 UTC (rev 3252)
@@ -0,0 +1,44 @@
+/*
+ * Copyright (C) 2010 eXo Platform SAS.
+ *
+ * This is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU Lesser General Public License as
+ * published by the Free Software Foundation; either version 2.1 of
+ * the License, or (at your option) any later version.
+ *
+ * This software is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this software; if not, write to the Free
+ * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
+ * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
+ */
+package org.exoplatform.services.rpc;
+
+/**
+ * The root class of all the Exception related to the RPC Service
+ *
+ * @author <a href="mailto:nicolas.filotto at exoplatform.com">Nicolas Filotto</a>
+ * @version $Id$
+ */
+public class RPCException extends Exception
+{
+
+ /**
+ * The serial version UID
+ */
+ private static final long serialVersionUID = -9113831373947878170L;
+
+ public RPCException(String message, Throwable cause)
+ {
+ super(message, cause);
+ }
+
+ public RPCException(String message)
+ {
+ super(message);
+ }
+}
Added: kernel/trunk/exo.kernel.component.common/src/main/java/org/exoplatform/services/rpc/RPCService.java
===================================================================
--- kernel/trunk/exo.kernel.component.common/src/main/java/org/exoplatform/services/rpc/RPCService.java (rev 0)
+++ kernel/trunk/exo.kernel.component.common/src/main/java/org/exoplatform/services/rpc/RPCService.java 2010-10-06 14:10:44 UTC (rev 3252)
@@ -0,0 +1,127 @@
+/*
+ * Copyright (C) 2010 eXo Platform SAS.
+ *
+ * This is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU Lesser General Public License as
+ * published by the Free Software Foundation; either version 2.1 of
+ * the License, or (at your option) any later version.
+ *
+ * This software is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this software; if not, write to the Free
+ * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
+ * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
+ */
+package org.exoplatform.services.rpc;
+
+import java.io.Serializable;
+import java.util.List;
+
+/**
+ * This service provides a mechanism to communicate with the other cluster nodes. This service is
+ * based on JGroups as the underlying transport.
+ * @author <a href="mailto:nicolas.filotto at exoplatform.com">Nicolas Filotto</a>
+ * @version $Id$
+ */
+public interface RPCService
+{
+
+ /**
+ * The permission needed to access to any methods of the RPCService
+ */
+ public static final RuntimePermission ACCESS_RPC_SERVICE_PERMISSION = new RuntimePermission("accessRPCService");
+
+ /**
+ * Executes a command on all the cluster nodes. This method is equivalent to the other method of the
+ * same type but with the default timeout. The command must be registered first otherwise an
+ * {@link RPCException} will be thrown.
+ *
+ * @param command The command to execute on each cluster node
+ * @param synchronous if true, sets group request mode to {@link org.jgroups.blocks.GroupRequest#GET_ALL},
+ * and if false sets it to {@link org.jgroups.blocks.GroupRequest#GET_NONE}.
+ * @param args an array of {@link Serializable} objects corresponding to parameters of the command
+ * to execute remotely
+ * @return a list of responses from all the members of the cluster. If we met an exception on a given node,
+ * the RPCException will be the corresponding response of this particular node
+ * @throws RPCException in the event of problems.
+ * @throws SecurityException if the {@link SecurityManager} is installed and the call method
+ * doesn't have the {@link RuntimePermission} <code>ACCESS_RPC_SERVICE_PERMISSION</code>
+ */
+ List<Object> executeCommandOnAllNodes(RemoteCommand command, boolean synchronous, Serializable... args)
+ throws RPCException, SecurityException;
+
+ /**
+ * Executes a command synchronously on all the cluster nodes. The command must be registered first otherwise an
+ * {@link RPCException} will be thrown.
+ *
+ * @param command The command to execute on each cluster node
+ * @param timeout a timeout after which to throw a replication exception.
+ * @param args an array of {@link Serializable} objects corresponding to parameters of the command
+ * to execute remotely
+ * @return a list of responses from all the members of the cluster. If we met an exception on a given node,
+ * the RPCException will be the corresponding response of this particular node
+ * @throws RPCException in the event of problems.
+ * @throws SecurityException if the {@link SecurityManager} is installed and the call method
+ * doesn't have the {@link RuntimePermission} <code>ACCESS_RPC_SERVICE_PERMISSION</code>
+ */
+ List<Object> executeCommandOnAllNodes(RemoteCommand command, long timeout, Serializable... args)
+ throws RPCException, SecurityException;
+
+ /**
+ * Executes a command on the coordinator only. This method is equivalent to the other method of the
+ * same type but with the default timeout. The command must be registered first otherwise an
+ * {@link RPCException} will be thrown.
+ *
+ * @param command The command to execute on the coordinator node
+ * @param synchronous if true, sets group request mode to {@link org.jgroups.blocks.GroupRequest#GET_ALL},
+ * and if false sets it to {@link org.jgroups.blocks.GroupRequest#GET_NONE}.
+ * @param args an array of {@link Serializable} objects corresponding to parameters of the command
+ * to execute remotely
+ * @return the response of the coordinator.
+ * @throws RPCException in the event of problems.
+ * @throws SecurityException if the {@link SecurityManager} is installed and the call method
+ * doesn't have the {@link RuntimePermission} <code>ACCESS_RPC_SERVICE_PERMISSION</code>
+ */
+ Object executeCommandOnCoordinator(RemoteCommand command, boolean synchronous, Serializable... args)
+ throws RPCException, SecurityException;
+
+ /**
+ * Executes a command synchronously on the coordinator only. The command must be registered first otherwise an
+ * {@link RPCException} will be thrown.
+ *
+ * @param command The command to execute on the coordinator node
+ * @param timeout a timeout after which to throw a replication exception.
+ * @param args an array of {@link Serializable} objects corresponding to parameters of the command
+ * to execute remotely
+ * @return the response of the coordinator.
+ * @throws RPCException in the event of problems.
+ * @throws SecurityException if the {@link SecurityManager} is installed and the call method
+ * doesn't have the {@link RuntimePermission} <code>ACCESS_RPC_SERVICE_PERMISSION</code>
+ */
+ Object executeCommandOnCoordinator(RemoteCommand command, long timeout, Serializable... args) throws RPCException,
+ SecurityException;
+
+ /**
+ * Register a new {@link RemoteCommand} instance, it will be mapped to its id. If a command with the
+ * same Id has already been registered, a warning will be printed into the log file and the new
+ * command will replace the old one.
+ * @param command the instance of the {@link RemoteCommand} to register
+ * @return the command itself if it could be registered null otherwise
+ * @throws SecurityException if the {@link SecurityManager} is installed and the call method
+ * doesn't have the {@link RuntimePermission} <code>ACCESS_RPC_SERVICE_PERMISSION</code>
+ */
+ RemoteCommand registerCommand(RemoteCommand command) throws SecurityException;
+
+ /**
+ * Unregister a {@link RemoteCommand} instance, if the id is known or the instance itself is known
+ * otherwise it will be ignored
+ * @param command the command to unregister
+ * @throws SecurityException if the {@link SecurityManager} is installed and the call method
+ * doesn't have the {@link RuntimePermission} <code>ACCESS_RPC_SERVICE_PERMISSION</code>
+ */
+ void unregisterCommand(RemoteCommand command) throws SecurityException;
+}
\ No newline at end of file
Added: kernel/trunk/exo.kernel.component.common/src/main/java/org/exoplatform/services/rpc/RemoteCommand.java
===================================================================
--- kernel/trunk/exo.kernel.component.common/src/main/java/org/exoplatform/services/rpc/RemoteCommand.java (rev 0)
+++ kernel/trunk/exo.kernel.component.common/src/main/java/org/exoplatform/services/rpc/RemoteCommand.java 2010-10-06 14:10:44 UTC (rev 3252)
@@ -0,0 +1,47 @@
+/*
+ * Copyright (C) 2010 eXo Platform SAS.
+ *
+ * This is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU Lesser General Public License as
+ * published by the Free Software Foundation; either version 2.1 of
+ * the License, or (at your option) any later version.
+ *
+ * This software is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this software; if not, write to the Free
+ * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
+ * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
+ */
+package org.exoplatform.services.rpc;
+
+import java.io.Serializable;
+
+/**
+ * This class represents the command that can be executed on a remote server.
+ * A RemoteCommand needs to be ThreadSafe since it can be re-used by several
+ * threads in parallel.
+ * @author <a href="mailto:nicolas.filotto at exoplatform.com">Nicolas Filotto</a>
+ * @version $Id$
+ */
+public interface RemoteCommand
+{
+ /**
+ * Executes the command on the local machine, i.e. in the JVM on which this
+ * method is invoked.
+ *
+ * @param args The parameters needed to execute the command
+ * @return arbitrary return value generated by performing this command
+ * @throws Throwable in the event of problems.
+ */
+ Serializable execute(Serializable[] args) throws Throwable;
+
+ /**
+ * Gives the id of the command
+ * @return the unique ID of the command
+ */
+ String getId();
+}
Added: kernel/trunk/exo.kernel.component.common/src/main/java/org/exoplatform/services/rpc/SingleMethodCallCommand.java
===================================================================
--- kernel/trunk/exo.kernel.component.common/src/main/java/org/exoplatform/services/rpc/SingleMethodCallCommand.java (rev 0)
+++ kernel/trunk/exo.kernel.component.common/src/main/java/org/exoplatform/services/rpc/SingleMethodCallCommand.java 2010-10-06 14:10:44 UTC (rev 3252)
@@ -0,0 +1,147 @@
+/*
+ * Copyright (C) 2010 eXo Platform SAS.
+ *
+ * This is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU Lesser General Public License as
+ * published by the Free Software Foundation; either version 2.1 of
+ * the License, or (at your option) any later version.
+ *
+ * This software is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this software; if not, write to the Free
+ * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
+ * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
+ */
+package org.exoplatform.services.rpc;
+
+import java.io.Serializable;
+import java.lang.reflect.Method;
+import java.lang.reflect.Modifier;
+import java.util.Arrays;
+
+/**
+ * This command will allow you to call one specific method with the arguments given by the execute method
+ * on a component.
+ *
+ * @author <a href="mailto:nicolas.filotto at exoplatform.com">Nicolas Filotto</a>
+ * @version $Id$
+ */
+public class SingleMethodCallCommand implements RemoteCommand
+{
+
+ /**
+ * The component on which we want to execute the method
+ */
+ private final Object component;
+
+ /**
+ * The method that we want to call
+ */
+ private final Method method;
+
+ /**
+ * The id of the command
+ */
+ private final String id;
+
+ /**
+ *
+ * @param component the component on which we want to execute the method
+ * @param methodName the name of the method
+ * @param parameterTypes the parameter array
+ * @throws NoSuchMethodException if a matching method is not found.
+ * @exception SecurityException
+ * If a security manager, <i>s</i>, is present and any of the
+ * following conditions is met:
+ *
+ * <ul>
+ *
+ * <li> invocation of
+ * <tt>{@link SecurityManager#checkMemberAccess
+ * s.checkMemberAccess(this, Member.DECLARED)}</tt> denies
+ * access to the declared method
+ *
+ * <li> the caller's class loader is not the same as or an
+ * ancestor of the class loader for the current class and
+ * invocation of <tt>{@link SecurityManager#checkPackageAccess
+ * s.checkPackageAccess()}</tt> denies access to the package
+ * of this class
+ *
+ * </ul>
+ * @throws ClassNotFoundException declared in the signature but never thrown by the
+ * code of this constructor
+ */
+ public SingleMethodCallCommand(Object component, String methodName, Class<?>... parameterTypes)
+ throws SecurityException, NoSuchMethodException, ClassNotFoundException
+ {
+ if (component == null)
+ {
+ throw new IllegalArgumentException("The component cannot be null");
+ }
+ if (methodName == null || (methodName = methodName.trim()).length() == 0)
+ {
+ throw new IllegalArgumentException("The methodName cannot be empty");
+ }
+ this.component = component;
+ this.method = component.getClass().getDeclaredMethod(methodName, parameterTypes);
+ if (!Modifier.isPublic(method.getModifiers()))
+ {
+ throw new IllegalArgumentException("The method '" + methodName + "' is not public");
+ }
+ this.id = getId(component, method);
+ }
+
+ /**
+ * {@inheritDoc}
+ */
+ public Serializable execute(Serializable[] args) throws Throwable
+ {
+ try
+ {
+ return (Serializable)method.invoke(component, (Object[])args);
+ }
+ catch (Exception e)
+ {
+ throw new Exception("Could not execute the method " + id + " with the arguments " + Arrays.toString(args), e);
+ }
+ }
+
+ /**
+ * {@inheritDoc}
+ */
+ public String getId()
+ {
+ return id;
+ }
+
+ /**
+ * Builds the id: [component class name].[method name]([simple names of the parameter types, comma separated])
+ */
+ private static String getId(Object component, Method method)
+ {
+ StringBuilder sb = new StringBuilder();
+ sb.append(component.getClass().getName());
+ sb.append('.');
+ sb.append(method.getName());
+ sb.append('(');
+ boolean first = true;
+ for (Class<?> c : method.getParameterTypes())
+ {
+ if (first)
+ {
+ first = false;
+ }
+ else
+ {
+ sb.append(',');
+ }
+ sb.append(c.getSimpleName());
+ }
+ sb.append(')');
+ return sb.toString();
+ }
+}
Added: kernel/trunk/exo.kernel.component.common/src/main/java/org/exoplatform/services/rpc/impl/RPCServiceImpl.java
===================================================================
--- kernel/trunk/exo.kernel.component.common/src/main/java/org/exoplatform/services/rpc/impl/RPCServiceImpl.java (rev 0)
+++ kernel/trunk/exo.kernel.component.common/src/main/java/org/exoplatform/services/rpc/impl/RPCServiceImpl.java 2010-10-06 14:10:44 UTC (rev 3252)
@@ -0,0 +1,788 @@
+/*
+ * Copyright (C) 2010 eXo Platform SAS.
+ *
+ * This is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU Lesser General Public License as
+ * published by the Free Software Foundation; either version 2.1 of
+ * the License, or (at your option) any later version.
+ *
+ * This software is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this software; if not, write to the Free
+ * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
+ * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
+ */
+package org.exoplatform.services.rpc.impl;
+
+import org.exoplatform.commons.utils.PropertyManager;
+import org.exoplatform.container.ExoContainer;
+import org.exoplatform.container.ExoContainerContext;
+import org.exoplatform.container.configuration.ConfigurationManager;
+import org.exoplatform.container.xml.InitParams;
+import org.exoplatform.container.xml.ValueParam;
+import org.exoplatform.services.log.ExoLogger;
+import org.exoplatform.services.log.Log;
+import org.exoplatform.services.rpc.RPCException;
+import org.exoplatform.services.rpc.RPCService;
+import org.exoplatform.services.rpc.RemoteCommand;
+import org.jgroups.Address;
+import org.jgroups.Channel;
+import org.jgroups.ChannelException;
+import org.jgroups.JChannel;
+import org.jgroups.MembershipListener;
+import org.jgroups.Message;
+import org.jgroups.View;
+import org.jgroups.blocks.GroupRequest;
+import org.jgroups.blocks.MessageDispatcher;
+import org.jgroups.blocks.RequestHandler;
+import org.jgroups.conf.ConfiguratorFactory;
+import org.jgroups.conf.ProtocolStackConfigurator;
+import org.jgroups.util.Rsp;
+import org.jgroups.util.RspList;
+import org.picocontainer.Startable;
+
+import java.io.Externalizable;
+import java.io.IOException;
+import java.io.ObjectInput;
+import java.io.ObjectOutput;
+import java.io.Serializable;
+import java.net.URL;
+import java.security.AccessController;
+import java.security.PrivilegedAction;
+import java.security.PrivilegedActionException;
+import java.security.PrivilegedExceptionAction;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Vector;
+import java.util.concurrent.CountDownLatch;
+
+/**
+ * This class is a basic implementation of the {@link RPCService}, it is mainly based on the
+ * {@link MessageDispatcher}. This implementation is not designed to give the best possible
+ * performances, it only aims to give a way to communicate with other nodes.
+ *
+ * @author <a href="mailto:nicolas.filotto at exoplatform.com">Nicolas Filotto</a>
+ * @version $Id$
+ */
+public class RPCServiceImpl implements RPCService, Startable, RequestHandler, MembershipListener
+{
+
+ /**
+ * Connection logger.
+ */
+ private static final Log LOG = ExoLogger.getLogger("exo.kernel.component.common.RPCServiceImpl");
+
+ /**
+ * The name of the parameter for the location of the JGroups configuration.
+ */
+ public static final String PARAM_JGROUPS_CONFIG = "jgroups-configuration";
+
+ /**
+ * The name of the parameter for the name of the cluster.
+ */
+ public static final String PARAM_CLUSTER_NAME = "jgroups-cluster-name";
+
+ /**
+ * The name of the parameter for the default timeout
+ */
+ public static final String PARAM_DEFAULT_TIMEOUT = "jgroups-default-timeout";
+
+ /**
+ * The value of the default timeout
+ */
+ public static final int DEFAULT_TIMEOUT = 0;
+
+ /**
+ * The default value of the cluster name
+ */
+ public static final String CLUSTER_NAME = "RPCService-Cluster";
+
+ /**
+ * The configurator used to create the JGroups Channel
+ */
+ private final ProtocolStackConfigurator configurator;
+
+ /**
+ * The name of the cluster
+ */
+ private final String clusterName;
+
+ /**
+ * The JGroups Channel used to communicate with other nodes
+ */
+ protected Channel channel;
+
+ /**
+ * The current list of all the members of the cluster
+ */
+ protected volatile Vector<Address> members;
+
+ /**
+ * The address of the current coordinator
+ */
+ protected volatile Address coordinator;
+
+ /**
+ * The default value of the timeout
+ */
+ private long defaultTimeout = DEFAULT_TIMEOUT;
+
+ /**
+ * The dispatcher used to launch the command of the cluster nodes
+ */
+ private MessageDispatcher dispatcher;
+
+ /**
+ * The signal that indicates that the service is started, it will be used
+ * to make the application wait until the service is fully started to
+ * ensure that all the commands have been registered before handling
+ * incoming messages.
+ */
+ private final CountDownLatch startSignal = new CountDownLatch(1);
+
+ /**
+ * Current State of the {@link RPCServiceImpl}
+ */
+ private volatile State state;
+
+ /**
+ * All the commands that have been registered
+ */
+ private volatile Map<String, RemoteCommand> commands =
+ Collections.unmodifiableMap(new HashMap<String, RemoteCommand>());
+
+ /**
+ * Reads the JGroups configuration, the cluster name and the optional default timeout from the parameters.
+ * @param ctx the {@link ExoContainerContext} given to <code>getClusterName</code> to compute the cluster name
+ * @param params the initial parameters, cannot be <code>null</code>; the value of
+ * {@link #PARAM_DEFAULT_TIMEOUT}, if any, is parsed as a <code>long</code>
+ * @param configManager the configuration manager used to get the configuration
+ * of JGroups
+ */
+ public RPCServiceImpl(ExoContainerContext ctx, InitParams params, ConfigurationManager configManager)
+ {
+ if (params == null)
+ {
+ throw new IllegalArgumentException("The RPCServiceImpl requires some parameters");
+ }
+ URL properties = getProperties(params, configManager);
+ if (LOG.isInfoEnabled())
+ {
+ LOG.info("The JGroups configuration used for the RPCServiceImpl will be loaded from " + properties);
+ }
+ try
+ {
+ this.configurator = ConfiguratorFactory.getStackConfigurator(properties);
+ }
+ catch (ChannelException e)
+ {
+ throw new RuntimeException("Cannot load the JGroups configuration from " + properties, e);
+ }
+ this.clusterName = getClusterName(ctx, params);
+ if (LOG.isDebugEnabled())
+ {
+ LOG.debug("The cluster name of the RPCServiceImpl has been set to " + clusterName);
+ }
+ String sTimeout = getValueParam(params, PARAM_DEFAULT_TIMEOUT);
+ if (sTimeout != null)
+ {
+ defaultTimeout = Long.parseLong(sTimeout);
+ if (LOG.isDebugEnabled())
+ {
+ LOG.debug("The default timeout of the RPCServiceImpl has been set to " + defaultTimeout);
+ }
+ }
+ this.state = State.INITIALIZED;
+ }
+
+ /**
+ * {@inheritDoc}
+ */
+ public List<Object> executeCommandOnAllNodes(RemoteCommand command, boolean synchronous, Serializable... args)
+ throws RPCException
+ {
+ return executeCommandOnAllNodesMain(command, synchronous, defaultTimeout, args);
+ }
+
+ /**
+ * {@inheritDoc}
+ */
+ public List<Object> executeCommandOnAllNodes(RemoteCommand command, long timeout, Serializable... args)
+ throws RPCException
+ {
+ return executeCommandOnAllNodesMain(command, true, timeout, args);
+ }
+
+ /**
+ * Executes a command on all the cluster nodes. This method is equivalent to the other method of the
+ * same type but with the default timeout. The command must be registered first otherwise an
+ * {@link RPCException} will be thrown.
+ *
+ * @param command The command to execute on each cluster node
+ * @param synchronous if true, sets group request mode to {@link org.jgroups.blocks.GroupRequest#GET_ALL},
+ * and if false sets it to {@link org.jgroups.blocks.GroupRequest#GET_NONE}.
+ * @param timeout a timeout after which to throw a replication exception.
+ * @param args an array of {@link Serializable} objects corresponding to parameters of the command
+ * to execute remotely
+ * @return a list of responses from all the members of the cluster. If we met an exception on a given node,
+ * the RPCException will be the corresponding response of this particular node
+ * @throws RPCException in the event of problems.
+ */
+ protected List<Object> executeCommandOnAllNodesMain(RemoteCommand command, boolean synchronous, long timeout,
+ Serializable... args) throws RPCException
+ {
+ return excecuteCommand(members, command, synchronous, timeout, args);
+ }
+
+ /**
+ * {@inheritDoc}
+ */
+ public Object executeCommandOnCoordinator(RemoteCommand command, boolean synchronous, Serializable... args)
+ throws RPCException
+ {
+ return executeCommandOnCoordinatorMain(command, synchronous, defaultTimeout, args);
+ }
+
+ /**
+ * {@inheritDoc}
+ */
+ public Object executeCommandOnCoordinator(RemoteCommand command, long timeout, Serializable... args)
+ throws RPCException
+ {
+ return executeCommandOnCoordinatorMain(command, true, timeout, args);
+ }
+
+ /**
+ * Executes a command on the coordinator only. If the targeted coordinator has left the cluster and the
+ * coordinator has changed in the meantime, the command is retried on the new coordinator with the same
+ * mode, timeout and arguments. The command must be registered first otherwise an {@link RPCException} will be thrown.
+ *
+ * @param command The command to execute on the coordinator node
+ * @param synchronous if true, sets group request mode to {@link org.jgroups.blocks.GroupRequest#GET_ALL},
+ * and if false sets it to {@link org.jgroups.blocks.GroupRequest#GET_NONE}.
+ * @param timeout the timeout given to the underlying call.
+ * @param args an array of {@link Serializable} objects corresponding to parameters of the command
+ * to execute remotely
+ * @return the response of the coordinator, <code>null</code> if no response is available (e.g. asynchronous mode).
+ * @throws RPCException in the event of problems, or if the response of the coordinator is an {@link RPCException}.
+ */
+ protected Object executeCommandOnCoordinatorMain(RemoteCommand command, boolean synchronous, long timeout,
+ Serializable... args) throws RPCException
+ {
+ Address coordinator = this.coordinator;
+ Vector<Address> v = new Vector<Address>(1);
+ v.add(coordinator);
+ List<Object> lResults = excecuteCommand(v, command, synchronous, timeout, args);
+ Object result = lResults == null || lResults.size() == 0 ? null : lResults.get(0);
+ if (result instanceof MemberHasLeftException)
+ {
+ if (coordinator.equals(this.coordinator))
+ {
+ throw new RPCException("The coordinator did not change, we faced an unexpected situation",
+ (MemberHasLeftException)result);
+ }
+ else
+ {
+ // Coordinator changed: retry via the *Main method, the public overloads would take timeout and args as varargs
+ return executeCommandOnCoordinatorMain(command, synchronous, timeout, args);
+ }
+ }
+ else if (result instanceof RPCException)
+ {
+ throw (RPCException)result;
+ }
+ return result;
+ }
+
+ /**
+ * Execute the command on all the nodes corresponding to the list of destinations.
+ * @param dests the list of members on which the command needs to be executed
+ * @param command the command to execute, it must be the instance registered under its id
+ * @param synchronous if true, sets group request mode to {@link org.jgroups.blocks.GroupRequest#GET_ALL}, and if false sets
+ * it to {@link org.jgroups.blocks.GroupRequest#GET_NONE}.
+ * @param timeout the timeout given to the dispatcher; a member without received response gets an {@link RPCException} as entry.
+ * @param args the list of parameters
+ * @return one entry per member of <code>dests</code>, in the same order (a response or an exception); empty if not synchronous.
+ * @throws RPCException if the service is not started, the command is not registered or the responses could not be retrieved.
+ */
+ protected List<Object> excecuteCommand(final Vector<Address> dests, RemoteCommand command,
+ final boolean synchronous, final long timeout, Serializable... args) throws RPCException
+ {
+ SecurityManager security = System.getSecurityManager();
+ if (security != null)
+ {
+ security.checkPermission(RPCService.ACCESS_RPC_SERVICE_PERMISSION);
+ }
+ if (state != State.STARTED)
+ {
+ throw new RPCException(
+ "Cannot execute any commands if the service is not started, the current state of the service is " + state);
+ }
+ String commandId = command.getId();
+ if (commands.get(commandId) != command)
+ {
+ throw new RPCException("Command " + commandId + " unknown, please register your command first");
+ }
+ final Message msg = new Message();
+ msg.setObject(new MessageBody(commandId, args));
+ RspList rsps = AccessController.doPrivileged(new PrivilegedAction<RspList>()
+ {
+ public RspList run()
+ {
+ return dispatcher.castMessage(dests, msg, synchronous ? GroupRequest.GET_ALL : GroupRequest.GET_NONE,
+ timeout);
+ }
+ });
+
+ if (LOG.isTraceEnabled())
+ LOG.trace("responses: " + rsps);
+ if (rsps == null)
+ throw new RPCException("Could not get the responses for command " + commandId + ".");
+ if (!synchronous)
+ return Collections.emptyList();// async case
+ if (LOG.isTraceEnabled())
+ {
+ LOG.trace("(" + channel.getLocalAddress() + "): responses for command " + commandId + ":\n" + rsps);
+ }
+ List<Object> retval = new ArrayList<Object>(rsps.size());
+ for (Address dest : dests)
+ {
+ Rsp rsp = rsps.get(dest);
+ if (rsp == null || (rsp.wasSuspected() && !rsp.wasReceived()))
+ {
+ // The corresponding member has left
+ retval.add(new MemberHasLeftException("No response for the member " + dest
+ + ", this member has probably left the cluster."));
+ }
+ else if (!rsp.wasReceived())
+ {
+ retval.add(new RPCException("Replication timeout for " + rsp.getSender() + ", rsp=" + rsp));
+ }
+ else
+ {
+ Object value = rsp.getValue();
+ if (value instanceof RPCException)
+ {
+ // Application-level exceptions are only traced here; they are returned to the caller as the response, not thrown
+ if (LOG.isTraceEnabled())
+ LOG.trace("Recieved exception'" + value + "' from " + rsp.getSender(), (RPCException)value);
+ }
+ retval.add(value);
+ }
+ }
+ return retval;
+ }
+
+ /**
+ * Executes locally the command described by the given message, once the service is fully started.
+ * Failures are never thrown from this method: they are given back as an {@link RPCException}
+ * which becomes the response of this node.
+ *
+ * @param msg the message whose payload is the {@link MessageBody} describing the command to execute
+ * @return the result of the command, or an {@link RPCException} if the command is unknown,
+ * if its execution fails or if the current thread has been interrupted
+ * {@inheritDoc}
+ */
+ public Object handle(Message msg)
+ {
+ String commandId = null;
+ try
+ {
+ // Ensure that the service is fully started before trying to execute any command
+ startSignal.await();
+ MessageBody body = (MessageBody)msg.getObject();
+ RemoteCommand command = getCommand(commandId = body.getCommandId());
+ if (command == null)
+ {
+ return new RPCException("Command " + commandId + " unknown, please register your command first");
+ }
+ Object execResult = command.execute(body.getArgs());
+ if (LOG.isTraceEnabled())
+ LOG.trace("Command : " + commandId + " executed, result is: " + execResult);
+ return execResult;
+ }
+ catch (InterruptedException e)
+ {
+ // Restore the interrupt status so that the owner of the thread can still see the interruption
+ Thread.currentThread().interrupt();
+ if (LOG.isTraceEnabled())
+ LOG.trace("Problems invoking command.", e);
+ return new RPCException("Cannot execute the command " + (commandId == null ? "" : commandId), e);
+ }
+ catch (Throwable x)
+ {
+ // Any other failure (including Errors) is converted into a response instead of being propagated
+ if (LOG.isTraceEnabled())
+ LOG.trace("Problems invoking command.", x);
+ return new RPCException("Cannot execute the command " + (commandId == null ? "" : commandId), x);
+ }
+ }
+
+ /**
+ * {@inheritDoc}
+ * <p>
+ * This implementation is intentionally empty: nothing is done when this callback is invoked.
+ */
+ public void block()
+ {
+ }
+
+ /**
+ * {@inheritDoc}
+ * <p>
+ * This implementation is intentionally empty: the suspected member is ignored here.
+ */
+ public void suspect(Address suspectedMbr)
+ {
+ }
+
+ /**
+ * Keeps the members of the new view and considers the first one, if any, as the coordinator.
+ *
+ * @param view the new view of the cluster
+ * {@inheritDoc}
+ */
+ public void viewAccepted(View view)
+ {
+ this.members = view.getMembers();
+ if (members == null || members.isEmpty())
+ {
+ // No member at all: there is no coordinator
+ this.coordinator = null;
+ }
+ else
+ {
+ this.coordinator = members.get(0);
+ }
+ }
+
+ /**
+ * {@inheritDoc}
+ */
+ public synchronized RemoteCommand registerCommand(RemoteCommand command)
+ {
+ SecurityManager security = System.getSecurityManager();
+ if (security != null)
+ {
+ security.checkPermission(RPCService.ACCESS_RPC_SERVICE_PERMISSION);
+ }
+ if (command != null)
+ {
+ String commandId = command.getId();
+ if (commandId == null)
+ {
+ throw new IllegalArgumentException("The command Id cannot be null");
+ }
+ Map<String, RemoteCommand> tmpCommands = new HashMap<String, RemoteCommand>(this.commands);
+ RemoteCommand oldCommand = tmpCommands.put(commandId, command);
+ if (oldCommand != null && PropertyManager.isDevelopping())
+ {
+ LOG.warn("A command has already been registered with the id " + commandId
+ + ", this command will be replaced with the new one");
+ }
+ this.commands = Collections.unmodifiableMap(tmpCommands);
+ return command;
+ }
+ return null;
+ }
+
+ /**
+ * Unregisters the given command, provided that it is the very instance currently registered
+ * under its id; otherwise nothing is removed. A <code>null</code> command is ignored.
+ *
+ * @param command the command to unregister
+ * @throws IllegalArgumentException if the id of the command is <code>null</code>
+ * {@inheritDoc}
+ */
+ public synchronized void unregisterCommand(RemoteCommand command)
+ {
+ SecurityManager sm = System.getSecurityManager();
+ if (sm != null)
+ {
+ sm.checkPermission(RPCService.ACCESS_RPC_SERVICE_PERMISSION);
+ }
+ if (command == null)
+ {
+ return;
+ }
+ final String id = command.getId();
+ if (id == null)
+ {
+ throw new IllegalArgumentException("The command Id cannot be null");
+ }
+ if (commands.get(id) == command)
+ {
+ // The current map is never modified: the removal is applied to a copy that replaces it afterwards
+ Map<String, RemoteCommand> remaining = new HashMap<String, RemoteCommand>(this.commands);
+ remaining.remove(id);
+ this.commands = Collections.unmodifiableMap(remaining);
+ return;
+ }
+ // Only the exact instance that has been registered can be unregistered, so a command
+ // with an unknown id or another instance sharing the same id is left untouched
+ if (PropertyManager.isDevelopping())
+ {
+ LOG.warn("Cannot unregister an unknown RemoteCommand, either the command id " + id
+ + " is unknown or the instance of RemoteCommand to unregister is unknown");
+ }
+ }
+
+ /**
+ * Gives the {@link RemoteCommand} corresponding to the given id
+ * @param commandId the command id of the command to retrieve
+ * @return the corresponding {@link RemoteCommand}, or <code>null</code> if no command
+ * is currently registered with this id
+ */
+ protected RemoteCommand getCommand(String commandId)
+ {
+ return commands.get(commandId);
+ }
+
+ /**
+ * Creates the channel and the message dispatcher, then connects the channel to the cluster.
+ * The service is marked as started only if the connection succeeds.
+ *
+ * @throws RuntimeException if the channel cannot be created or connected
+ * {@inheritDoc}
+ */
+ public void start()
+ {
+ SecurityManager security = System.getSecurityManager();
+ if (security != null)
+ {
+ security.checkPermission(RPCService.ACCESS_RPC_SERVICE_PERMISSION);
+ }
+ try
+ {
+ this.channel = new JChannel(configurator);
+ channel.setOpt(Channel.AUTO_RECONNECT, true);
+ this.dispatcher = new MessageDispatcher(channel, null, this, this);
+ doPriviledgedExceptionAction(new PrivilegedExceptionAction<Void>()
+ {
+ public Void run() throws Exception
+ {
+ channel.connect(clusterName);
+ return null;
+ }
+ });
+ // Reached only when the channel is connected: a failed start must not be reported as STARTED
+ this.state = State.STARTED;
+ }
+ catch (ChannelException e)
+ {
+ throw new RuntimeException("Cannot initialize the Channel needed for the RPCServiceImpl", e);
+ }
+ finally
+ {
+ // Always release the threads waiting in handle(), whatever the outcome of the start
+ startSignal.countDown();
+ }
+ }
+
+ /**
+ * {@inheritDoc}
+ */
+ public void stop()
+ {
+ SecurityManager security = System.getSecurityManager();
+ if (security != null)
+ {
+ security.checkPermission(RPCService.ACCESS_RPC_SERVICE_PERMISSION);
+ }
+ this.state = State.STOPPED;
+ if (channel != null && channel.isOpen())
+ {
+ if (LOG.isInfoEnabled())
+ LOG.info("Disconnecting and closing the Channel");
+ AccessController.doPrivileged(new PrivilegedAction<Void>()
+ {
+ public Void run()
+ {
+ channel.disconnect();
+ return null;
+ }
+ });
+ channel.close();
+ channel = null;
+ }
+ if (dispatcher != null)
+ {
+ dispatcher.stop();
+ dispatcher = null;
+ }
+ }
+
+ /**
+ * Gives the value of the default timeout
+ * @return the default timeout (NOTE(review): the unit is not visible here, presumably
+ * milliseconds - to be confirmed)
+ */
+ public long getDefaultTimeout()
+ {
+ return defaultTimeout;
+ }
+
+ /**
+ * Gives the name of the cluster
+ * @return the name of the cluster, which is the name used to connect the channel
+ * when the service is started
+ */
+ public String getClusterName()
+ {
+ return clusterName;
+ }
+
+ /**
+ * Gives the trimmed value of the {@link ValueParam} corresponding to the given key
+ * @param params the list of initial parameters from which we want to extract the {@link ValueParam},
+ * it can be <code>null</code>
+ * @param parameterKey the name of the {@link ValueParam} that we are looking for
+ * @return the trimmed value if it exists, <code>null</code> if there is no parameter at all,
+ * no {@link ValueParam} for the given key or no value for this {@link ValueParam}
+ */
+ private static String getValueParam(InitParams params, String parameterKey)
+ {
+ // Explicit checks instead of catching NullPointerException, so that an unrelated
+ // NullPointerException is never silently interpreted as a missing parameter
+ if (params == null || params.getValueParam(parameterKey) == null)
+ {
+ return null;
+ }
+ String value = params.getValueParam(parameterKey).getValue();
+ return value == null ? null : value.trim();
+ }
+
+ /**
+ * Gives the {@link URL} corresponding to the location of the JGroups configuration
+ * @param params the initial parameters from which we extract the parameter
+ * <code>PARAM_JGROUPS_CONFIG</code>
+ * @param configManager the configuration manager used to get the {@link URL} corresponding
+ * to the path given in the configuration of the RPCServiceImpl
+ * @return The {@link URL} corresponding to the location of the JGroups configuration,
+ * it will throw {@link RuntimeException} otherwise since it is a mandatory configuration.
+ */
+ private static URL getProperties(InitParams params, ConfigurationManager configManager)
+ {
+ String configPath = getValueParam(params, PARAM_JGROUPS_CONFIG);
+ if (configPath == null)
+ {
+ throw new IllegalArgumentException("The parameter '" + PARAM_JGROUPS_CONFIG
+ + "' of RPCServiceImpl is mandatory");
+ }
+ URL properties;
+ try
+ {
+ properties = configManager.getResource(configPath);
+ }
+ catch (Exception e)
+ {
+ throw new IllegalArgumentException("Cannot find the JGroups configuration at " + configPath, e);
+ }
+ if (properties == null)
+ {
+ throw new IllegalArgumentException("Cannot find the JGroups configuration at " + configPath);
+ }
+ return properties;
+ }
+
+ /**
+ * Builds the name of the cluster by post fixing the configured name with "-${container-name}",
+ * so that several portal containers can be supported.
+ * @param ctx the context from which we extract the name of the container
+ * @param params the list of initial parameters from which we get the value of the parameter
+ * <code>PARAM_CLUSTER_NAME</code>, <code>CLUSTER_NAME</code> is used when it is not set
+ * @return the configured or default name followed by "-" and the name of the container
+ */
+ private static String getClusterName(ExoContainerContext ctx, InitParams params)
+ {
+ String configuredName = getValueParam(params, PARAM_CLUSTER_NAME);
+ String prefix = configuredName == null ? CLUSTER_NAME : configuredName;
+ return prefix + "-" + ctx.getName();
+ }
+
+ /**
+ * Executes the given action in privileged mode and unwraps the failure, if any.
+ * @param action the action to execute
+ * @return the result of the action
+ * @throws ChannelException if the action failed with a {@link ChannelException}
+ * @throws RuntimeException if the action failed with any other exception: a
+ * {@link RuntimeException} is rethrown as is, anything else is wrapped
+ */
+ private static <E> E doPriviledgedExceptionAction(PrivilegedExceptionAction<E> action) throws ChannelException
+ {
+ try
+ {
+ return AccessController.doPrivileged(action);
+ }
+ catch (PrivilegedActionException pae)
+ {
+ Throwable cause = pae.getCause();
+ if (cause instanceof ChannelException)
+ {
+ throw (ChannelException)cause;
+ }
+ if (cause instanceof RuntimeException)
+ {
+ throw (RuntimeException)cause;
+ }
+ throw new RuntimeException(cause);
+ }
+ }
+
+ /**
+ * This intern class is the payload of the messages sent to the other nodes: it holds
+ * the id of the command to execute and the arguments to give to this command.
+ * Its content is written and read by hand thanks to {@link Externalizable}.
+ */
+ public static class MessageBody implements Externalizable
+ {
+ /**
+ * The Id of the command to execute
+ */
+ private String commandId;
+
+ /**
+ * The list of parameters, it can be <code>null</code>
+ */
+ private Serializable[] args;
+
+ /**
+ * Default constructor required by {@link Externalizable}
+ */
+ public MessageBody()
+ {
+ }
+
+ /**
+ * @param commandId the id of the command to execute, it must not be <code>null</code>
+ * otherwise the message body cannot be written
+ * @param args the arguments of the command, it can be <code>null</code>
+ */
+ public MessageBody(String commandId, Serializable[] args)
+ {
+ this.commandId = commandId;
+ this.args = args;
+ }
+
+ /**
+ * @return the id of the command to execute
+ */
+ public String getCommandId()
+ {
+ return commandId;
+ }
+
+ /**
+ * @return the arguments of the command, <code>null</code> if no argument has been provided
+ */
+ public Serializable[] getArgs()
+ {
+ return args;
+ }
+
+ /**
+ * Reads the command id, then the total amount of arguments (-1 means no arguments at all)
+ * followed by the arguments themselves.
+ * {@inheritDoc}
+ */
+ public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException
+ {
+ this.commandId = in.readUTF();
+ int size = in.readInt();
+ if (size == -1)
+ {
+ this.args = null;
+ return;
+ }
+ if (size < 0)
+ {
+ // Corrupted content: fail with an IOException instead of a NegativeArraySizeException
+ throw new IOException("Invalid total amount of arguments: " + size);
+ }
+ Serializable[] values = new Serializable[size];
+ for (int i = 0; i < size; i++)
+ {
+ values[i] = (Serializable)in.readObject();
+ }
+ this.args = values;
+ }
+
+ /**
+ * Writes the command id, then the total amount of arguments (-1 if there is no arguments
+ * at all) followed by the arguments themselves.
+ * {@inheritDoc}
+ */
+ public void writeExternal(ObjectOutput out) throws IOException
+ {
+ out.writeUTF(commandId);
+ if (args == null)
+ {
+ out.writeInt(-1);
+ return;
+ }
+ out.writeInt(args.length);
+ for (Serializable arg : args)
+ {
+ out.writeObject(arg);
+ }
+ }
+ }
+
+ /**
+ * All the potential states of the {@link RPCServiceImpl}
+ * <ul>
+ * <li>INITIALIZED: NOTE(review) presumably the state of the service before it is started - to be confirmed</li>
+ * <li>STARTED: the state set by the method start, commands can only be executed in this state</li>
+ * <li>STOPPED: the state set by the method stop</li>
+ * </ul>
+ */
+ public enum State {
+ INITIALIZED, STARTED, STOPPED
+ }
+
+ public static class MemberHasLeftException extends RPCException
+ {
+ // Added to the list of results in place of the response of a member that gave no response or that has been suspected
+ /**
+ * The serial version UID
+ */
+ private static final long serialVersionUID = 3558158913564367637L;
+
+ public MemberHasLeftException(String message) // message: the description of the member that has left
+ {
+ super(message);
+ }
+ }
+}
Added: kernel/trunk/exo.kernel.component.common/src/test/java/org/exoplatform/services/rpc/impl/TestRPCServiceImpl.java
===================================================================
--- kernel/trunk/exo.kernel.component.common/src/test/java/org/exoplatform/services/rpc/impl/TestRPCServiceImpl.java (rev 0)
+++ kernel/trunk/exo.kernel.component.common/src/test/java/org/exoplatform/services/rpc/impl/TestRPCServiceImpl.java 2010-10-06 14:10:44 UTC (rev 3252)
@@ -0,0 +1,1062 @@
+/*
+ * Copyright (C) 2010 eXo Platform SAS.
+ *
+ * This is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU Lesser General Public License as
+ * published by the Free Software Foundation; either version 2.1 of
+ * the License, or (at your option) any later version.
+ *
+ * This software is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this software; if not, write to the Free
+ * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
+ * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
+ */
+package org.exoplatform.services.rpc.impl;
+
+import org.exoplatform.container.PortalContainer;
+import org.exoplatform.container.configuration.ConfigurationManager;
+import org.exoplatform.container.xml.InitParams;
+import org.exoplatform.container.xml.ValueParam;
+import org.exoplatform.services.rpc.RPCException;
+import org.exoplatform.services.rpc.RemoteCommand;
+import org.exoplatform.services.rpc.SingleMethodCallCommand;
+import org.exoplatform.services.rpc.impl.RPCServiceImpl.MemberHasLeftException;
+import org.exoplatform.test.BasicTestCase;
+import org.jgroups.Address;
+
+import java.io.Serializable;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Vector;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.atomic.AtomicReference;
+
+/**
+ * This is the unit test class for the service {@link RPCServiceImpl}
+ *
+ * @author <a href="mailto:nicolas.filotto at exoplatform.com">Nicolas Filotto</a>
+ * @version $Id$
+ *
+ */
+public class TestRPCServiceImpl extends BasicTestCase
+{
+ private PortalContainer container;
+ private ConfigurationManager configManager;
+
+ public void setUp() throws Exception // Gets the portal container and its ConfigurationManager used by all the tests
+ {
+ container = PortalContainer.getInstance();
+ configManager = (ConfigurationManager)container.getComponentInstanceOfType(ConfigurationManager.class);
+ }
+
+ public void testParameters() // Checks the validation of the init params and the values applied by the constructor
+ {
+ InitParams params = null;
+ try
+ {
+ new RPCServiceImpl(container.getContext(), params, configManager);
+ fail("We expect a IllegalArgumentException since the jgroups config cannot be found");
+ }
+ catch (IllegalArgumentException e)
+ {
+ // OK: there is no InitParams at all
+ }
+ params = new InitParams();
+ try
+ {
+ new RPCServiceImpl(container.getContext(), params, configManager);
+ fail("We expect a IllegalArgumentException since the jgroups config cannot be found");
+ }
+ catch (IllegalArgumentException e)
+ {
+ // OK: the InitParams is empty
+ }
+ ValueParam paramConf = new ValueParam();
+ paramConf.setName(RPCServiceImpl.PARAM_JGROUPS_CONFIG);
+ params.addParameter(paramConf);
+ try
+ {
+ new RPCServiceImpl(container.getContext(), params, configManager);
+ fail("We expect a IllegalArgumentException since the jgroups config cannot be found");
+ }
+ catch (IllegalArgumentException e)
+ {
+ // OK: the parameter exists but it has no value
+ }
+ paramConf.setValue("fakePath");
+ try
+ {
+ new RPCServiceImpl(container.getContext(), params, configManager);
+ fail("We expect a IllegalArgumentException since the jgroups config cannot be found");
+ }
+ catch (IllegalArgumentException e)
+ {
+ // OK: the path does not lead to any configuration
+ }
+ paramConf.setValue("jar:/conf/portal/udp.xml");
+ RPCServiceImpl service = null;
+ try
+ {
+ service = new RPCServiceImpl(container.getContext(), params, configManager); // valid config: default timeout and default cluster name expected
+ assertEquals(RPCServiceImpl.DEFAULT_TIMEOUT, service.getDefaultTimeout());
+ assertEquals(RPCServiceImpl.CLUSTER_NAME + "-" + container.getContext().getName(), service.getClusterName());
+ }
+ finally
+ {
+ if (service != null)
+ {
+ service.stop();
+ }
+ }
+ ValueParam paramTimeout = new ValueParam();
+ paramTimeout.setName(RPCServiceImpl.PARAM_DEFAULT_TIMEOUT);
+ paramTimeout.setValue("fakeValue");
+ params.addParameter(paramTimeout);
+ try
+ {
+ new RPCServiceImpl(container.getContext(), params, configManager);
+ fail("We expect a NumberFormatException since the timeout is not properly set");
+ }
+ catch (NumberFormatException e)
+ {
+ // OK: the value of the timeout is not a number
+ }
+ paramTimeout.setValue("60");
+ try
+ {
+ service = new RPCServiceImpl(container.getContext(), params, configManager); // custom timeout, default cluster name
+ assertEquals(60, service.getDefaultTimeout());
+ assertEquals(RPCServiceImpl.CLUSTER_NAME + "-" + container.getContext().getName(), service.getClusterName());
+ }
+ finally
+ {
+ if (service != null)
+ {
+ service.stop();
+ }
+ }
+ ValueParam paramClusterName = new ValueParam();
+ paramClusterName.setName(RPCServiceImpl.PARAM_CLUSTER_NAME);
+ paramClusterName.setValue("MyName");
+ params.addParameter(paramClusterName);
+ try
+ {
+ service = new RPCServiceImpl(container.getContext(), params, configManager); // custom timeout and custom cluster name
+ assertEquals(60, service.getDefaultTimeout());
+ assertEquals(paramClusterName.getValue() + "-" + container.getContext().getName(), service.getClusterName());
+ }
+ finally
+ {
+ if (service != null)
+ {
+ service.stop();
+ }
+ }
+ }
+
+ public void testStates() throws Exception // Checks that commands can only be executed between start() and stop()
+ {
+ InitParams params = new InitParams();
+ ValueParam paramConf = new ValueParam();
+ paramConf.setName(RPCServiceImpl.PARAM_JGROUPS_CONFIG);
+ paramConf.setValue("jar:/conf/portal/udp.xml");
+ params.addParameter(paramConf);
+ RPCServiceImpl service = null;
+ RemoteCommand foo = new RemoteCommand()
+ {
+
+ public String getId()
+ {
+ return "foo";
+ }
+
+ public String execute(Serializable[] args) throws Throwable
+ {
+ return null;
+ }
+ };
+ try
+ {
+ service = new RPCServiceImpl(container.getContext(), params, configManager);
+
+ service.registerCommand(foo);
+ try
+ {
+ service.executeCommandOnAllNodes(foo, true);
+ fail("We expect a RPCException since the current state is not the expected one");
+ }
+ catch (RPCException e)
+ {
+ // OK: the service has not been started yet
+ }
+ try
+ {
+ service.executeCommandOnAllNodes(foo, 10);
+ fail("We expect a RPCException since the current state is not the expected one");
+ }
+ catch (RPCException e)
+ {
+ // OK: the service has not been started yet
+ }
+ try
+ {
+ service.executeCommandOnCoordinator(foo, true);
+ fail("We expect a RPCException since the current state is not the expected one");
+ }
+ catch (RPCException e)
+ {
+ // OK: the service has not been started yet
+ }
+ try
+ {
+ service.executeCommandOnCoordinator(foo, 10);
+ fail("We expect a RPCException since the current state is not the expected one");
+ }
+ catch (RPCException e)
+ {
+ // OK: the service has not been started yet
+ }
+ service.start(); // from now on, none of the calls below is expected to fail
+ service.executeCommandOnAllNodes(foo, true);
+ service.executeCommandOnAllNodes(foo, 10);
+ service.executeCommandOnCoordinator(foo, true);
+ service.executeCommandOnCoordinator(foo, 10);
+ }
+ finally
+ {
+ if (service != null)
+ {
+ service.stop();
+ }
+ }
+ try
+ {
+ service.executeCommandOnAllNodes(foo, true);
+ fail("We expect a RPCException since the current state is not the expected one");
+ }
+ catch (RPCException e)
+ {
+ // OK: the service has been stopped
+ }
+ try
+ {
+ service.executeCommandOnAllNodes(foo, 10);
+ fail("We expect a RPCException since the current state is not the expected one");
+ }
+ catch (RPCException e)
+ {
+ // OK: the service has been stopped
+ }
+ try
+ {
+ service.executeCommandOnCoordinator(foo, true);
+ fail("We expect a RPCException since the current state is not the expected one");
+ }
+ catch (RPCException e)
+ {
+ // OK: the service has been stopped
+ }
+ try
+ {
+ service.executeCommandOnCoordinator(foo, 10);
+ fail("We expect a RPCException since the current state is not the expected one");
+ }
+ catch (RPCException e)
+ {
+ // OK: the service has been stopped
+ }
+ }
+
+ public void testCommands() throws Exception // Checks on a single node the results given for unknown, failing, slow and successful commands
+ {
+ InitParams params = new InitParams();
+ ValueParam paramConf = new ValueParam();
+ paramConf.setName(RPCServiceImpl.PARAM_JGROUPS_CONFIG);
+ paramConf.setValue("jar:/conf/portal/udp.xml");
+ params.addParameter(paramConf);
+ RPCServiceImpl service = null;
+ try
+ {
+ service = new RPCServiceImpl(container.getContext(), params, configManager);
+ RemoteCommand fake = new RemoteCommand() // never registered
+ {
+
+ public String getId()
+ {
+ return "fake";
+ }
+
+ public String execute(Serializable[] args) throws Throwable
+ {
+ return null;
+ }
+ };
+ RemoteCommand fake2 = new RemoteCommand()
+ {
+
+ public String getId()
+ {
+ return "fake2";
+ }
+
+ public String execute(Serializable[] args) throws Throwable
+ {
+ return null;
+ }
+ };
+ RemoteCommand fake2_Unregistered = new RemoteCommand() // same id as fake2 but this instance is never registered
+ {
+
+ public String getId()
+ {
+ return "fake2";
+ }
+
+ public String execute(Serializable[] args) throws Throwable
+ {
+ return null;
+ }
+ };
+ service.registerCommand(fake2);
+ RemoteCommand Exception = new RemoteCommand()
+ {
+
+ public String getId()
+ {
+ return "Exception";
+ }
+
+ public String execute(Serializable[] args) throws Throwable
+ {
+ throw new Exception("MyException");
+ }
+ };
+ service.registerCommand(Exception);
+ RemoteCommand Error = new RemoteCommand()
+ {
+
+ public String getId()
+ {
+ return "Error";
+ }
+
+ public String execute(Serializable[] args) throws Throwable
+ {
+ throw new Error("MyError");
+ }
+ } ;
+ service.registerCommand(Error);
+ RemoteCommand StringValue = new RemoteCommand()
+ {
+
+ public String getId()
+ {
+ return "StringValue";
+ }
+
+ public String execute(Serializable[] args) throws Throwable
+ {
+ return "OK";
+ }
+ };
+ service.registerCommand(StringValue);
+ RemoteCommand NullValue = new RemoteCommand()
+ {
+
+ public String getId()
+ {
+ return "NullValue";
+ }
+
+ public String execute(Serializable[] args) throws Throwable
+ {
+ return null;
+ }
+ };
+ service.registerCommand(NullValue);
+ RemoteCommand LongTask = new RemoteCommand()
+ {
+
+ public String getId()
+ {
+ return "LongTask";
+ }
+
+ public String execute(Serializable[] args) throws Throwable
+ {
+ Thread.sleep(2000);
+ return null;
+ }
+ };
+ service.registerCommand(LongTask);
+ service.start();
+ try
+ {
+ service.executeCommandOnAllNodes(fake, true);
+ fail("We expect a RPCException since the command is unknown");
+ }
+ catch (RPCException e)
+ {
+ // OK: the command has never been registered
+ }
+ try
+ {
+ service.executeCommandOnCoordinator(fake, true);
+ fail("We expect a RPCException since the command is unknown");
+ }
+ catch (RPCException e)
+ {
+ // OK: the command has never been registered
+ }
+ try
+ {
+ service.executeCommandOnAllNodes(fake2_Unregistered, true);
+ fail("We expect a RPCException since the command is unknown");
+ }
+ catch (RPCException e)
+ {
+ // OK: another instance is registered with this id
+ }
+ try
+ {
+ service.executeCommandOnCoordinator(fake2_Unregistered, true);
+ fail("We expect a RPCException since the command is unknown");
+ }
+ catch (RPCException e)
+ {
+ // OK: another instance is registered with this id
+ }
+ List<Object> result;
+ result = service.executeCommandOnAllNodes(Exception, true); // on all nodes, a failure is given as a result instead of being thrown
+ assertTrue(result != null && result.size() == 1);
+ assertTrue("We expect a RPCException since one node could not execute the command", result.get(0) instanceof RPCException);
+ try
+ {
+ service.executeCommandOnCoordinator(Exception, true);
+ fail("We expect a RPCException since one node could not execute the command");
+ }
+ catch (RPCException e)
+ {
+ // OK: on the coordinator, the failure is thrown
+ }
+ result = service.executeCommandOnAllNodes(Error, true);
+ assertTrue(result != null && result.size() == 1);
+ assertTrue("We expect a RPCException since one node could not execute the command", result.get(0) instanceof RPCException);
+ try
+ {
+ service.executeCommandOnCoordinator(Error, true);
+ fail("We expect a RPCException since one node could not execute the command");
+ }
+ catch (RPCException e)
+ {
+ // OK: on the coordinator, the failure is thrown
+ }
+ result = service.executeCommandOnAllNodes(LongTask, true);
+ assertNotNull(result);
+ assertTrue(result.size() == 1);
+ assertNull(result.get(0));
+ Object o = service.executeCommandOnCoordinator(LongTask, true);
+ assertNull(o);
+ result = service.executeCommandOnAllNodes(LongTask, 1000); // the given timeout is lower than the value used for the sleep of the command
+ assertNotNull(result);
+ assertTrue(result.size() == 1);
+ assertTrue("We expect an RPCException due to a Replication Timeout", result.get(0) instanceof RPCException);
+ try
+ {
+ service.executeCommandOnCoordinator(LongTask, 1000);
+ fail("We expect an RPCException due to a Replication Timeout");
+ }
+ catch (RPCException e)
+ {
+ // OK: the response has not been received within the timeout
+ }
+ result = service.executeCommandOnAllNodes(LongTask, false); // asynchronous mode: no result is expected
+ assertNotNull(result);
+ assertTrue(result.isEmpty());
+ assertNull(service.executeCommandOnCoordinator(LongTask, false));
+
+ result = service.executeCommandOnAllNodes(StringValue, true);
+ assertNotNull(result);
+ assertTrue(result.size() == 1);
+ assertEquals("OK", result.get(0));
+ o = service.executeCommandOnCoordinator(StringValue, true);
+ assertEquals("OK", o);
+ result = service.executeCommandOnAllNodes(NullValue, true);
+ assertNotNull(result);
+ assertTrue(result.size() == 1);
+ assertNull(result.get(0));
+ o = service.executeCommandOnCoordinator(NullValue, true);
+ assertNull(o);
+ }
+ finally
+ {
+ if (service != null)
+ {
+ service.stop();
+ }
+ }
+ }
+
+ public void testSeveralNodes() throws Exception // Checks the results with two services sharing the same cluster, then with one of them stopped
+ {
+ InitParams params = new InitParams();
+ ValueParam paramConf = new ValueParam();
+ paramConf.setName(RPCServiceImpl.PARAM_JGROUPS_CONFIG);
+ paramConf.setValue("jar:/conf/portal/udp.xml");
+ params.addParameter(paramConf);
+ RPCServiceImpl service1 = null, service2 = null;
+ try
+ {
+ service1 = new RPCServiceImpl(container.getContext(), params, configManager);
+ service2 = new RPCServiceImpl(container.getContext(), params, configManager);
+ RemoteCommand CmdUnknownOnNode2 = new RemoteCommand() // registered on service1 only
+ {
+
+ public String getId()
+ {
+ return "CmdUnknownOnNode2";
+ }
+
+ public String execute(Serializable[] args) throws Throwable
+ {
+ return "OK";
+ }
+ };
+ service1.registerCommand(CmdUnknownOnNode2);
+ RemoteCommand ExceptionOnNode2 = new RemoteCommand()
+ {
+
+ public String getId()
+ {
+ return "ExceptionOnNode2";
+ }
+
+ public String execute(Serializable[] args) throws Throwable
+ {
+ return "OK";
+ }
+ };
+ service1.registerCommand(ExceptionOnNode2);
+ RemoteCommand ErrorOnNode2 = new RemoteCommand()
+ {
+
+ public String getId()
+ {
+ return "ErrorOnNode2";
+ }
+
+ public String execute(Serializable[] args) throws Throwable
+ {
+ return "OK";
+ }
+ };
+ service1.registerCommand(ErrorOnNode2);
+
+ RemoteCommand LongTaskOnNode2 = new RemoteCommand()
+ {
+
+ public String getId()
+ {
+ return "LongTaskOnNode2";
+ }
+
+ public String execute(Serializable[] args) throws Throwable
+ {
+ return "OK";
+ }
+ };
+ service1.registerCommand(LongTaskOnNode2);
+ service1.registerCommand(new RemoteCommand()
+ {
+
+ public String getId()
+ {
+ return "LongTask";
+ }
+
+ public String execute(Serializable[] args) throws Throwable
+ {
+ Thread.sleep(20000);
+ return "OldCoordinator";
+ }
+ });
+ service1.registerCommand(new RemoteCommand()
+ {
+
+ public String getId()
+ {
+ return "OK";
+ }
+
+ public String execute(Serializable[] args) throws Throwable
+ {
+ return "OK";
+ }
+ });
+ service2.registerCommand(new RemoteCommand() // same ids on service2 but with a failing or slow implementation
+ {
+
+ public String getId()
+ {
+ return "ExceptionOnNode2";
+ }
+
+ public String execute(Serializable[] args) throws Throwable
+ {
+ throw new Exception("MyException");
+ }
+ });
+ service2.registerCommand(new RemoteCommand()
+ {
+
+ public String getId()
+ {
+ return "ErrorOnNode2";
+ }
+
+ public String execute(Serializable[] args) throws Throwable
+ {
+ throw new Error("MyError");
+ }
+ });
+ service2.registerCommand(new RemoteCommand()
+ {
+
+ public String getId()
+ {
+ return "LongTaskOnNode2";
+ }
+
+ public String execute(Serializable[] args) throws Throwable
+ {
+ Thread.sleep(2000);
+ return null;
+ }
+ });
+ RemoteCommand OK = new RemoteCommand()
+ {
+
+ public String getId()
+ {
+ return "OK";
+ }
+
+ public String execute(Serializable[] args) throws Throwable
+ {
+ return "OK";
+ }
+ };
+ service2.registerCommand(OK);
+ final RemoteCommand LongTask = new RemoteCommand()
+ {
+
+ public String getId()
+ {
+ return "LongTask";
+ }
+
+ public String execute(Serializable[] args) throws Throwable
+ {
+ return "NewCoordinator";
+ }
+ };
+ service2.registerCommand(LongTask);
+ service1.start(); // NOTE(review): service1 is started first, presumably to make it the coordinator - to be confirmed
+ service2.start();
+ List<Object> result;
+ Object o;
+ result = service1.executeCommandOnAllNodes(CmdUnknownOnNode2, true); // index 0 is expected to be the result of service1, index 1 the one of service2
+ assertTrue(result != null && result.size() == 2);
+ assertEquals("OK", result.get(0));
+ assertTrue("We expect a RPCException since the command is unknown on node 2", result.get(1) instanceof RPCException);
+ o = service1.executeCommandOnCoordinator(CmdUnknownOnNode2, true);
+ assertEquals("OK", o);
+
+ result = service1.executeCommandOnAllNodes(ExceptionOnNode2, true);
+ assertTrue(result != null && result.size() == 2);
+ assertEquals("OK", result.get(0));
+ assertTrue("We expect a RPCException since the command fails on node 2", result.get(1) instanceof RPCException);
+ o = service1.executeCommandOnCoordinator(ExceptionOnNode2, true);
+ assertEquals("OK", o);
+
+ result = service1.executeCommandOnAllNodes(ErrorOnNode2, true);
+ assertTrue(result != null && result.size() == 2);
+ assertEquals("OK", result.get(0));
+ assertTrue("We expect a RPCException since the command fails on node 2", result.get(1) instanceof RPCException);
+ o = service1.executeCommandOnCoordinator(ErrorOnNode2, true);
+ assertEquals("OK", o);
+
+ result = service1.executeCommandOnAllNodes(LongTaskOnNode2, 1000);
+ assertNotNull(result);
+ assertTrue(result.size() == 2);
+ assertEquals("OK", result.get(0));
+ assertTrue("We expect an RPCException due to a Replication Timeout", result.get(1) instanceof RPCException);
+ o = service1.executeCommandOnCoordinator(LongTaskOnNode2, 1000);
+ assertEquals("OK", o);
+
+ Vector<Address> allMembers = service1.members; // the members and the coordinator are kept before service1 is stopped
+ Vector<Address> coordinatorOnly = new Vector<Address>(1);
+ coordinatorOnly.add(service1.coordinator);
+
+ final RPCServiceImpl service = service2;
+ final AtomicReference<Throwable> error = new AtomicReference<Throwable>();
+ final CountDownLatch doneSignal = new CountDownLatch(1);
+ Thread t = new Thread()
+ {
+ @Override
+ public void run()
+ {
+ try
+ {
+ Object o = service.executeCommandOnCoordinator(LongTask, true);
+ assertEquals("NewCoordinator", o); // the result is expected to come from the command registered on service2
+ }
+ catch (Throwable e)
+ {
+ error.set(e); // given to the main thread that checks it once the thread is done
+ }
+ finally
+ {
+ doneSignal.countDown();
+ }
+ }
+ };
+ t.start();
+ service1.stop(); // service1 is stopped while the thread above may still be executing its command
+ doneSignal.await();
+ assertNull(error.get() != null ? error.get().getMessage() : "", error.get());
+ result = service2.excecuteCommand(allMembers, OK, true, 0); // the destinations still contain the member that has been stopped
+ assertNotNull(result);
+ assertTrue(result.size() == 2);
+ assertTrue("We expect an RPCException due to a member that has left", result.get(0) instanceof MemberHasLeftException);
+ assertEquals("OK", result.get(1));
+ result = service2.excecuteCommand(coordinatorOnly, OK, true, 0);
+ assertNotNull(result);
+ assertTrue(result.size() == 1);
+ assertTrue("We expect an RPCException due to a member that has left", result.get(0) instanceof MemberHasLeftException);
+ }
+ finally
+ {
+ if (service1 != null)
+ {
+ service1.stop();
+ }
+ if (service2 != null)
+ {
+ service2.stop();
+ }
+ }
+ }
+
+ public void testSingleMethodCallCommand() throws Exception
+ {
+ try
+ {
+ new SingleMethodCallCommand(null, null);
+ fail("we expect an IllegalArgumentException");
+ }
+ catch (IllegalArgumentException e)
+ {
+ // OK
+ }
+ MyService myService = new MyService();
+ try
+ {
+ new SingleMethodCallCommand(myService, null);
+ fail("we expect an IllegalArgumentException");
+ }
+ catch (IllegalArgumentException e)
+ {
+ // OK
+ }
+ try
+ {
+ new SingleMethodCallCommand(myService, "foo");
+ fail("we expect an NoSuchMethodException");
+ }
+ catch (NoSuchMethodException e)
+ {
+ // OK
+ }
+ try
+ {
+ new SingleMethodCallCommand(myService, "getPrivateName");
+ fail("we expect an IllegalArgumentException since only the public methods are allowed");
+ }
+ catch (IllegalArgumentException e)
+ {
+ // OK
+ }
+ InitParams params = new InitParams();
+ ValueParam paramConf = new ValueParam();
+ paramConf.setName(RPCServiceImpl.PARAM_JGROUPS_CONFIG);
+ paramConf.setValue("jar:/conf/portal/udp.xml");
+ params.addParameter(paramConf);
+ RPCServiceImpl service = null;
+ try
+ {
+ service = new RPCServiceImpl(container.getContext(), params, configManager);
+ RemoteCommand getName = service.registerCommand(new SingleMethodCallCommand(myService, "getName"));
+ RemoteCommand add = service.registerCommand(new SingleMethodCallCommand(myService, "add", int.class));
+ RemoteCommand evaluate1 = service.registerCommand(new SingleMethodCallCommand(myService, "evaluate", int[].class));
+ RemoteCommand evaluate2 = service.registerCommand(new SingleMethodCallCommand(myService, "evaluate", List.class));
+ RemoteCommand total1 = service.registerCommand(new SingleMethodCallCommand(myService, "total", int.class));
+ RemoteCommand total2 = service.registerCommand(new SingleMethodCallCommand(myService, "total", int.class, int.class));
+ RemoteCommand total3 = service.registerCommand(new SingleMethodCallCommand(myService, "total", int[].class));
+ RemoteCommand total4 = service.registerCommand(new SingleMethodCallCommand(myService, "total", String.class, long.class, int[].class));
+ RemoteCommand testTypes1 = service.registerCommand(new SingleMethodCallCommand(myService, "testTypes", String[].class));
+ RemoteCommand testTypes2 = service.registerCommand(new SingleMethodCallCommand(myService, "testTypes", int[].class));
+ RemoteCommand testTypes3 = service.registerCommand(new SingleMethodCallCommand(myService, "testTypes", long[].class));
+ RemoteCommand testTypes4 = service.registerCommand(new SingleMethodCallCommand(myService, "testTypes", byte[].class));
+ RemoteCommand testTypes5 = service.registerCommand(new SingleMethodCallCommand(myService, "testTypes", short[].class));
+ RemoteCommand testTypes6 = service.registerCommand(new SingleMethodCallCommand(myService, "testTypes", char[].class));
+ RemoteCommand testTypes7 = service.registerCommand(new SingleMethodCallCommand(myService, "testTypes", double[].class));
+ RemoteCommand testTypes8 = service.registerCommand(new SingleMethodCallCommand(myService, "testTypes", float[].class));
+ RemoteCommand testTypes9 = service.registerCommand(new SingleMethodCallCommand(myService, "testTypes", boolean[].class));
+
+ service.start();
+ List<Object> result;
+
+ assertEquals("name", service.executeCommandOnCoordinator(getName, true));
+ result = service.executeCommandOnAllNodes(getName, true);
+ assertTrue(result != null && result.size() == 1);
+ assertEquals("name", result.get(0));
+
+ assertEquals(10, service.executeCommandOnCoordinator(add, true, 10));
+ result = service.executeCommandOnAllNodes(add, true, 10);
+ assertTrue(result != null && result.size() == 1);
+ assertEquals(20, result.get(0));
+
+ assertEquals(100, service.executeCommandOnCoordinator(evaluate1, true, new int[]{10, 10, 10, 30, 40}));
+ result = service.executeCommandOnAllNodes(evaluate1, true, new int[]{10, 10, 10, 30, 40});
+ assertTrue(result != null && result.size() == 1);
+ assertEquals(100, result.get(0));
+
+ List<Integer> values = new ArrayList<Integer>();
+ values.add(10);
+ values.add(10);
+ values.add(10);
+ values.add(30);
+ values.add(40);
+ assertEquals(100, service.executeCommandOnCoordinator(evaluate2, true, (Serializable)values));
+ result = service.executeCommandOnAllNodes(evaluate2, true, (Serializable)values);
+ assertTrue(result != null && result.size() == 1);
+ assertEquals(100, result.get(0));
+
+ assertEquals(10, service.executeCommandOnCoordinator(total1, true, 10));
+ result = service.executeCommandOnAllNodes(total1, true, 10);
+ assertTrue(result != null && result.size() == 1);
+ assertEquals(10, result.get(0));
+
+ assertEquals(20, service.executeCommandOnCoordinator(total2, true, 10, 10));
+ result = service.executeCommandOnAllNodes(total2, true, 10, 10);
+ assertTrue(result != null && result.size() == 1);
+ assertEquals(20, result.get(0));
+
+ assertEquals(100, service.executeCommandOnCoordinator(total3, true, new int[]{10, 10, 10, 30, 40}));
+ result = service.executeCommandOnAllNodes(total3, true, new int[]{10, 10, 10, 30, 40});
+ assertTrue(result != null && result.size() == 1);
+ assertEquals(100, result.get(0));
+
+ assertEquals(100, service.executeCommandOnCoordinator(total4, true, "foo", 50, new int[]{10, 10, 10, 30, 40}));
+ result = service.executeCommandOnAllNodes(total4, true, "foo", 50, new int[]{10, 10, 10, 30, 40});
+ assertTrue(result != null && result.size() == 1);
+ assertEquals(100, result.get(0));
+
+ assertEquals(0, service.executeCommandOnCoordinator(total4, true, "foo", 50, null));
+ result = service.executeCommandOnAllNodes(total4, true, "foo", 50, null);
+ assertTrue(result != null && result.size() == 1);
+ assertEquals(0, result.get(0));
+
+ try
+ {
+ service.executeCommandOnCoordinator(total4, true, "foo", 50);
+ fail("We expect a RPCException since the list of arguments mismatch with what is expected");
+ }
+ catch (RPCException e)
+ {
+ // OK
+ }
+ result = service.executeCommandOnAllNodes(total4, true, "foo", 50);
+ assertTrue(result != null && result.size() == 1);
+ assertTrue("We expect a RPCException since the list of arguments mismatch with what is expected", result.get(0) instanceof RPCException);
+
+ assertEquals("foo", service.executeCommandOnCoordinator(testTypes1, true, (Serializable)new String[]{"foo"}));
+ result = service.executeCommandOnAllNodes(testTypes1, true, (Serializable)new String[]{"foo"});
+ assertTrue(result != null && result.size() == 1);
+ assertEquals("foo", result.get(0));
+
+ assertEquals(10, service.executeCommandOnCoordinator(testTypes2, true, new int[]{10}));
+ result = service.executeCommandOnAllNodes(testTypes2, true, new int[]{10});
+ assertTrue(result != null && result.size() == 1);
+ assertEquals(10, result.get(0));
+
+ assertEquals(11L, service.executeCommandOnCoordinator(testTypes3, true, new long[]{10}));
+ result = service.executeCommandOnAllNodes(testTypes3, true, new long[]{10});
+ assertTrue(result != null && result.size() == 1);
+ assertEquals(11L, result.get(0));
+
+ assertEquals((byte)12, service.executeCommandOnCoordinator(testTypes4, true, new byte[]{10}));
+ result = service.executeCommandOnAllNodes(testTypes4, true, new byte[]{10});
+ assertTrue(result != null && result.size() == 1);
+ assertEquals((byte)12, result.get(0));
+
+ assertEquals((short)13, service.executeCommandOnCoordinator(testTypes5, true, new short[]{10}));
+ result = service.executeCommandOnAllNodes(testTypes5, true, new short[]{10});
+ assertTrue(result != null && result.size() == 1);
+ assertEquals((short)13, result.get(0));
+
+ assertEquals('a', service.executeCommandOnCoordinator(testTypes6, true, new char[]{'a'}));
+ result = service.executeCommandOnAllNodes(testTypes6, true, new char[]{'a'});
+ assertTrue(result != null && result.size() == 1);
+ assertEquals('a', result.get(0));
+
+ assertEquals(10.5, service.executeCommandOnCoordinator(testTypes7, true, new double[]{10}));
+ result = service.executeCommandOnAllNodes(testTypes7, true, new double[]{10});
+ assertTrue(result != null && result.size() == 1);
+ assertEquals(10.5, result.get(0));
+
+ assertEquals((float)11.5, service.executeCommandOnCoordinator(testTypes8, true, new float[]{10}));
+ result = service.executeCommandOnAllNodes(testTypes8, true, new float[]{10});
+ assertTrue(result != null && result.size() == 1);
+ assertEquals((float)11.5, result.get(0));
+
+ assertEquals(true, service.executeCommandOnCoordinator(testTypes9, true, new boolean[]{true}));
+ result = service.executeCommandOnAllNodes(testTypes9, true, new boolean[]{true});
+ assertTrue(result != null && result.size() == 1);
+ assertEquals(true, result.get(0));
+
+ }
+ finally
+ {
+ if (service != null)
+ {
+ service.stop();
+ }
+ }
+ }
+
+ public static class MyService // Fixture exposing overloaded methods with primitive, array, varargs and List parameters
+ {
+ private int value = 0; // running total mutated by add(int); starts at 0
+
+ public int add(int i) // adds i to the running total and returns the updated total (stateful across calls)
+ {
+ return value += i;
+ }
+
+ @SuppressWarnings("unused") // nothing inside this class calls getPrivateName()
+ private String getPrivateName() // private counterpart of getName(); returns the same constant
+ {
+ return "name";
+ }
+
+ public String getName() // returns the constant string "name"
+ {
+ return "name";
+ }
+
+ public int total(int i) // single-argument overload: returns i unchanged
+ {
+ return i;
+ }
+
+ public int total(int i1, int i2) // two-argument overload: returns i1 + i2
+ {
+ return i1 + i2;
+ }
+
+ public int total(int... values) // varargs overload: returns the sum of all values (0 for an empty array)
+ {
+ int total = 0;
+ for (int i : values) // no null guard here: a null array throws NullPointerException
+ {
+ total += i;
+ }
+ return total;
+ }
+
+ public int total(String s, long l, int... values) // s and l are never read; returns the sum of values
+ {
+ int total = 0;
+ if (values != null) // a null array is tolerated and yields 0
+ {
+ for (int i : values)
+ {
+ total += i;
+ }
+ }
+ return total;
+ }
+
+ public int evaluate(int[] values) // array overload: returns the sum of the elements
+ {
+ int total = 0;
+ for (int i : values)
+ {
+ total += i;
+ }
+ return total;
+ }
+
+ public int evaluate(List<Integer> values) // List overload: returns the sum of the elements
+ {
+ int total = 0;
+ for (int i : values) // each Integer is auto-unboxed, so a null element would throw NullPointerException
+ {
+ total += i;
+ }
+ return total;
+ }
+
+ public String testTypes(String... values) // returns the first element; an empty array throws ArrayIndexOutOfBoundsException
+ {
+ return values[0];
+ }
+
+ public boolean testTypes(boolean... values) // returns the first element unchanged
+ {
+ return values[0];
+ }
+
+ public char testTypes(char... values) // returns the first element unchanged
+ {
+ return values[0];
+ }
+
+ public double testTypes(double... values) // returns the first element plus 0.5
+ {
+ return values[0] + 0.5;
+ }
+
+ public float testTypes(float... values) // returns the first element plus 1.5, narrowed back to float
+ {
+ return (float)(values[0] + 1.5);
+ }
+
+ public int testTypes(int... values) // returns the first element unchanged
+ {
+ return values[0];
+ }
+
+ public long testTypes(long... values) // returns the first element plus 1
+ {
+ return values[0] + 1;
+ }
+
+ public byte testTypes(byte... values) // returns the first element plus 2, narrowed back to byte
+ {
+ return (byte)(values[0] + 2);
+ }
+
+ public short testTypes(short... values) // returns the first element plus 3, narrowed back to short
+ {
+ return (short)(values[0] + 3);
+ }
+ }
+}
Added: kernel/trunk/exo.kernel.component.common/src/test/resources/conf/portal/udp.xml
===================================================================
--- kernel/trunk/exo.kernel.component.common/src/test/resources/conf/portal/udp.xml (rev 0)
+++ kernel/trunk/exo.kernel.component.common/src/test/resources/conf/portal/udp.xml 2010-10-06 14:10:44 UTC (rev 3252)
@@ -0,0 +1,64 @@
+<config>
+ <UDP
+ mcast_addr="${jgroups.udp.mcast_addr:228.10.10.10}"
+ mcast_port="${jgroups.udp.mcast_port:45588}"
+ tos="8"
+ ucast_recv_buf_size="20000000"
+ ucast_send_buf_size="640000"
+ mcast_recv_buf_size="25000000"
+ mcast_send_buf_size="640000"
+ loopback="false"
+ discard_incompatible_packets="true"
+ max_bundle_size="64000"
+ max_bundle_timeout="30"
+ use_incoming_packet_handler="true"
+ ip_ttl="${jgroups.udp.ip_ttl:2}"
+ enable_bundling="false"
+ enable_diagnostics="true"
+ thread_naming_pattern="cl"
+
+ use_concurrent_stack="true"
+
+ thread_pool.enabled="true"
+ thread_pool.min_threads="2"
+ thread_pool.max_threads="8"
+ thread_pool.keep_alive_time="5000"
+ thread_pool.queue_enabled="true"
+ thread_pool.queue_max_size="1000"
+ thread_pool.rejection_policy="discard"
+
+ oob_thread_pool.enabled="true"
+ oob_thread_pool.min_threads="1"
+ oob_thread_pool.max_threads="8"
+ oob_thread_pool.keep_alive_time="5000"
+ oob_thread_pool.queue_enabled="false"
+ oob_thread_pool.queue_max_size="100"
+ oob_thread_pool.rejection_policy="Run" />
+
+ <PING timeout="2000"
+ num_initial_members="3"/>
+ <MERGE2 max_interval="30000"
+ min_interval="10000"/>
+ <FD_SOCK />
+ <FD timeout="10000" max_tries="5" shun="true" />
+ <VERIFY_SUSPECT timeout="1500" />
+ <BARRIER />
+ <pbcast.NAKACK use_stats_for_retransmission="false"
+ exponential_backoff="150"
+ use_mcast_xmit="true" gc_lag="0"
+ retransmit_timeout="50,300,600,1200"
+ discard_delivered_msgs="true"/>
+ <UNICAST timeout="300,600,1200" />
+ <pbcast.STABLE stability_delay="1000" desired_avg_gossip="50000"
+ max_bytes="1000000"/>
+ <VIEW_SYNC avg_send_interval="60000" />
+ <pbcast.GMS print_local_addr="true" join_timeout="3000"
+ shun="false"
+ view_bundling="true"/>
+ <FC max_credits="500000"
+ min_threshold="0.20"/>
+ <FRAG2 frag_size="60000" />
+ <!--pbcast.STREAMING_STATE_TRANSFER /-->
+ <pbcast.STATE_TRANSFER />
+ <!-- pbcast.FLUSH /-->
+</config>
More information about the exo-jcr-commits
mailing list