exo-jcr SVN: r5272 - jcr/trunk/exo.jcr.component.ftp/src/main/java/org/exoplatform/services/ftp/command.
by do-not-reply@jboss.org
Author: tolusha
Date: 2011-12-07 02:29:23 -0500 (Wed, 07 Dec 2011)
New Revision: 5272
Modified:
jcr/trunk/exo.jcr.component.ftp/src/main/java/org/exoplatform/services/ftp/command/CmdRetr.java
Log:
EXOJCR-848: IndexOutOfBoundsException on server side while trying to use FTP through browser
Modified: jcr/trunk/exo.jcr.component.ftp/src/main/java/org/exoplatform/services/ftp/command/CmdRetr.java
===================================================================
--- jcr/trunk/exo.jcr.component.ftp/src/main/java/org/exoplatform/services/ftp/command/CmdRetr.java 2011-12-06 13:40:35 UTC (rev 5271)
+++ jcr/trunk/exo.jcr.component.ftp/src/main/java/org/exoplatform/services/ftp/command/CmdRetr.java 2011-12-07 07:29:23 UTC (rev 5272)
@@ -154,13 +154,20 @@
ArrayList<String> newPath = clientSession().getFullPath(resName);
try
{
- String repoPath = clientSession().getRepoPath(newPath);
- Session curSession = clientSession().getSession(newPath.get(0));
+ if (!newPath.isEmpty())
+ {
+ String repoPath = clientSession().getRepoPath(newPath);
+ Session curSession = clientSession().getSession(newPath.get(0));
- Node parentNode = (Node)curSession.getItem(repoPath);
- if (parentNode.isNodeType(FtpConst.NodeTypes.NT_FILE))
+ Node parentNode = (Node)curSession.getItem(repoPath);
+ if (parentNode.isNodeType(FtpConst.NodeTypes.NT_FILE))
+ {
+ return true;
+ }
+ }
+ else
{
- return true;
+ return false;
}
}
catch (PathNotFoundException exc)
12 years, 5 months
exo-jcr SVN: r5271 - jcr/trunk/exo.jcr.docs/exo.jcr.docs.developer/en/src/main/docbook/en-US/modules/kernel.
by do-not-reply@jboss.org
Author: nfilotto
Date: 2011-12-06 08:40:35 -0500 (Tue, 06 Dec 2011)
New Revision: 5271
Modified:
jcr/trunk/exo.jcr.docs/exo.jcr.docs.developer/en/src/main/docbook/en-US/modules/kernel/rpc-service.xml
Log:
EXOJCR-1672: Propose an RPCService implementation based on JGroups 3 (doc)
Modified: jcr/trunk/exo.jcr.docs/exo.jcr.docs.developer/en/src/main/docbook/en-US/modules/kernel/rpc-service.xml
===================================================================
--- jcr/trunk/exo.jcr.docs/exo.jcr.docs.developer/en/src/main/docbook/en-US/modules/kernel/rpc-service.xml 2011-12-06 13:40:17 UTC (rev 5270)
+++ jcr/trunk/exo.jcr.docs/exo.jcr.docs.developer/en/src/main/docbook/en-US/modules/kernel/rpc-service.xml 2011-12-06 13:40:35 UTC (rev 5271)
@@ -128,8 +128,9 @@
<title>Configuration</title>
<para>The configuration of the <emphasis>RPCService</emphasis> should be
- added only in a cluster environment. See below an example of
- configuration</para>
+ added only in a cluster environment. See below an example of configuration
+ in case you intend to use JGroups 2 (which is mandatory if you use JBoss
+ Cache as underlying cache):</para>
<programlisting language="xml"><configuration>
....
@@ -162,6 +163,44 @@
...
</configuration></programlisting>
+ <para>See below an example of configuration in case you intend to use
+ JGroups 3 (which is mandatory if you use Infinispan as underlying
+ cache):</para>
+
+ <programlisting language="xml"><configuration>
+....
+ <component>
+ <key>org.exoplatform.services.rpc.RPCService</key>
+ <type>org.exoplatform.services.rpc.jgv3.RPCServiceImpl</type>
+ <init-params>
+ <value-param>
+ <name>jgroups-configuration</name>
+ <value>classpath:/udp.xml</value>
+ </value-param>
+ <value-param>
+ <name>jgroups-cluster-name</name>
+ <value>RPCService-Cluster</value>
+ </value-param>
+ <value-param>
+ <name>jgroups-default-timeout</name>
+ <value>0</value>
+ </value-param>
+ <value-param>
+ <name>allow-failover</name>
+ <value>true</value>
+ </value-param>
+ <value-param>
+ <name>retry-timeout</name>
+ <value>20000</value>
+ </value-param>
+ </init-params>
+ </component>
+...
+</configuration></programlisting>
+
+ <para>The implementation for JGroups 3 is available in the library
+ <emphasis>exo.kernel.component.ext.rpc.impl.jgroups.v3-X.Y.Z.jar</emphasis>.</para>
+
<table>
<title>Fields description</title>
12 years, 5 months
exo-jcr SVN: r5270 - in kernel/trunk: exo.kernel.component.common/src/main/java/org/exoplatform/services/rpc/impl and 20 other directories.
by do-not-reply@jboss.org
Author: nfilotto
Date: 2011-12-06 08:40:17 -0500 (Tue, 06 Dec 2011)
New Revision: 5270
Added:
kernel/trunk/exo.kernel.component.common/src/main/java/org/exoplatform/services/rpc/impl/AbstractRPCService.java
kernel/trunk/exo.kernel.component.ext.rpc.impl.jgroups.v3/
kernel/trunk/exo.kernel.component.ext.rpc.impl.jgroups.v3/pom.xml
kernel/trunk/exo.kernel.component.ext.rpc.impl.jgroups.v3/src/
kernel/trunk/exo.kernel.component.ext.rpc.impl.jgroups.v3/src/main/
kernel/trunk/exo.kernel.component.ext.rpc.impl.jgroups.v3/src/main/java/
kernel/trunk/exo.kernel.component.ext.rpc.impl.jgroups.v3/src/main/java/org/
kernel/trunk/exo.kernel.component.ext.rpc.impl.jgroups.v3/src/main/java/org/exoplatform/
kernel/trunk/exo.kernel.component.ext.rpc.impl.jgroups.v3/src/main/java/org/exoplatform/services/
kernel/trunk/exo.kernel.component.ext.rpc.impl.jgroups.v3/src/main/java/org/exoplatform/services/rpc/
kernel/trunk/exo.kernel.component.ext.rpc.impl.jgroups.v3/src/main/java/org/exoplatform/services/rpc/jgv3/
kernel/trunk/exo.kernel.component.ext.rpc.impl.jgroups.v3/src/main/java/org/exoplatform/services/rpc/jgv3/RPCServiceImpl.java
kernel/trunk/exo.kernel.component.ext.rpc.impl.jgroups.v3/src/test/
kernel/trunk/exo.kernel.component.ext.rpc.impl.jgroups.v3/src/test/java/
kernel/trunk/exo.kernel.component.ext.rpc.impl.jgroups.v3/src/test/java/org/
kernel/trunk/exo.kernel.component.ext.rpc.impl.jgroups.v3/src/test/java/org/exoplatform/
kernel/trunk/exo.kernel.component.ext.rpc.impl.jgroups.v3/src/test/java/org/exoplatform/services/
kernel/trunk/exo.kernel.component.ext.rpc.impl.jgroups.v3/src/test/java/org/exoplatform/services/rpc/
kernel/trunk/exo.kernel.component.ext.rpc.impl.jgroups.v3/src/test/java/org/exoplatform/services/rpc/impl/
kernel/trunk/exo.kernel.component.ext.rpc.impl.jgroups.v3/src/test/java/org/exoplatform/services/rpc/impl/TestRPCServiceImpl.java
kernel/trunk/exo.kernel.component.ext.rpc.impl.jgroups.v3/src/test/resources/
kernel/trunk/exo.kernel.component.ext.rpc.impl.jgroups.v3/src/test/resources/conf/
kernel/trunk/exo.kernel.component.ext.rpc.impl.jgroups.v3/src/test/resources/conf/portal/
kernel/trunk/exo.kernel.component.ext.rpc.impl.jgroups.v3/src/test/resources/conf/portal/udp.xml
kernel/trunk/exo.kernel.component.ext.rpc.impl.jgroups.v3/src/test/resources/test.policy
Modified:
kernel/trunk/exo.kernel.component.common/src/main/java/org/exoplatform/services/rpc/impl/RPCServiceImpl.java
kernel/trunk/exo.kernel.component.common/src/test/java/org/exoplatform/services/rpc/impl/TestRPCServiceImpl.java
kernel/trunk/pom.xml
Log:
EXOJCR-1672: Propose an RPCService implementation based on JGroups 3 (impl)
Added: kernel/trunk/exo.kernel.component.common/src/main/java/org/exoplatform/services/rpc/impl/AbstractRPCService.java
===================================================================
--- kernel/trunk/exo.kernel.component.common/src/main/java/org/exoplatform/services/rpc/impl/AbstractRPCService.java (rev 0)
+++ kernel/trunk/exo.kernel.component.common/src/main/java/org/exoplatform/services/rpc/impl/AbstractRPCService.java 2011-12-06 13:40:17 UTC (rev 5270)
@@ -0,0 +1,1032 @@
+/*
+ * Copyright (C) 2010 eXo Platform SAS.
+ *
+ * This is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU Lesser General Public License as
+ * published by the Free Software Foundation; either version 2.1 of
+ * the License, or (at your option) any later version.
+ *
+ * This software is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this software; if not, write to the Free
+ * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
+ * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
+ */
+package org.exoplatform.services.rpc.impl;
+
+import org.exoplatform.commons.utils.PropertyManager;
+import org.exoplatform.commons.utils.SecurityHelper;
+import org.exoplatform.container.ExoContainer;
+import org.exoplatform.container.ExoContainerContext;
+import org.exoplatform.container.configuration.ConfigurationManager;
+import org.exoplatform.container.xml.InitParams;
+import org.exoplatform.container.xml.ValueParam;
+import org.exoplatform.services.log.ExoLogger;
+import org.exoplatform.services.log.Log;
+import org.exoplatform.services.rpc.RPCException;
+import org.exoplatform.services.rpc.RPCService;
+import org.exoplatform.services.rpc.RemoteCommand;
+import org.exoplatform.services.rpc.TopologyChangeEvent;
+import org.exoplatform.services.rpc.TopologyChangeListener;
+import org.jgroups.Address;
+import org.jgroups.Channel;
+import org.jgroups.MembershipListener;
+import org.jgroups.Message;
+import org.jgroups.View;
+import org.jgroups.blocks.MessageDispatcher;
+import org.jgroups.blocks.RequestHandler;
+import org.jgroups.conf.ConfiguratorFactory;
+import org.jgroups.conf.ProtocolStackConfigurator;
+import org.jgroups.util.Rsp;
+import org.jgroups.util.RspList;
+import org.picocontainer.Startable;
+
+import java.io.Externalizable;
+import java.io.IOException;
+import java.io.ObjectInput;
+import java.io.ObjectOutput;
+import java.io.Serializable;
+import java.net.URL;
+import java.security.PrivilegedAction;
+import java.security.PrivilegedActionException;
+import java.security.PrivilegedExceptionAction;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Vector;
+import java.util.concurrent.CopyOnWriteArrayList;
+import java.util.concurrent.CountDownLatch;
+
+/**
+ * This class is a basic implementation of the {@link RPCService}; it is mainly based on the
+ * {@link MessageDispatcher} of JGroups. This implementation is not designed to give
+ * the best possible performance; it only aims to provide a way to communicate with other nodes.
+ *
+ * @author <a href="mailto:nicolas.filotto@exoplatform.com">Nicolas Filotto</a>
+ * @version $Id$
+ */
+public abstract class AbstractRPCService implements RPCService, Startable, RequestHandler, MembershipListener
+{
+
+ /**
+ * Logger of the service. The category is still named after <code>RPCServiceImpl</code>
+ * although the logger is declared in this abstract class.
+ */
+ private static final Log LOG = ExoLogger.getLogger("exo.kernel.component.common.RPCServiceImpl");
+
+ /**
+ * The name of the parameter for the location of the JGroups configuration.
+ */
+ protected static final String PARAM_JGROUPS_CONFIG = "jgroups-configuration";
+
+ /**
+ * The name of the parameter for the name of the cluster.
+ */
+ protected static final String PARAM_CLUSTER_NAME = "jgroups-cluster-name";
+
+ /**
+ * The name of the optional parameter for the default timeout. When it is not set,
+ * {@link #DEFAULT_TIMEOUT} is used.
+ */
+ protected static final String PARAM_DEFAULT_TIMEOUT = "jgroups-default-timeout";
+
+ /**
+ * The name of the optional parameter to allow the failover. When it is not set,
+ * the failover is enabled.
+ */
+ protected static final String PARAM_ALLOW_FAILOVER = "allow-failover";
+
+ /**
+ * The name of the optional parameter for the retry timeout. When it is not set,
+ * {@link #DEFAULT_RETRY_TIMEOUT} is used.
+ */
+ protected static final String PARAM_RETRY_TIMEOUT = "retry-timeout";
+
+ /**
+ * The value of the default timeout. The contract of <code>castMessage</code> in this class
+ * documents a timeout of 0 as "wait forever".
+ */
+ protected static final int DEFAULT_TIMEOUT = 0;
+
+ /**
+ * The value of the default retry timeout, in milliseconds (the retry timeout is the
+ * maximum time spent waiting for a topology change before retrying on the coordinator).
+ */
+ protected static final int DEFAULT_RETRY_TIMEOUT = 20000;
+
+ /**
+ * The default value of the cluster name
+ */
+ protected static final String CLUSTER_NAME = "RPCService-Cluster";
+
+ /**
+ * The configurator used to create the JGroups Channel. It is loaded once by the constructor
+ * from the URL of the JGroups configuration.
+ */
+ protected final ProtocolStackConfigurator configurator;
+
+ /**
+ * The lock used to synchronize all the threads waiting for a topology change.
+ * <code>viewAccepted</code> updates the topology and calls <code>notifyAll()</code> while
+ * holding it; <code>executeCommandOnCoordinatorMain</code> waits on it before a failover retry.
+ */
+ private final Object topologyChangeLock = new Object();
+
+ /**
+ * The name of the cluster, this is the name given to <code>channel.connect</code> in
+ * <code>start()</code>
+ */
+ private final String clusterName;
+
+ /**
+ * The JGroups Channel used to communicate with other nodes. It is created in
+ * <code>start()</code> and reset to <code>null</code> by <code>stop()</code> once closed.
+ */
+ protected Channel channel;
+
+ /**
+ * The current list of all the members of the cluster. The reference is replaced on every
+ * view change (see <code>viewAccepted</code>), it is <code>null</code> until the first
+ * view has been received.
+ */
+ protected volatile List<Address> members;
+
+ /**
+ * The address of the current coordinator, which is the first member of the last accepted
+ * view or <code>null</code> if there is no member
+ */
+ protected volatile Address coordinator;
+
+ /**
+ * Indicates whether the current node is the coordinator of the cluster or not
+ */
+ protected volatile boolean isCoordinator;
+
+ /**
+ * The timeout used by the methods that don't take any explicit timeout
+ */
+ private long defaultTimeout = DEFAULT_TIMEOUT;
+
+ /**
+ * The maximum time in milliseconds to wait for a topology change before retrying a
+ * command on the coordinator
+ */
+ private long retryTimeout = DEFAULT_RETRY_TIMEOUT;
+
+ /**
+ * Indicates whether the failover capabilities are enabled
+ */
+ private boolean allowFailover = true;
+
+ /**
+ * The dispatcher used to launch the command of the cluster nodes. It is created in
+ * <code>start()</code> and reset to <code>null</code> by <code>stop()</code>.
+ */
+ protected MessageDispatcher dispatcher;
+
+ /**
+ * The signal that indicates that the service is started, it will be used
+ * to make the application wait until the service is fully started to
+ * ensure that all the commands have been registered before handling
+ * incoming messages. <code>handle</code> awaits it and <code>start()</code> releases it.
+ */
+ private final CountDownLatch startSignal = new CountDownLatch(1);
+
+ /**
+ * All the registered {@link TopologyChangeListener}
+ */
+ private final List<TopologyChangeListener> listeners = new CopyOnWriteArrayList<TopologyChangeListener>();
+
+ /**
+ * Current state of this service: <code>INITIALIZED</code> at the end of the constructor,
+ * <code>STARTED</code> once <code>start()</code> has been called and <code>STOPPED</code>
+ * once <code>stop()</code> has been called.
+ */
+ private volatile State state;
+
+ /**
+ * All the commands that have been registered. The map itself is never modified: each
+ * registration or unregistration publishes a new unmodifiable copy through this volatile
+ * field.
+ */
+ private volatile Map<String, RemoteCommand> commands =
+ Collections.unmodifiableMap(new HashMap<String, RemoteCommand>());
+
+ /**
+  * Builds the service from its initial parameters: the JGroups configuration is loaded,
+  * the cluster name is resolved and the optional settings (default timeout, failover flag
+  * and retry timeout) are read. The service ends up in the state <code>INITIALIZED</code>.
+  *
+  * @param ctx the {@link ExoContainerContext} from which we will extract the corresponding
+  * {@link ExoContainer}
+  * @param params the list of initial parameters, it cannot be <code>null</code>
+  * @param configManager the configuration manager used to get the configuration
+  * of JGroups
+  * @throws IllegalArgumentException if <code>params</code> is <code>null</code>
+  * @throws RuntimeException if the JGroups configuration cannot be loaded
+  * @throws NumberFormatException if one of the timeouts is not a valid integer
+  */
+ public AbstractRPCService(ExoContainerContext ctx, InitParams params, ConfigurationManager configManager)
+ {
+    if (params == null)
+    {
+       throw new IllegalArgumentException("The RPCServiceImpl requires some parameters");
+    }
+    final URL configUrl = getProperties(params, configManager);
+    if (LOG.isInfoEnabled())
+    {
+       LOG.info("The JGroups configuration used for the RPCServiceImpl will be loaded from " + configUrl);
+    }
+
+    // The protocol stack is read within a privileged action
+    final PrivilegedExceptionAction<ProtocolStackConfigurator> loadStack =
+       new PrivilegedExceptionAction<ProtocolStackConfigurator>()
+       {
+          public ProtocolStackConfigurator run() throws Exception
+          {
+             return ConfiguratorFactory.getStackConfigurator(configUrl);
+          }
+       };
+    try
+    {
+       this.configurator = SecurityHelper.doPrivilegedExceptionAction(loadStack);
+    }
+    catch (PrivilegedActionException pae)
+    {
+       throw new RuntimeException("Cannot load the JGroups configuration from " + configUrl, pae.getCause());
+    }
+
+    this.clusterName = getClusterName(ctx, params);
+    if (LOG.isDebugEnabled())
+    {
+       LOG.debug("The cluster name of the RPCServiceImpl has been set to " + clusterName);
+    }
+
+    // Optional settings: the field defaults are kept when a parameter is missing
+    final String rawDefaultTimeout = getValueParam(params, PARAM_DEFAULT_TIMEOUT);
+    if (rawDefaultTimeout != null)
+    {
+       defaultTimeout = Integer.parseInt(rawDefaultTimeout);
+       if (LOG.isDebugEnabled())
+       {
+          LOG.debug("The default timeout of the RPCServiceImpl has been set to " + defaultTimeout);
+       }
+    }
+    final String rawAllowFailover = getValueParam(params, PARAM_ALLOW_FAILOVER);
+    if (rawAllowFailover != null)
+    {
+       allowFailover = Boolean.valueOf(rawAllowFailover);
+       if (LOG.isDebugEnabled())
+       {
+          LOG.debug("The parameter '" + PARAM_ALLOW_FAILOVER + "' of the RPCServiceImpl has been set to " + allowFailover);
+       }
+    }
+    final String rawRetryTimeout = getValueParam(params, PARAM_RETRY_TIMEOUT);
+    if (rawRetryTimeout != null)
+    {
+       retryTimeout = Integer.parseInt(rawRetryTimeout);
+       if (LOG.isDebugEnabled())
+       {
+          LOG.debug("The retry timeout of the RPCServiceImpl has been set to " + retryTimeout);
+       }
+    }
+    this.state = State.INITIALIZED;
+ }
+
+ /**
+ * {@inheritDoc}
+ * <p>This implementation delegates to <code>executeCommandOnAllNodesMain</code> with the
+ * default timeout of the service.</p>
+ */
+ public List<Object> executeCommandOnAllNodes(RemoteCommand command, boolean synchronous, Serializable... args)
+ throws RPCException
+ {
+ return executeCommandOnAllNodesMain(command, synchronous, defaultTimeout, args);
+ }
+
+ /**
+ * {@inheritDoc}
+ * <p>This implementation delegates to <code>executeCommandOnAllNodesMain</code> in
+ * synchronous mode with the given timeout.</p>
+ */
+ public List<Object> executeCommandOnAllNodes(RemoteCommand command, long timeout, Serializable... args)
+ throws RPCException
+ {
+ return executeCommandOnAllNodesMain(command, true, timeout, args);
+ }
+
+ /**
+ * Executes a command on all the cluster nodes, using the current list of members as
+ * destinations and the given timeout. The command must be registered first otherwise an
+ * {@link RPCException} will be thrown.
+ *
+ * @param command The command to execute on each cluster node
+ * @param synchronous if true, sets group request mode to {@link org.jgroups.blocks.GroupRequest#GET_ALL},
+ * and if false sets it to {@link org.jgroups.blocks.GroupRequest#GET_NONE}.
+ * @param timeout a timeout after which to throw a replication exception.
+ * @param args an array of {@link Serializable} objects corresponding to parameters of the command
+ * to execute remotely
+ * @return a list of responses from all the members of the cluster. If we met an exception on a given node,
+ * the RPCException will be the corresponding response of this particular node. In asynchronous mode
+ * the list is empty.
+ * @throws RPCException in the event of problems.
+ */
+ protected List<Object> executeCommandOnAllNodesMain(RemoteCommand command, boolean synchronous, long timeout,
+ Serializable... args) throws RPCException
+ {
+ return excecuteCommand(members, command, synchronous, timeout, args);
+ }
+
+ /**
+ * {@inheritDoc}
+ * <p>This implementation delegates to <code>executeCommandOnCoordinatorMain</code> with the
+ * default timeout of the service.</p>
+ */
+ public Object executeCommandOnCoordinator(RemoteCommand command, boolean synchronous, Serializable... args)
+ throws RPCException
+ {
+ return executeCommandOnCoordinatorMain(command, synchronous, defaultTimeout, args);
+ }
+
+ /**
+ * {@inheritDoc}
+ * <p>This implementation delegates to <code>executeCommandOnCoordinatorMain</code> in
+ * synchronous mode with the given timeout.</p>
+ */
+ public Object executeCommandOnCoordinator(RemoteCommand command, long timeout, Serializable... args)
+ throws RPCException
+ {
+ return executeCommandOnCoordinatorMain(command, true, timeout, args);
+ }
+
+ /**
+  * Executes a command on the coordinator only, with the given mode and timeout. The command
+  * must be registered first otherwise an {@link RPCException} will be thrown.
+  * <p>
+  * If the failover is allowed and the coordinator seems to have left the cluster, this method
+  * waits for a topology change (at most the retry timeout) and then executes the command again
+  * with the same mode, the same timeout and the same arguments.
+  * </p>
+  *
+  * @param command The command to execute on the coordinator node
+  * @param synchronous if true, sets group request mode to {@link org.jgroups.blocks.GroupRequest#GET_ALL},
+  * and if false sets it to {@link org.jgroups.blocks.GroupRequest#GET_NONE}.
+  * @param timeout a timeout after which to throw a replication exception.
+  * @param args an array of {@link Serializable} objects corresponding to parameters of the command
+  * to execute remotely
+  * @return the response of the coordinator, <code>null</code> if no response is available
+  * (which is always the case in asynchronous mode).
+  * @throws RPCException in the event of problems, in particular when the response of the
+  * coordinator is itself an {@link RPCException}.
+  */
+ protected Object executeCommandOnCoordinatorMain(RemoteCommand command, boolean synchronous, long timeout,
+    Serializable... args) throws RPCException
+ {
+    // Work on a snapshot of the coordinator to be able to detect a change later on
+    Address coordinator = this.coordinator;
+    Vector<Address> v = new Vector<Address>(1);
+    v.add(coordinator);
+    List<Object> lResults = excecuteCommand(v, command, synchronous, timeout, args);
+    Object result = lResults == null || lResults.size() == 0 ? null : lResults.get(0);
+    if (allowFailover && result instanceof MemberHasLeftException)
+    {
+       // The failover capabilities have been enabled and the coordinator seems to have left
+       if (coordinator.equals(this.coordinator))
+       {
+          synchronized (topologyChangeLock)
+          {
+             // Check again under the lock since viewAccepted updates the coordinator while holding it
+             if (coordinator.equals(this.coordinator))
+             {
+                if (LOG.isTraceEnabled())
+                {
+                   LOG.trace("The coordinator did not change yet, we will relaunch the command after "
+                      + retryTimeout + " ms or once a topology change has been detected");
+                }
+                try
+                {
+                   topologyChangeLock.wait(retryTimeout);
+                }
+                catch (InterruptedException e)
+                {
+                   Thread.currentThread().interrupt();
+                }
+             }
+          }
+       }
+       if (LOG.isTraceEnabled())
+       {
+          LOG.trace("The coordinator has changed, we will automatically retry with the new coordinator");
+       }
+       // Retry through this method: calling executeCommandOnCoordinator(command, synchronous,
+       // timeout, args) would resolve to the (RemoteCommand, boolean, Serializable...) overload,
+       // so the timeout and the array of arguments would be sent as the arguments of the command
+       // and the default timeout would be used instead of the requested one.
+       return executeCommandOnCoordinatorMain(command, synchronous, timeout, args);
+    }
+    else if (result instanceof RPCException)
+    {
+       throw (RPCException)result;
+    }
+    return result;
+ }
+
+ /**
+ * Execute the command on all the nodes corresponding to the list of destinations.
+ * The caller needs the permission {@link RPCService#ACCESS_RPC_SERVICE_PERMISSION} when a
+ * security manager is installed, the service must be started and the given instance of
+ * command must be the one that has been registered.
+ * @param dests the list of members on which the command needs to be executed
+ * @param command the command to execute
+ * @param synchronous if true, sets group request mode to {@link org.jgroups.blocks.GroupRequest#GET_ALL}, and if false sets
+ * it to {@link org.jgroups.blocks.GroupRequest#GET_NONE}.
+ * @param timeout a timeout after which to throw a replication exception.
+ * @param args the list of parameters
+ * @return a list of responses from all the targeted members of the cluster, in the order of
+ * <code>dests</code>. A member that did not answer is represented by a
+ * <code>MemberHasLeftException</code> or an {@link RPCException}, those exceptions are added to
+ * the list, they are not thrown. The list is empty in asynchronous mode.
+ * @throws RPCException if the service is not started, if the command is unknown or if the
+ * message could not be cast.
+ */
+ protected List<Object> excecuteCommand(final List<Address> dests, RemoteCommand command,
+ final boolean synchronous, final long timeout, Serializable... args) throws RPCException
+ {
+ SecurityManager security = System.getSecurityManager();
+ if (security != null)
+ {
+ security.checkPermission(RPCService.ACCESS_RPC_SERVICE_PERMISSION);
+ }
+ if (state != State.STARTED)
+ {
+ throw new RPCException(
+ "Cannot execute any commands if the service is not started, the current state of the service is " + state);
+ }
+ final String commandId = command.getId();
+ // Comparison by identity: the registered instance is expected, not only a known id
+ if (commands.get(commandId) != command)
+ {
+ throw new RPCException("Command " + commandId + " unknown, please register your command first");
+ }
+ final Message msg = new Message();
+ // The destination is only given to the body for a single target that is not the list of all
+ // the members. NOTE(review): presumably the value checked by body.accept(...) in handle - confirm
+ setObject(msg, new MessageBody(dests.size() == 1 && dests != members ? dests.get(0) : null, commandId, args));
+ RspList rsps = SecurityHelper.doPrivilegedAction(new PrivilegedAction<RspList>()
+ {
+ public RspList run()
+ {
+ try
+ {
+ return castMessage(dests, msg, synchronous, timeout);
+ }
+ catch (Exception e)
+ {
+ // The error is only logged here, the null result is converted into an RPCException below
+ LOG.error("Could not cast the message corresponding to the command " + commandId + ".", e);
+ }
+ return null;
+ }
+ });
+
+ if (LOG.isTraceEnabled())
+ LOG.trace("responses: " + rsps);
+ if (rsps == null)
+ throw new RPCException("Could not get the responses for command " + commandId + ".");
+ if (!synchronous)
+ return Collections.emptyList();// async case
+ if (LOG.isTraceEnabled())
+ {
+ LOG.trace("(" + getLocalAddress() + "): responses for command " + commandId + ":\n" + rsps);
+ }
+ List<Object> retval = new ArrayList<Object>(rsps.size());
+ for (Address dest : dests)
+ {
+ Rsp rsp = rsps.get(dest);
+ if (rsp == null || (rsp.wasSuspected() && !rsp.wasReceived()))
+ {
+ // The corresponding member has left
+ retval.add(new MemberHasLeftException("No response for the member " + dest
+ + ", this member has probably left the cluster."));
+ }
+ else if (!rsp.wasReceived())
+ {
+ retval.add(new RPCException("Replication timeout for " + rsp.getSender() + ", rsp=" + rsp));
+ }
+ else
+ {
+ Object value = rsp.getValue();
+ if (value instanceof RPCException)
+ {
+ // Application-level exceptions are only traced here, they are returned as the
+ // response of the member and it is up to the caller to throw them
+ if (LOG.isTraceEnabled())
+ LOG.trace("Recieved exception'" + value + "' from " + rsp.getSender(), (RPCException)value);
+ }
+ retval.add(value);
+ }
+ }
+ return retval;
+ }
+
+ /**
+  * Handles an incoming message: waits until the service is started, extracts the
+  * <code>MessageBody</code> from the message and executes the corresponding registered command.
+  *
+  * @param msg the message received, whose object is expected to be a <code>MessageBody</code>
+  * @return the result of the command, <code>null</code> if the body does not accept the local
+  * address, or an {@link RPCException} (returned, not thrown) if the command is unknown or if
+  * anything goes wrong
+  */
+ public Object handle(Message msg)
+ {
+    String id = null;
+    try
+    {
+       // Ensure that the service is fully started before trying to execute any command
+       startSignal.await();
+       final MessageBody payload = (MessageBody)msg.getObject();
+       id = payload.getCommandId();
+       final boolean accepted = payload.accept(getLocalAddress());
+       if (!accepted)
+       {
+          if (LOG.isTraceEnabled())
+          {
+             LOG.trace("Command : " + id + " needs to be executed on the coordinator " +
+                "only and the local node is not the coordinator, the command will be ignored");
+          }
+          return null;
+       }
+       final RemoteCommand target = getCommand(id);
+       if (target == null)
+       {
+          return new RPCException("Command " + id + " unkown, please register your command first");
+       }
+       final Object outcome = target.execute(payload.getArgs());
+       if (LOG.isTraceEnabled())
+       {
+          LOG.trace("Command : " + id + " executed, result is: " + outcome);
+       }
+       return outcome;
+    }
+    catch (Throwable x)
+    {
+       if (LOG.isTraceEnabled())
+       {
+          LOG.trace("Problems invoking command.", x);
+       }
+       // The id is still unknown if the failure occurred before the body could be read
+       String suffix;
+       if (id == null)
+       {
+          suffix = "";
+       }
+       else
+       {
+          suffix = id;
+       }
+       return new RPCException("Cannot execute the command " + suffix, x);
+    }
+ }
+
+ /**
+ * {@inheritDoc}
+ * <p>This implementation does nothing.</p>
+ */
+ public void block()
+ {
+ }
+
+ /**
+ * {@inheritDoc}
+ * <p>This implementation does nothing, the topology is only updated in
+ * <code>viewAccepted</code>.</p>
+ */
+ public void suspect(Address suspectedMbr)
+ {
+ }
+
+ /**
+ * {@inheritDoc}
+ * <p>This implementation updates the list of members, the coordinator (first member of the
+ * view) and the flag indicating whether the local node is the coordinator, then wakes up
+ * all the threads waiting for a topology change and finally notifies the registered
+ * {@link TopologyChangeListener}s.</p>
+ */
+ public void viewAccepted(View view)
+ {
+ boolean coordinatorHasChanged;
+ synchronized (topologyChangeLock)
+ {
+ this.members = getMembers(view);
+ // Keep the previous coordinator to be able to detect a change
+ Address currentCoordinator = coordinator;
+ this.coordinator = members != null && members.size() > 0 ? members.get(0) : null;
+ this.isCoordinator = coordinator != null && coordinator.equals(getLocalAddress());
+ // The first view (no previous coordinator) is not considered as a change of coordinator
+ coordinatorHasChanged = currentCoordinator != null && !currentCoordinator.equals(coordinator);
+ // Release all the nodes
+ topologyChangeLock.notifyAll();
+ }
+ // The listeners are called once the lock has been released
+ onTopologyChange(coordinatorHasChanged);
+ }
+
+ /**
+ * Called anytime the topology has changed, this method will notify all the listeners
+ * currently registered
+ * @param coordinatorHasChanged this parameter is set to <code>true</code> if the
+ * coordinator has changed, <code>false</code> otherwise
+ */
+ private void onTopologyChange(boolean coordinatorHasChanged)
+ {
+ TopologyChangeEvent event = new TopologyChangeEvent(coordinatorHasChanged, isCoordinator);
+ for (TopologyChangeListener listener : listeners)
+ {
+ try
+ {
+ listener.onChange(event);
+ }
+ catch (Exception e)
+ {
+ LOG.warn("An error occurs with the listener of type " + listener.getClass(), e);
+ }
+ }
+ }
+
+ /**
+  * Registers the given command under its id, replacing any command already registered with
+  * the same id. The registration publishes a new unmodifiable copy of the map of commands.
+  *
+  * @param command the command to register, nothing is done if it is <code>null</code>
+  * @return the command that has been registered or <code>null</code> if the given command
+  * is <code>null</code>
+  * @throws IllegalArgumentException if the id of the command is <code>null</code>
+  */
+ public synchronized RemoteCommand registerCommand(RemoteCommand command)
+ {
+    final SecurityManager sm = System.getSecurityManager();
+    if (sm != null)
+    {
+       sm.checkPermission(RPCService.ACCESS_RPC_SERVICE_PERMISSION);
+    }
+    if (command == null)
+    {
+       return null;
+    }
+    final String id = command.getId();
+    if (id == null)
+    {
+       throw new IllegalArgumentException("The command Id cannot be null");
+    }
+    final Map<String, RemoteCommand> updated = new HashMap<String, RemoteCommand>(this.commands);
+    final boolean replaced = updated.put(id, command) != null;
+    if (replaced && PropertyManager.isDevelopping())
+    {
+       LOG.warn("A command has already been registered with the id " + id
+          + ", this command will be replaced with the new one");
+    }
+    this.commands = Collections.unmodifiableMap(updated);
+    return command;
+ }
+
+ /**
+  * Unregisters the given command. The command is only removed if the given instance is
+  * exactly the one currently registered under its id, otherwise nothing is changed.
+  *
+  * @param command the command to unregister, nothing is done if it is <code>null</code>
+  * @throws IllegalArgumentException if the id of the command is <code>null</code>
+  */
+ public synchronized void unregisterCommand(RemoteCommand command)
+ {
+    final SecurityManager sm = System.getSecurityManager();
+    if (sm != null)
+    {
+       sm.checkPermission(RPCService.ACCESS_RPC_SERVICE_PERMISSION);
+    }
+    if (command == null)
+    {
+       return;
+    }
+    final String id = command.getId();
+    if (id == null)
+    {
+       throw new IllegalArgumentException("The command Id cannot be null");
+    }
+    if (commands.get(id) == command)
+    {
+       // Same instance as the registered one: publish a copy without it
+       final Map<String, RemoteCommand> updated = new HashMap<String, RemoteCommand>(this.commands);
+       updated.remove(id);
+       this.commands = Collections.unmodifiableMap(updated);
+       return;
+    }
+    // We prevent to remove any command that has not been registered, thus we expect that
+    // the registered instance is exactly the same instance as the one that we want to
+    // unregister
+    if (PropertyManager.isDevelopping())
+    {
+       LOG.warn("Cannot unregister an unknown RemoteCommand, either the command id " + id
+          + " is unknown or the instance of RemoteCommand to unregister is unknown");
+    }
+ }
+
+ /**
+  * Indicates whether the local node is the coordinator of the cluster.
+  *
+  * @return <code>true</code> if the local node is the coordinator, <code>false</code> otherwise
+  * @throws RPCException if the service is not in the state <code>STARTED</code>
+  */
+ public boolean isCoordinator() throws RPCException
+ {
+    if (state == State.STARTED)
+    {
+       return isCoordinator;
+    }
+    throw new RPCException("Cannot know whether the local node is a coordinator or not if " +
+       "the service is not started, the current state of the service is " + state);
+ }
+
+ /**
+  * Adds the given listener to the list of listeners notified on each topology change.
+  *
+  * @param listener the listener to register, nothing is done if it is <code>null</code>
+  * @throws SecurityException if a security manager is installed and the caller does not have
+  * the permission {@link RPCService#ACCESS_RPC_SERVICE_PERMISSION}
+  */
+ public void registerTopologyChangeListener(TopologyChangeListener listener) throws SecurityException
+ {
+    final SecurityManager sm = System.getSecurityManager();
+    if (sm != null)
+    {
+       sm.checkPermission(RPCService.ACCESS_RPC_SERVICE_PERMISSION);
+    }
+    if (listener != null)
+    {
+       listeners.add(listener);
+    }
+ }
+
+ /**
+  * Removes the given listener from the list of listeners notified on each topology change.
+  *
+  * @param listener the listener to unregister, nothing is done if it is <code>null</code>
+  * @throws SecurityException if a security manager is installed and the caller does not have
+  * the permission {@link RPCService#ACCESS_RPC_SERVICE_PERMISSION}
+  */
+ public void unregisterTopologyChangeListener(TopologyChangeListener listener) throws SecurityException
+ {
+    final SecurityManager sm = System.getSecurityManager();
+    if (sm != null)
+    {
+       sm.checkPermission(RPCService.ACCESS_RPC_SERVICE_PERMISSION);
+    }
+    if (listener != null)
+    {
+       listeners.remove(listener);
+    }
+ }
+
+ /**
+ * Gives the {@link RemoteCommand} corresponding to the given id
+ * @param commandId the command id of the command to retrieve
+ * @return the corresponding {@link RemoteCommand}, <code>null</code> if no command is
+ * currently registered with this id
+ */
+ protected RemoteCommand getCommand(String commandId)
+ {
+ return commands.get(commandId);
+ }
+
+ /**
+  * Starts the service: creates the channel, binds a {@link MessageDispatcher} to it with this
+  * service as membership listener and request handler, and connects the channel to the cluster.
+  * <p>
+  * The state is set to <code>STARTED</code> only if all those steps succeed, so a service whose
+  * channel could not be initialized never reports itself as started. The start signal is released
+  * in all cases so the threads blocked in <code>handle</code> cannot wait forever.
+  * </p>
+  *
+  * @throws RuntimeException if the channel cannot be created or connected
+  */
+ public void start()
+ {
+    SecurityManager security = System.getSecurityManager();
+    if (security != null)
+    {
+       security.checkPermission(RPCService.ACCESS_RPC_SERVICE_PERMISSION);
+    }
+
+    try
+    {
+       SecurityHelper.doPrivilegedExceptionAction(new PrivilegedExceptionAction<Void>()
+       {
+          public Void run() throws Exception
+          {
+             channel = createChannel();
+             dispatcher = new MessageDispatcher(channel, null, AbstractRPCService.this, AbstractRPCService.this);
+             channel.connect(clusterName);
+             return null;
+          }
+       });
+       // Reached only when the channel is connected: a failed start must not be seen as STARTED
+       this.state = State.STARTED;
+    }
+    catch (PrivilegedActionException pae)
+    {
+       throw new RuntimeException("Cannot initialize the Channel needed for the RPCServiceImpl", pae.getCause());
+    }
+    finally
+    {
+       // Always release the threads waiting in handle(), even when the start fails
+       startSignal.countDown();
+    }
+ }
+
+ /**
+ * Stops the service: the state is set to <code>STOPPED</code>, the local node is no longer
+ * considered as the coordinator, the channel is disconnected and closed if it is still open
+ * and the dispatcher is stopped.
+ */
+ public void stop()
+ {
+ SecurityManager security = System.getSecurityManager();
+ if (security != null)
+ {
+ security.checkPermission(RPCService.ACCESS_RPC_SERVICE_PERMISSION);
+ }
+
+ // The state is changed first so that new commands are rejected while the channel is closing
+ this.state = State.STOPPED;
+ this.isCoordinator = false;
+ if (channel != null && channel.isOpen())
+ {
+ if (LOG.isInfoEnabled())
+ LOG.info("Disconnecting and closing the Channel");
+ SecurityHelper.doPrivilegedAction(new PrivilegedAction<Void>()
+ {
+ public Void run()
+ {
+ channel.disconnect();
+ channel.close();
+ return null;
+ }
+ });
+ // NOTE(review): the reference is only cleared when the channel was still open
+ channel = null;
+ }
+ if (dispatcher != null)
+ {
+ dispatcher.stop();
+ dispatcher = null;
+ }
+ }
+
+ /**
+ * Gives the value of the default timeout, which is the timeout used by the methods that
+ * don't take any explicit timeout
+ * @return the default timeout
+ */
+ protected long getDefaultTimeout()
+ {
+ return defaultTimeout;
+ }
+
+ /**
+ * Gives the name of the cluster to which the channel is connected in <code>start()</code>
+ * @return the name of the cluster
+ */
+ protected String getClusterName()
+ {
+ return clusterName;
+ }
+
+ /**
+ * Gives the value of the retry timeout, which is the maximum time in milliseconds to wait
+ * for a topology change before retrying a command on the coordinator
+ * @return the value of the retry timeout
+ */
+ protected long getRetryTimeout()
+ {
+ return retryTimeout;
+ }
+
+ /**
+ * Indicates whether the failover capabilities are enabled or not. When they are enabled, a
+ * command sent to a coordinator that seems to have left the cluster is executed again.
+ * @return <code>true</code> if the failover capabilities are allowed, <code>false</code>
+ * otherwise
+ */
+ protected boolean isAllowFailover()
+ {
+ return allowFailover;
+ }
+
+ /**
+ * Returns the channel's own address. The result of calling this method on an unconnected
+ * channel is implementation defined (may return null). Calling this method on a closed
+ * channel returns null. Successor to <code>getAddress()</code>
+ * (NOTE(review): no such method is visible in this class, the wording seems to come from the
+ * channel API - confirm). Addresses can be used as destination
+ * in the <code>send()</code> operation.
+ * This class compares this address with the coordinator in <code>viewAccepted</code> and
+ * gives it to the body of the incoming messages in <code>handle</code>.
+ * @return The channel's address (opaque) or null if it cannot be found
+ */
+ protected abstract Address getLocalAddress();
+
+ /**
+ * Cast a message to all the given members
+ * @param dests The members to which the message is to be sent.
+ * @param msg The message to be sent to the members.
+ * @param synchronous Indicates whether the message must be sent in synchronous or asynchronous mode.
+ * @param timeout If 0: wait forever. Otherwise, wait for responses or timeout time.
+ * @return A list of responses. Each response is an <code>Object</code> and associated to its sender.
+ * @throws Exception if any error occurs while casting the message
+ */
+ protected abstract RspList castMessage(List<Address> dests, Message msg, boolean synchronous, long timeout) throws Exception;
+
+ /**
+ * Create a channel
+ * @return An initialized channel
+ * @throws Exception if any error occurs while creating the channel
+ */
+ protected abstract Channel createChannel() throws Exception;
+
+ /**
+ * Returns a reference to the ordered list of members of the given view.
+ * Do NOT change this list, otherwise you will invalidate the view.
+ * Make a copy if you have to modify it.
+ *
+ * @return a reference to the ordered list of members in this view
+ */
+ protected abstract List<Address> getMembers(View view);
+
+ /**
+ * Takes an object and uses Java serialization to generate the byte[] buffer which
+ * is set in the message.
+ */
+ protected abstract void setObject(Message m, Object o);
+
+ /**
+ * Gives the value of the {@link ValueParam} corresponding to the given key
+ * @param params the list of initial parameters from which we want to extract the {@link ValueParam}
+ * @param parameterKey the name of the {@link ValueParam} that we are looking for
+ * @return the value if it exists, null otherwise
+ */
+ private static String getValueParam(InitParams params, String parameterKey)
+ {
+ try
+ {
+ return params.getValueParam(parameterKey).getValue().trim();
+ }
+ catch (NullPointerException e)
+ {
+ return null;
+ }
+ }
+
+ /**
+ * Gives the {@link URL} corresponding to the location of the JGroups configuration
+ * @param params the initial parameters from which we extract the parameter
+ * <code>PARAM_JGROUPS_CONFIG</code>
+ * @param configManager the configuration manager used to get the {@link URL} corresponding
+ * to the path given in the configuration of the RPCServiceImpl
+ * @return The {@link URL} corresponding to the location of the JGroups configuration,
+ * it will throw {@link IllegalArgumentException} otherwise since it is a mandatory configuration.
+ */
+ private static URL getProperties(InitParams params, ConfigurationManager configManager)
+ {
+ String configPath = getValueParam(params, PARAM_JGROUPS_CONFIG);
+ if (configPath == null)
+ {
+ throw new IllegalArgumentException("The parameter '" + PARAM_JGROUPS_CONFIG
+ + "' of RPCServiceImpl is mandatory");
+ }
+ URL properties;
+ try
+ {
+ properties = configManager.getResource(configPath);
+ }
+ catch (Exception e)
+ {
+ throw new IllegalArgumentException("Cannot find the JGroups configuration at " + configPath, e);
+ }
+ if (properties == null)
+ {
+ throw new IllegalArgumentException("Cannot find the JGroups configuration at " + configPath);
+ }
+ return properties;
+ }
+
+ /**
+ * Gives the name of the cluster that will be able to support several portal containers
+ * since the name will be suffixed with "-${container-name}"
+ * @param ctx the context from which we extract the name of the container
+ * @param params the list of initial parameters from which we get the value of the parameter
+ * <code>PARAM_CLUSTER_NAME</code> if it exists otherwise the value will be "RPCService-Cluster"
+ */
+ private static String getClusterName(ExoContainerContext ctx, InitParams params)
+ {
+ String clusterName = getValueParam(params, PARAM_CLUSTER_NAME);
+ if (clusterName == null)
+ {
+ clusterName = CLUSTER_NAME;
+ }
+ return clusterName += "-" + ctx.getName();
+ }
+
+ /**
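+ * This nested class is the externalizable body of a message: it holds the id of the command to execute, its arguments and the hash code of the expected destination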
+ */
+ public static class MessageBody implements Externalizable
+ {
+ /**
+ * The Id of the command to execute
+ */
+ private String commandId;
+
+ /**
+ * The list of parameters
+ */
+ private Serializable[] args;
+
+ /**
+ * The hash code of the expected destination
+ */
+ private int destination;
+
+ public MessageBody()
+ {
+ }
+
+ /**
+ * @param dest The destination of the message
+ * @param commandId the id of the command to execute
+ * @param args the arguments to use
+ */
+ public MessageBody(Address dest, String commandId, Serializable[] args)
+ {
+ this.commandId = commandId;
+ this.args = args;
+ this.destination = dest == null ? 0 : dest.hashCode();
+ }
+
+ public String getCommandId()
+ {
+ return commandId;
+ }
+
+ public Serializable[] getArgs()
+ {
+ return args;
+ }
+
+ /**
+ * Indicates whether or not the given message body accepts the given address
+ * @param address the address to check
+ * @return <code>true</code> if the message is for everybody or if the given address is the expected address,
+ * <code>false</code> otherwise
+ */
+ public boolean accept(Address address)
+ {
+ return destination == 0 || destination == address.hashCode();
+ }
+
+ /**
+ * {@inheritDoc}
+ */
+ public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException
+ {
+ boolean unicast = in.readBoolean();
+ if (unicast)
+ {
+ this.destination = in.readInt();
+ }
+ this.commandId = in.readUTF();
+ int size = in.readInt();
+ if (size == -1)
+ {
+ this.args = null;
+ }
+ else
+ {
+ this.args = new Serializable[size];
+ for (int i = 0; i < size; i++)
+ {
+ args[i] = (Serializable)in.readObject();
+ }
+ }
+ }
+
+ /**
+ * {@inheritDoc}
+ */
+ public void writeExternal(ObjectOutput out) throws IOException
+ {
+ boolean unicast = destination != 0;
+ out.writeBoolean(unicast);
+ if (unicast)
+ {
+ out.writeInt(destination);
+ }
+ out.writeUTF(commandId);
+ if (args == null)
+ {
+ out.writeInt(-1);
+ }
+ else
+ {
+ out.writeInt(args.length);
+ for (int i = 0; i < args.length; i++)
+ {
+ out.writeObject(args[i]);
+ }
+ }
+ }
+ }
+
+ /**
+ * All the potential states of the {@link RPCServiceImpl}
+ */
+ public enum State
+ {
+ INITIALIZED, STARTED, STOPPED
+ }
+
+ public static class MemberHasLeftException extends RPCException
+ {
+
+ /**
+ * The serial version UID
+ */
+ private static final long serialVersionUID = 3558158913564367637L;
+
+ public MemberHasLeftException(String message)
+ {
+ super(message);
+ }
+ }
+}
Modified: kernel/trunk/exo.kernel.component.common/src/main/java/org/exoplatform/services/rpc/impl/RPCServiceImpl.java
===================================================================
--- kernel/trunk/exo.kernel.component.common/src/main/java/org/exoplatform/services/rpc/impl/RPCServiceImpl.java 2011-12-06 09:36:07 UTC (rev 5269)
+++ kernel/trunk/exo.kernel.component.common/src/main/java/org/exoplatform/services/rpc/impl/RPCServiceImpl.java 2011-12-06 13:40:17 UTC (rev 5270)
@@ -18,1036 +18,78 @@
*/
package org.exoplatform.services.rpc.impl;
-import org.exoplatform.commons.utils.PropertyManager;
-import org.exoplatform.commons.utils.SecurityHelper;
-import org.exoplatform.container.ExoContainer;
import org.exoplatform.container.ExoContainerContext;
import org.exoplatform.container.configuration.ConfigurationManager;
import org.exoplatform.container.xml.InitParams;
-import org.exoplatform.container.xml.ValueParam;
-import org.exoplatform.services.log.ExoLogger;
-import org.exoplatform.services.log.Log;
-import org.exoplatform.services.rpc.RPCException;
-import org.exoplatform.services.rpc.RPCService;
-import org.exoplatform.services.rpc.RemoteCommand;
-import org.exoplatform.services.rpc.TopologyChangeEvent;
-import org.exoplatform.services.rpc.TopologyChangeListener;
import org.jgroups.Address;
import org.jgroups.Channel;
-import org.jgroups.ChannelException;
import org.jgroups.JChannel;
-import org.jgroups.MembershipListener;
import org.jgroups.Message;
import org.jgroups.View;
import org.jgroups.blocks.GroupRequest;
-import org.jgroups.blocks.MessageDispatcher;
-import org.jgroups.blocks.RequestHandler;
-import org.jgroups.conf.ConfiguratorFactory;
-import org.jgroups.conf.ProtocolStackConfigurator;
-import org.jgroups.util.Rsp;
import org.jgroups.util.RspList;
-import org.picocontainer.Startable;
-import java.io.Externalizable;
-import java.io.IOException;
-import java.io.ObjectInput;
-import java.io.ObjectOutput;
import java.io.Serializable;
-import java.lang.reflect.Method;
-import java.net.URL;
-import java.security.PrivilegedAction;
-import java.security.PrivilegedActionException;
-import java.security.PrivilegedExceptionAction;
-import java.util.ArrayList;
-import java.util.Collections;
-import java.util.HashMap;
import java.util.List;
-import java.util.Map;
import java.util.Vector;
-import java.util.concurrent.CopyOnWriteArrayList;
-import java.util.concurrent.CountDownLatch;
/**
- * This class is a basic implementation of the {@link RPCService}, it is mainly based on the
- * {@link MessageDispatcher}. This implementation is not designed to give the best possible
- * performances, it only aims to give a way to communicate with other nodes.
+ * This class is the implementation of the {@link AbstractRPCService} for JGroups 2.
*
* @author <a href="mailto:nicolas.filotto@exoplatform.com">Nicolas Filotto</a>
* @version $Id$
*/
-public class RPCServiceImpl implements RPCService, Startable, RequestHandler, MembershipListener
+public class RPCServiceImpl extends AbstractRPCService
{
/**
- * Connection logger.
+ * {@inheritDoc}
*/
- private static final Log LOG = ExoLogger.getLogger("exo.kernel.component.common.RPCServiceImpl");
-
- /**
- * We use reflection for the Message.setObject method in order to remain backward compatible
- * because since JGroups 2.12 the signature has changed the expected parameter is no more a Serializable,
- * it is an Object
- */
- private static Method MESSAGE_SET_OBJECT_METHOD;
-
- static
- {
- try
- {
- MESSAGE_SET_OBJECT_METHOD = Message.class.getMethod("setObject", Serializable.class);
- }
- catch (SecurityException e)
- {
- throw e;
- }
- catch (NoSuchMethodException e)
- {
- // We assume that we use JGroups 2.12 or higher
- try
- {
- MESSAGE_SET_OBJECT_METHOD = Message.class.getMethod("setObject", Object.class);
- }
- catch (SecurityException e1)
- {
- throw e1;
- }
- catch (Exception e1)
- {
- throw new RuntimeException("Could not find the right Message.setObject method", e);
- }
- }
- }
-
- /**
- * The name of the parameter for the location of the JGroups configuration.
- */
- protected static final String PARAM_JGROUPS_CONFIG = "jgroups-configuration";
-
- /**
- * The name of the parameter for the name of the cluster.
- */
- protected static final String PARAM_CLUSTER_NAME = "jgroups-cluster-name";
-
- /**
- * The name of the parameter for the default timeout
- */
- protected static final String PARAM_DEFAULT_TIMEOUT = "jgroups-default-timeout";
-
- /**
- * The name of the parameter to allow the failover
- */
- protected static final String PARAM_ALLOW_FAILOVER = "allow-failover";
-
- /**
- * The name of the parameter for the retry timeout
- */
- protected static final String PARAM_RETRY_TIMEOUT = "retry-timeout";
-
- /**
- * The value of the default timeout
- */
- protected static final int DEFAULT_TIMEOUT = 0;
-
- /**
- * The value of the default retry timeout
- */
- protected static final int DEFAULT_RETRY_TIMEOUT = 20000;
-
- /**
- * The default value of the cluster name
- */
- protected static final String CLUSTER_NAME = "RPCService-Cluster";
-
- /**
- * The configurator used to create the JGroups Channel
- */
- private final ProtocolStackConfigurator configurator;
-
- /**
- * The lock used to synchronize all the threads waiting for a topology change.
- */
- private final Object topologyChangeLock = new Object();
-
- /**
- * The name of the cluster
- */
- private final String clusterName;
-
- /**
- * The JGroups Channel used to communicate with other nodes
- */
- protected Channel channel;
-
- /**
- * The current list of all the members of the cluster
- */
- protected volatile Vector<Address> members;
-
- /**
- * The address of the current coordinator
- */
- protected volatile Address coordinator;
-
- /**
- * Indicates whether the current node is the coordinator of the cluster or not
- */
- protected volatile boolean isCoordinator;
-
- /**
- * The default value of the timeout
- */
- private long defaultTimeout = DEFAULT_TIMEOUT;
-
- /**
- * The value of the retry timeout
- */
- private long retryTimeout = DEFAULT_RETRY_TIMEOUT;
-
- /**
- * Indicates whether the failover capabilities are enabled
- */
- private boolean allowFailover = true;
-
- /**
- * The dispatcher used to launch the command of the cluster nodes
- */
- private MessageDispatcher dispatcher;
-
- /**
- * The signal that indicates that the service is started, it will be used
- * to make the application wait until the service is fully started to
- * ensure that all the commands have been registered before handling
- * incoming messages.
- */
- private final CountDownLatch startSignal = new CountDownLatch(1);
-
- /**
- * All the registered {@link TopologyChangeListener}
- */
- private final List<TopologyChangeListener> listeners = new CopyOnWriteArrayList<TopologyChangeListener>();
-
- /**
- * Current State of the {@link RPCServiceImpl}
- */
- private volatile State state;
-
- /**
- * All the commands that have been registered
- */
- private volatile Map<String, RemoteCommand> commands =
- Collections.unmodifiableMap(new HashMap<String, RemoteCommand>());
-
- /**
- * The public constructor
- * @param ctx the {@link ExoContainerContext} from which we will extract the corresponding
- * {@link ExoContainer}
- * @param params the list of initial parameters
- * @param configManager the configuration manager used to get the configuration
- * of JGroups
- */
public RPCServiceImpl(ExoContainerContext ctx, InitParams params, ConfigurationManager configManager)
{
- if (params == null)
- {
- throw new IllegalArgumentException("The RPCServiceImpl requires some parameters");
- }
- final URL properties = getProperties(params, configManager);
- if (LOG.isInfoEnabled())
- {
- LOG.info("The JGroups configuration used for the RPCServiceImpl will be loaded from " + properties);
- }
-
- try
- {
- this.configurator = SecurityHelper.doPrivilegedExceptionAction(new PrivilegedExceptionAction<ProtocolStackConfigurator>()
- {
- public ProtocolStackConfigurator run() throws Exception
- {
- return ConfiguratorFactory.getStackConfigurator(properties);
- }
- });
- }
- catch (PrivilegedActionException pae)
- {
- Throwable cause = pae.getCause();
- if (cause instanceof ChannelException)
- {
- throw new RuntimeException("Cannot load the JGroups configuration from " + properties, cause);
- }
- else if (cause instanceof RuntimeException)
- {
- throw (RuntimeException)cause;
- }
- else
- {
- throw new RuntimeException(cause);
- }
- }
-
- this.clusterName = getClusterName(ctx, params);
- if (LOG.isDebugEnabled())
- {
- LOG.debug("The cluster name of the RPCServiceImpl has been set to " + clusterName);
- }
- String sTimeout = getValueParam(params, PARAM_DEFAULT_TIMEOUT);
- if (sTimeout != null)
- {
- defaultTimeout = Integer.parseInt(sTimeout);
- if (LOG.isDebugEnabled())
- {
- LOG.debug("The default timeout of the RPCServiceImpl has been set to " + defaultTimeout);
- }
- }
- String sAllowFailover = getValueParam(params, PARAM_ALLOW_FAILOVER);
- if (sAllowFailover != null)
- {
- allowFailover = Boolean.valueOf(sAllowFailover);
- if (LOG.isDebugEnabled())
- {
- LOG.debug("The parameter '" + PARAM_ALLOW_FAILOVER + "' of the RPCServiceImpl has been set to " + allowFailover);
- }
- }
- sTimeout = getValueParam(params, PARAM_RETRY_TIMEOUT);
- if (sTimeout != null)
- {
- retryTimeout = Integer.parseInt(sTimeout);
- if (LOG.isDebugEnabled())
- {
- LOG.debug("The retry timeout of the RPCServiceImpl has been set to " + retryTimeout);
- }
- }
- this.state = State.INITIALIZED;
+ super(ctx, params, configManager);
}
/**
* {@inheritDoc}
*/
- public List<Object> executeCommandOnAllNodes(RemoteCommand command, boolean synchronous, Serializable... args)
- throws RPCException
+ protected Address getLocalAddress()
{
- return executeCommandOnAllNodesMain(command, synchronous, defaultTimeout, args);
+ return channel.getLocalAddress();
}
-
+
/**
* {@inheritDoc}
*/
- public List<Object> executeCommandOnAllNodes(RemoteCommand command, long timeout, Serializable... args)
- throws RPCException
+ protected RspList castMessage(List<Address> dests, Message msg, boolean synchronous, long timeout)
{
- return executeCommandOnAllNodesMain(command, true, timeout, args);
+ return dispatcher.castMessage(dests instanceof Vector ? (Vector<Address>)dests : new Vector<Address>(dests), msg,
+ synchronous ? GroupRequest.GET_ALL : GroupRequest.GET_NONE, timeout);
}
-
+
/**
- * Executes a command on all the cluster nodes. This method is equivalent to the other method of the
- * same type but with the default timeout. The command must be registered first otherwise an
- * {@link RPCException} will be thrown.
- *
- * @param command The command to execute on each cluster node
- * @param synchronous if true, sets group request mode to {@link org.jgroups.blocks.GroupRequest#GET_ALL},
- * and if false sets it to {@link org.jgroups.blocks.GroupRequest#GET_NONE}.
- * @param timeout a timeout after which to throw a replication exception.
- * @param args an array of {@link Serializable} objects corresponding to parameters of the command
- * to execute remotely
- * @return a list of responses from all the members of the cluster. If we met an exception on a given node,
- * the RPCException will be the corresponding response of this particular node
- * @throws RPCException in the event of problems.
- */
- protected List<Object> executeCommandOnAllNodesMain(RemoteCommand command, boolean synchronous, long timeout,
- Serializable... args) throws RPCException
- {
- return excecuteCommand(members, command, synchronous, timeout, args);
- }
-
- /**
* {@inheritDoc}
*/
- public Object executeCommandOnCoordinator(RemoteCommand command, boolean synchronous, Serializable... args)
- throws RPCException
+ protected Channel createChannel() throws Exception
{
- return executeCommandOnCoordinatorMain(command, synchronous, defaultTimeout, args);
+ Channel channel = new JChannel(configurator);
+ channel.setOpt(Channel.AUTO_RECONNECT, true);
+ return channel;
}
/**
* {@inheritDoc}
*/
- public Object executeCommandOnCoordinator(RemoteCommand command, long timeout, Serializable... args)
- throws RPCException
+ protected List<Address> getMembers(View view)
{
- return executeCommandOnCoordinatorMain(command, true, timeout, args);
+ return view.getMembers();
}
/**
- * Executes a command on the coordinator only. This method is equivalent to the other method of the
- * same type but with the default timeout. The command must be registered first otherwise an
- * {@link RPCException} will be thrown.
- *
- * @param command The command to execute on the coordinator node
- * @param synchronous if true, sets group request mode to {@link org.jgroups.blocks.GroupRequest#GET_ALL},
- * and if false sets it to {@link org.jgroups.blocks.GroupRequest#GET_NONE}.
- * @param timeout a timeout after which to throw a replication exception.
- * @param args an array of {@link Serializable} objects corresponding to parameters of the command
- * to execute remotely
- * @return the response of the coordinator.
- * @throws RPCException in the event of problems.
- */
- protected Object executeCommandOnCoordinatorMain(RemoteCommand command, boolean synchronous, long timeout,
- Serializable... args) throws RPCException
- {
- Address coordinator = this.coordinator;
- Vector<Address> v = new Vector<Address>(1);
- v.add(coordinator);
- List<Object> lResults = excecuteCommand(v, command, synchronous, timeout, args);
- Object result = lResults == null || lResults.size() == 0 ? null : lResults.get(0);
- if (allowFailover && result instanceof MemberHasLeftException)
- {
- // The failover capabilities have been enabled and the coordinator seems to have left
- if (coordinator.equals(this.coordinator))
- {
- synchronized(topologyChangeLock)
- {
- if (coordinator.equals(this.coordinator))
- {
- if (LOG.isTraceEnabled())
- LOG.trace("The coordinator did not change yet, we will relaunch the command after "
- + retryTimeout + " ms or once a topology change has been detected");
- try
- {
- topologyChangeLock.wait(retryTimeout);
- }
- catch (InterruptedException e)
- {
- Thread.currentThread().interrupt();
- }
- }
- }
- }
- if (LOG.isTraceEnabled())
- LOG.trace("The coordinator has changed, we will automatically retry with the new coordinator");
- return executeCommandOnCoordinator(command, synchronous, timeout, args);
- }
- else if (result instanceof RPCException)
- {
- throw (RPCException)result;
- }
- return result;
- }
-
- /**
- * Execute the command on all the nodes corresponding to the list of destinations.
- * @param dests the list of members on which the command needs to be executed
- * @param command the command to execute
- * @param synchronous if true, sets group request mode to {@link org.jgroups.blocks.GroupRequest#GET_ALL}, and if false sets
- * it to {@link org.jgroups.blocks.GroupRequest#GET_NONE}.
- * @param timeout a timeout after which to throw a replication exception.
- * @param args the list of parameters
- * @return a list of responses from all the targeted members of the cluster.
- * @throws RPCException in the event of problems.
- */
- protected List<Object> excecuteCommand(final Vector<Address> dests, RemoteCommand command,
- final boolean synchronous, final long timeout, Serializable... args) throws RPCException
- {
- SecurityManager security = System.getSecurityManager();
- if (security != null)
- {
- security.checkPermission(RPCService.ACCESS_RPC_SERVICE_PERMISSION);
- }
- if (state != State.STARTED)
- {
- throw new RPCException(
- "Cannot execute any commands if the service is not started, the current state of the service is " + state);
- }
- String commandId = command.getId();
- if (commands.get(commandId) != command)
- {
- throw new RPCException("Command " + commandId + " unknown, please register your command first");
- }
- final Message msg = new Message();
- try
- {
- MESSAGE_SET_OBJECT_METHOD.invoke(msg, new MessageBody(dests.size() == 1 && dests != members ? dests.get(0) : null, commandId, args));
- }
- catch (Exception e)
- {
- throw new RPCException("Could not call the method Message.setObject");
- }
- RspList rsps = SecurityHelper.doPrivilegedAction(new PrivilegedAction<RspList>()
- {
- public RspList run()
- {
- return dispatcher.castMessage(dests, msg, synchronous ? GroupRequest.GET_ALL : GroupRequest.GET_NONE,
- timeout);
- }
- });
-
- if (LOG.isTraceEnabled())
- LOG.trace("responses: " + rsps);
- if (rsps == null)
- throw new RPCException("Could not get the responses for command " + commandId + ".");
- if (!synchronous)
- return Collections.emptyList();// async case
- if (LOG.isTraceEnabled())
- {
- LOG.trace("(" + channel.getLocalAddress() + "): responses for command " + commandId + ":\n" + rsps);
- }
- List<Object> retval = new ArrayList<Object>(rsps.size());
- for (Address dest : dests)
- {
- Rsp rsp = rsps.get(dest);
- if (rsp == null || (rsp.wasSuspected() && !rsp.wasReceived()))
- {
- // The corresponding member has left
- retval.add(new MemberHasLeftException("No response for the member " + dest
- + ", this member has probably left the cluster."));
- }
- else if (!rsp.wasReceived())
- {
- retval.add(new RPCException("Replication timeout for " + rsp.getSender() + ", rsp=" + rsp));
- }
- else
- {
- Object value = rsp.getValue();
- if (value instanceof RPCException)
- {
- // if we have any application-level exceptions make sure we throw them!!
- if (LOG.isTraceEnabled())
- LOG.trace("Recieved exception'" + value + "' from " + rsp.getSender(), (RPCException)value);
- }
- retval.add(value);
- }
- }
- return retval;
- }
-
- /**
* {@inheritDoc}
*/
- public Object handle(Message msg)
+ protected void setObject(Message m, Object o)
{
- String commandId = null;
- try
- {
- // Ensure that the service is fully started before trying to execute any command
- startSignal.await();
- MessageBody body = (MessageBody)msg.getObject();
- commandId = body.getCommandId();
- if (!body.accept(channel.getLocalAddress()))
- {
- if (LOG.isTraceEnabled())
- LOG.trace("Command : " + commandId + " needs to be executed on the coordinator " +
- "only and the local node is not the coordinator, the command will be ignored");
- return null;
- }
- RemoteCommand command = getCommand(commandId);
- if (command == null)
- {
- return new RPCException("Command " + commandId + " unkown, please register your command first");
- }
- Object execResult = command.execute(body.getArgs());
- if (LOG.isTraceEnabled())
- LOG.trace("Command : " + commandId + " executed, result is: " + execResult);
- return execResult;
- }
- catch (Throwable x)
- {
- if (LOG.isTraceEnabled())
- LOG.trace("Problems invoking command.", x);
- return new RPCException("Cannot execute the command " + (commandId == null ? "" : commandId), x);
- }
+ m.setObject((Serializable)o);
}
-
- /**
- * {@inheritDoc}
- */
- public void block()
- {
- }
-
- /**
- * {@inheritDoc}
- */
- public void suspect(Address suspectedMbr)
- {
- }
-
- /**
- * {@inheritDoc}
- */
- public void viewAccepted(View view)
- {
- boolean coordinatorHasChanged;
- synchronized (topologyChangeLock)
- {
- this.members = view.getMembers();
- Address currentCoordinator = coordinator;
- this.coordinator = members != null && members.size() > 0 ? members.get(0) : null;
- this.isCoordinator = coordinator != null && coordinator.equals(channel.getLocalAddress());
- coordinatorHasChanged = currentCoordinator != null && !currentCoordinator.equals(coordinator);
- // Release all the nodes
- topologyChangeLock.notifyAll();
- }
- onTopologyChange(coordinatorHasChanged);
- }
-
- /**
- * Called anytime the topology has changed, this method will notify all the listeners
- * currently registered
- * @param coordinatorHasChanged this parameter is set to <code>true</code> if the
- * coordinator has changed, <code>false</code> otherwise
- */
- private void onTopologyChange(boolean coordinatorHasChanged)
- {
- TopologyChangeEvent event = new TopologyChangeEvent(coordinatorHasChanged, isCoordinator);
- for (TopologyChangeListener listener : listeners)
- {
- try
- {
- listener.onChange(event);
- }
- catch (Exception e)
- {
- LOG.warn("An error occurs with the listener of type " + listener.getClass(), e);
- }
- }
- }
-
- /**
- * {@inheritDoc}
- */
- public synchronized RemoteCommand registerCommand(RemoteCommand command)
- {
- SecurityManager security = System.getSecurityManager();
- if (security != null)
- {
- security.checkPermission(RPCService.ACCESS_RPC_SERVICE_PERMISSION);
- }
- if (command != null)
- {
- String commandId = command.getId();
- if (commandId == null)
- {
- throw new IllegalArgumentException("The command Id cannot be null");
- }
- Map<String, RemoteCommand> tmpCommands = new HashMap<String, RemoteCommand>(this.commands);
- RemoteCommand oldCommand = tmpCommands.put(commandId, command);
- if (oldCommand != null && PropertyManager.isDevelopping())
- {
- LOG.warn("A command has already been registered with the id " + commandId
- + ", this command will be replaced with the new one");
- }
- this.commands = Collections.unmodifiableMap(tmpCommands);
- return command;
- }
- return null;
- }
-
- /**
- * {@inheritDoc}
- */
- public synchronized void unregisterCommand(RemoteCommand command)
- {
- SecurityManager security = System.getSecurityManager();
- if (security != null)
- {
- security.checkPermission(RPCService.ACCESS_RPC_SERVICE_PERMISSION);
- }
- if (command != null)
- {
- String commandId = command.getId();
- if (commandId == null)
- {
- throw new IllegalArgumentException("The command Id cannot be null");
- }
- if (commands.get(commandId) != command)
- {
- // We prevent to remove any command that has not been registered, thus we expect that
- // the registered instance is exactly the same instance as the one that we want to
- // unregister
- if (PropertyManager.isDevelopping())
- {
- LOG.warn("Cannot unregister an unknown RemoteCommand, either the command id " + commandId
- + " is unknown or the instance of RemoteCommand to unregister is unknown");
- }
- return;
- }
- Map<String, RemoteCommand> tmpCommands = new HashMap<String, RemoteCommand>(this.commands);
- tmpCommands.remove(commandId);
- this.commands = Collections.unmodifiableMap(tmpCommands);
- }
- }
-
- /**
- * {@inheritDoc}
- */
- public boolean isCoordinator() throws RPCException
- {
- if (state != State.STARTED)
- {
- throw new RPCException("Cannot know whether the local node is a coordinator or not if " +
- "the service is not started, the current state of the service is " + state);
- }
- return isCoordinator;
- }
-
- /**
- * {@inheritDoc}
- */
- public void registerTopologyChangeListener(TopologyChangeListener listener) throws SecurityException
- {
- SecurityManager security = System.getSecurityManager();
- if (security != null)
- {
- security.checkPermission(RPCService.ACCESS_RPC_SERVICE_PERMISSION);
- }
- if (listener == null)
- {
- return;
- }
- listeners.add(listener);
- }
-
- /**
- * {@inheritDoc}
- */
- public void unregisterTopologyChangeListener(TopologyChangeListener listener) throws SecurityException
- {
- SecurityManager security = System.getSecurityManager();
- if (security != null)
- {
- security.checkPermission(RPCService.ACCESS_RPC_SERVICE_PERMISSION);
- }
- if (listener == null)
- {
- return;
- }
- listeners.remove(listener);
- }
-
- /**
- * Gives the {@link RemoteCommand} corresponding to the given id
- * @param commandId the command id of the command to retrieve
- * @return the corresponding {@link RemoteCommand}
- */
- protected RemoteCommand getCommand(String commandId)
- {
- return commands.get(commandId);
- }
-
- /**
- * {@inheritDoc}
- */
- public void start()
- {
- SecurityManager security = System.getSecurityManager();
- if (security != null)
- {
- security.checkPermission(RPCService.ACCESS_RPC_SERVICE_PERMISSION);
- }
-
- try
- {
- SecurityHelper.doPrivilegedExceptionAction(new PrivilegedExceptionAction<Void>()
- {
- public Void run() throws Exception
- {
- channel = new JChannel(configurator);
- channel.setOpt(Channel.AUTO_RECONNECT, true);
- dispatcher = new MessageDispatcher(channel, null, RPCServiceImpl.this, RPCServiceImpl.this);
- channel.connect(clusterName);
- return null;
- }
- });
- }
- catch (PrivilegedActionException pae)
- {
- Throwable cause = pae.getCause();
- if (cause instanceof ChannelException)
- {
- throw new RuntimeException("Cannot initialize the Channel needed for the RPCServiceImpl", cause);
- }
- else if (cause instanceof RuntimeException)
- {
- throw (RuntimeException)cause;
- }
- else
- {
- throw new RuntimeException(cause);
- }
- }
- finally
- {
- this.state = State.STARTED;
- startSignal.countDown();
- }
- }
-
- /**
- * {@inheritDoc}
- */
- public void stop()
- {
- SecurityManager security = System.getSecurityManager();
- if (security != null)
- {
- security.checkPermission(RPCService.ACCESS_RPC_SERVICE_PERMISSION);
- }
-
- this.state = State.STOPPED;
- this.isCoordinator = false;
- if (channel != null && channel.isOpen())
- {
- if (LOG.isInfoEnabled())
- LOG.info("Disconnecting and closing the Channel");
- SecurityHelper.doPrivilegedAction(new PrivilegedAction<Void>()
- {
- public Void run()
- {
- channel.disconnect();
- channel.close();
- return null;
- }
- });
- channel = null;
- }
- if (dispatcher != null)
- {
- dispatcher.stop();
- dispatcher = null;
- }
- }
-
- /**
- * Gives the value of the default timeout
- * @return the default timeout
- */
- protected long getDefaultTimeout()
- {
- return defaultTimeout;
- }
-
- /**
- * Gives the name of the cluster
- * @return the name of the cluster
- */
- protected String getClusterName()
- {
- return clusterName;
- }
-
- /**
- * Gives the value of the retry timeout
- * @return the value of the retry timeout
- */
- protected long getRetryTimeout()
- {
- return retryTimeout;
- }
-
- /**
- * Indicates whether the failover capabilities are enabled or not
- * @return <code>true</code> if the failover capabilities are allowed, <code>false</code>
- * otherwise
- */
- protected boolean isAllowFailover()
- {
- return allowFailover;
- }
-
- /**
- * Gives the value of the {@link ValueParam} corresponding to the given key
- * @param params the list of initial parameters from which we want to extract the {@link ValueParam}
- * @param parameterKey the name of the {@link ValueParam} that we are looking for
- * @return the value if it exists, null otherwise
- */
- private static String getValueParam(InitParams params, String parameterKey)
- {
- try
- {
- return params.getValueParam(parameterKey).getValue().trim();
- }
- catch (NullPointerException e)
- {
- return null;
- }
- }
-
- /**
- * Gives the {@link URL} corresponding to the location of the JGroups configuration
- * @param params the initial parameters from which we extract the parameter
- * <code>PARAM_JGROUPS_CONFIG</code>
- * @param configManager the configuration manager used to get the {@link URL} corresponding
- * to the path given in the configuration of the RPCServiceImpl
- * @return The {@link URL} corresponding to the location of the JGroups configuration;
- * an {@link IllegalArgumentException} is thrown if it cannot be found since this configuration is mandatory.
- */
- private static URL getProperties(InitParams params, ConfigurationManager configManager)
- {
- String configPath = getValueParam(params, PARAM_JGROUPS_CONFIG);
- if (configPath == null)
- {
- throw new IllegalArgumentException("The parameter '" + PARAM_JGROUPS_CONFIG
- + "' of RPCServiceImpl is mandatory");
- }
- URL properties;
- try
- {
- properties = configManager.getResource(configPath);
- }
- catch (Exception e)
- {
- throw new IllegalArgumentException("Cannot find the JGroups configuration at " + configPath, e);
- }
- if (properties == null)
- {
- throw new IllegalArgumentException("Cannot find the JGroups configuration at " + configPath);
- }
- return properties;
- }
-
- /**
- * Gives the name of the cluster, which can support several portal containers
- * since the name is suffixed with "-${container-name}"
- * @param ctx the context from which we extract the name of the container
- * @param params the list of initial parameters from which we get the value of the parameter
- * <code>PARAM_CLUSTER_NAME</code> if it exists otherwise the value will be "RPCService-Cluster"
- */
- private static String getClusterName(ExoContainerContext ctx, InitParams params)
- {
- String clusterName = getValueParam(params, PARAM_CLUSTER_NAME);
- if (clusterName == null)
- {
- clusterName = CLUSTER_NAME;
- }
- return clusterName += "-" + ctx.getName();
- }
-
- /**
- * This inner class holds the id of the command to execute, its arguments and the hash code of the expected destination
- */
- public static class MessageBody implements Externalizable
- {
- /**
- * The Id of the command to execute
- */
- private String commandId;
-
- /**
- * The list of parameters
- */
- private Serializable[] args;
-
- /**
- * The hash code of the expected destination
- */
- private int destination;
-
- public MessageBody()
- {
- }
-
- /**
- * @param dest The destination of the message
- * @param commandId the id of the command to execute
- * @param args the arguments to use
- */
- public MessageBody(Address dest, String commandId, Serializable[] args)
- {
- this.commandId = commandId;
- this.args = args;
- this.destination = dest == null ? 0 : dest.hashCode();
- }
-
- public String getCommandId()
- {
- return commandId;
- }
-
- public Serializable[] getArgs()
- {
- return args;
- }
-
- /**
- * Indicates whether or not the given message body accepts the given address
- * @param address the address to check
- * @return <code>true</code> if the message is for everybody or if the given address is the expected address,
- * <code>false</code> otherwise
- */
- public boolean accept(Address address)
- {
- return destination == 0 || destination == address.hashCode();
- }
-
- /**
- * {@inheritDoc}
- */
- public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException
- {
- boolean unicast = in.readBoolean();
- if (unicast)
- {
- this.destination = in.readInt();
- }
- this.commandId = in.readUTF();
- int size = in.readInt();
- if (size == -1)
- {
- this.args = null;
- }
- else
- {
- this.args = new Serializable[size];
- for (int i = 0; i < size; i++)
- {
- args[i] = (Serializable)in.readObject();
- }
- }
- }
-
- /**
- * {@inheritDoc}
- */
- public void writeExternal(ObjectOutput out) throws IOException
- {
- boolean unicast = destination != 0;
- out.writeBoolean(unicast);
- if (unicast)
- {
- out.writeInt(destination);
- }
- out.writeUTF(commandId);
- if (args == null)
- {
- out.writeInt(-1);
- }
- else
- {
- out.writeInt(args.length);
- for (int i = 0; i < args.length; i++)
- {
- out.writeObject(args[i]);
- }
- }
- }
- }
-
- /**
- * All the potential states of the {@link RPCServiceImpl}
- */
- public enum State {
- INITIALIZED, STARTED, STOPPED
- }
-
- public static class MemberHasLeftException extends RPCException
- {
-
- /**
- * The serial version UID
- */
- private static final long serialVersionUID = 3558158913564367637L;
-
- public MemberHasLeftException(String message)
- {
- super(message);
- }
- }
}
Modified: kernel/trunk/exo.kernel.component.common/src/test/java/org/exoplatform/services/rpc/impl/TestRPCServiceImpl.java
===================================================================
--- kernel/trunk/exo.kernel.component.common/src/test/java/org/exoplatform/services/rpc/impl/TestRPCServiceImpl.java 2011-12-06 09:36:07 UTC (rev 5269)
+++ kernel/trunk/exo.kernel.component.common/src/test/java/org/exoplatform/services/rpc/impl/TestRPCServiceImpl.java 2011-12-06 13:40:17 UTC (rev 5270)
@@ -27,7 +27,7 @@
import org.exoplatform.services.rpc.SingleMethodCallCommand;
import org.exoplatform.services.rpc.TopologyChangeEvent;
import org.exoplatform.services.rpc.TopologyChangeListener;
-import org.exoplatform.services.rpc.impl.RPCServiceImpl.MemberHasLeftException;
+import org.exoplatform.services.rpc.impl.AbstractRPCService.MemberHasLeftException;
import org.exoplatform.test.BasicTestCase;
import org.jgroups.Address;
@@ -35,7 +35,6 @@
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
-import java.util.Vector;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicReference;
@@ -805,8 +804,8 @@
o = service1.executeCommandOnCoordinator(LongTaskOnNode2, 1000);
assertEquals("OK", o);
- Vector<Address> allMembers = service1.members;
- Vector<Address> coordinatorOnly = new Vector<Address>(1);
+ List<Address> allMembers = service1.members;
+ List<Address> coordinatorOnly = new ArrayList<Address>(1);
coordinatorOnly.add(service1.coordinator);
final RPCServiceImpl service = service2;
Added: kernel/trunk/exo.kernel.component.ext.rpc.impl.jgroups.v3/pom.xml
===================================================================
--- kernel/trunk/exo.kernel.component.ext.rpc.impl.jgroups.v3/pom.xml (rev 0)
+++ kernel/trunk/exo.kernel.component.ext.rpc.impl.jgroups.v3/pom.xml 2011-12-06 13:40:17 UTC (rev 5270)
@@ -0,0 +1,115 @@
+<!--
+
+ Copyright (C) 2009 eXo Platform SAS.
+
+ This is free software; you can redistribute it and/or modify it
+ under the terms of the GNU Lesser General Public License as
+ published by the Free Software Foundation; either version 2.1 of
+ the License, or (at your option) any later version.
+
+ This software is distributed in the hope that it will be useful,
+ but WITHOUT ANY WARRANTY; without even the implied warranty of
+ MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ Lesser General Public License for more details.
+
+ You should have received a copy of the GNU Lesser General Public
+ License along with this software; if not, write to the Free
+ Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
+ 02110-1301 USA, or see the FSF site: http://www.fsf.org.
+
+-->
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
+ <modelVersion>4.0.0</modelVersion>
+ <parent>
+ <groupId>org.exoplatform.kernel</groupId>
+ <artifactId>kernel-parent</artifactId>
+ <version>2.3.5-GA-SNAPSHOT</version>
+ </parent>
+ <artifactId>exo.kernel.component.ext.rpc.impl.jgroups.v3</artifactId>
+ <name>eXo Kernel :: RPC Service Extension :: JGroups 3 Implementation</name>
+ <description>The JGroups 3 implementation of the RPC service</description>
+ <dependencies>
+ <dependency>
+ <groupId>org.exoplatform.kernel</groupId>
+ <artifactId>exo.kernel.commons.test</artifactId>
+ </dependency>
+ <dependency>
+ <groupId>org.exoplatform.kernel</groupId>
+ <artifactId>exo.kernel.component.common</artifactId>
+ </dependency>
+ <dependency>
+ <groupId>org.jgroups</groupId>
+ <artifactId>jgroups</artifactId>
+ <version>3.0.0.Final</version>
+ </dependency>
+ <dependency>
+ <groupId>org.slf4j</groupId>
+ <artifactId>slf4j-log4j12</artifactId>
+ </dependency>
+ </dependencies>
+ <build>
+ <plugins>
+ <plugin>
+ <groupId>org.apache.maven.plugins</groupId>
+ <artifactId>maven-surefire-plugin</artifactId>
+ <configuration>
+ <argLine>${env.MAVEN_OPTS} -Djava.security.manager=org.exoplatform.commons.test.TestSecurityManager -Djava.security.policy=${project.build.directory}/test-classes/test.policy</argLine>
+ <systemProperties>
+ <!-- We add this system property due to some incompatibility between IPv6 and
+ some JVM of Linux distributions such as Ubuntu and Fedora-->
+ <property>
+ <name>java.net.preferIPv4Stack</name>
+ <value>true</value>
+ </property>
+ <!-- Avoid the firewall -->
+ <property>
+ <name>jgroups.bind_addr</name>
+ <value>127.0.0.1</value>
+ </property>
+ <property>
+ <name>jgroups.stack</name>
+ <value>udp</value>
+ </property>
+ </systemProperties>
+ </configuration>
+ </plugin>
+ <plugin>
+ <artifactId>maven-antrun-plugin</artifactId>
+ <executions>
+ <execution>
+ <id>prepare-test-policy</id>
+ <phase>process-test-resources</phase>
+ <configuration>
+ <tasks>
+ <echo>Creating Access Policy for tests</echo>
+ <makeurl file="${settings.localRepository}" property="localRepositoryURL" />
+ <makeurl file="${project.build.outputDirectory}" property="outputDirectoryURL" />
+ <makeurl file="${project.build.testOutputDirectory}" property="testOutputDirectoryURL" />
+ <copy todir="${project.build.testOutputDirectory}" overwrite="true">
+ <fileset dir="${project.basedir}/src/test/resources/">
+ <include name="test.policy" />
+ </fileset>
+ <filterset>
+ <filter token="MAVEN_REPO" value="${localRepositoryURL}" />
+ <filter token="MAIN_CLASSES" value="${outputDirectoryURL}" />
+ <filter token="TEST_CLASSES" value="${testOutputDirectoryURL}" />
+ </filterset>
+ </copy>
+ </tasks>
+ </configuration>
+ <goals>
+ <goal>run</goal>
+ </goals>
+ </execution>
+ </executions>
+ <dependencies>
+ <dependency>
+ <groupId>ant</groupId>
+ <artifactId>ant-optional</artifactId>
+ <version>1.5.3-1</version>
+ </dependency>
+ </dependencies>
+ </plugin>
+ </plugins>
+ </build>
+</project>
Added: kernel/trunk/exo.kernel.component.ext.rpc.impl.jgroups.v3/src/main/java/org/exoplatform/services/rpc/jgv3/RPCServiceImpl.java
===================================================================
--- kernel/trunk/exo.kernel.component.ext.rpc.impl.jgroups.v3/src/main/java/org/exoplatform/services/rpc/jgv3/RPCServiceImpl.java (rev 0)
+++ kernel/trunk/exo.kernel.component.ext.rpc.impl.jgroups.v3/src/main/java/org/exoplatform/services/rpc/jgv3/RPCServiceImpl.java 2011-12-06 13:40:17 UTC (rev 5270)
@@ -0,0 +1,99 @@
+/*
+ * Copyright (C) 2010 eXo Platform SAS.
+ *
+ * This is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU Lesser General Public License as
+ * published by the Free Software Foundation; either version 2.1 of
+ * the License, or (at your option) any later version.
+ *
+ * This software is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this software; if not, write to the Free
+ * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
+ * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
+ */
+package org.exoplatform.services.rpc.jgv3;
+
+import org.exoplatform.container.ExoContainerContext;
+import org.exoplatform.container.configuration.ConfigurationManager;
+import org.exoplatform.container.xml.InitParams;
+import org.exoplatform.services.rpc.impl.AbstractRPCService;
+import org.jgroups.Address;
+import org.jgroups.Channel;
+import org.jgroups.JChannel;
+import org.jgroups.Message;
+import org.jgroups.View;
+import org.jgroups.blocks.RequestOptions;
+import org.jgroups.blocks.ResponseMode;
+import org.jgroups.util.RspList;
+
+import java.util.List;
+
+/**
+ * This class is the implementation of the {@link AbstractRPCService} for JGroups 3.
+ *
+ * @author <a href="mailto:nicolas.filotto@exoplatform.com">Nicolas Filotto</a>
+ * @version $Id$
+ */
+public class RPCServiceImpl extends AbstractRPCService
+{
+
+ /**
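+ * Creates the JGroups 3 implementation of the RPC service; all the arguments are passed as is to the super constructor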
+ */
+ public RPCServiceImpl(ExoContainerContext ctx, InitParams params, ConfigurationManager configManager)
+ {
+ super(ctx, params, configManager);
+ }
+
+ /**
+ * {@inheritDoc}
+ */
+ protected Address getLocalAddress()
+ {
+ return channel.getAddress();
+ }
+
+ /**
+ * {@inheritDoc}
+ */
+ protected RspList<Object> castMessage(List<Address> dests, Message msg, boolean synchronous, long timeout) throws Exception
+ {
+ return dispatcher.castMessage(dests, msg, new RequestOptions(synchronous ? ResponseMode.GET_ALL : ResponseMode.GET_NONE, timeout));
+ }
+
+ /**
+ * {@inheritDoc}
+ */
+ protected Channel createChannel() throws Exception
+ {
+ return new JChannel(configurator);
+ }
+
+ /**
+ * {@inheritDoc}
+ */
+ public void unblock()
+ {
+ }
+
+ /**
+ * {@inheritDoc}
+ */
+ protected List<Address> getMembers(View view)
+ {
+ return view.getMembers();
+ }
+
+ /**
+ * {@inheritDoc}
+ */
+ protected void setObject(Message m, Object o)
+ {
+ m.setObject(o);
+ }
+}
Added: kernel/trunk/exo.kernel.component.ext.rpc.impl.jgroups.v3/src/test/java/org/exoplatform/services/rpc/impl/TestRPCServiceImpl.java
===================================================================
--- kernel/trunk/exo.kernel.component.ext.rpc.impl.jgroups.v3/src/test/java/org/exoplatform/services/rpc/impl/TestRPCServiceImpl.java (rev 0)
+++ kernel/trunk/exo.kernel.component.ext.rpc.impl.jgroups.v3/src/test/java/org/exoplatform/services/rpc/impl/TestRPCServiceImpl.java 2011-12-06 13:40:17 UTC (rev 5270)
@@ -0,0 +1,1281 @@
+/*
+ * Copyright (C) 2010 eXo Platform SAS.
+ *
+ * This is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU Lesser General Public License as
+ * published by the Free Software Foundation; either version 2.1 of
+ * the License, or (at your option) any later version.
+ *
+ * This software is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this software; if not, write to the Free
+ * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
+ * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
+ */
+package org.exoplatform.services.rpc.impl;
+
+import org.exoplatform.container.PortalContainer;
+import org.exoplatform.container.configuration.ConfigurationManager;
+import org.exoplatform.container.xml.InitParams;
+import org.exoplatform.container.xml.ValueParam;
+import org.exoplatform.services.rpc.RPCException;
+import org.exoplatform.services.rpc.RemoteCommand;
+import org.exoplatform.services.rpc.SingleMethodCallCommand;
+import org.exoplatform.services.rpc.TopologyChangeEvent;
+import org.exoplatform.services.rpc.TopologyChangeListener;
+import org.exoplatform.services.rpc.impl.AbstractRPCService.MemberHasLeftException;
+import org.exoplatform.services.rpc.jgv3.RPCServiceImpl;
+import org.exoplatform.test.BasicTestCase;
+import org.jgroups.Address;
+
+import java.io.Serializable;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.atomic.AtomicReference;
+
+/**
+ * This is the unit test class for the service {@link RPCServiceImpl}
+ *
+ * @author <a href="mailto:nicolas.filotto@exoplatform.com">Nicolas Filotto</a>
+ * @version $Id$
+ *
+ */
+public class TestRPCServiceImpl extends BasicTestCase
+{
+ private PortalContainer container;
+ private ConfigurationManager configManager;
+
+ public void setUp() throws Exception
+ {
+ container = PortalContainer.getInstance();
+ configManager = (ConfigurationManager)container.getComponentInstanceOfType(ConfigurationManager.class);
+ }
+
+ public void testParameters()
+ {
+ InitParams params = null;
+ try
+ {
+ new RPCServiceImpl(container.getContext(), params, configManager);
+ fail("We expect a IllegalArgumentException since the jgroups config cannot be found");
+ }
+ catch (IllegalArgumentException e)
+ {
+ // OK
+ }
+ params = new InitParams();
+ try
+ {
+ new RPCServiceImpl(container.getContext(), params, configManager);
+ fail("We expect a IllegalArgumentException since the jgroups config cannot be found");
+ }
+ catch (IllegalArgumentException e)
+ {
+ // OK
+ }
+ ValueParam paramConf = new ValueParam();
+ paramConf.setName(RPCServiceImpl.PARAM_JGROUPS_CONFIG);
+ params.addParameter(paramConf);
+ try
+ {
+ new RPCServiceImpl(container.getContext(), params, configManager);
+ fail("We expect a IllegalArgumentException since the jgroups config cannot be found");
+ }
+ catch (IllegalArgumentException e)
+ {
+ // OK
+ }
+ paramConf.setValue("fakePath");
+ try
+ {
+ new RPCServiceImpl(container.getContext(), params, configManager);
+ fail("We expect a IllegalArgumentException since the jgroups config cannot be found");
+ }
+ catch (IllegalArgumentException e)
+ {
+ // OK
+ }
+ paramConf.setValue("jar:/conf/portal/udp.xml");
+ RPCServiceImpl service = null;
+ try
+ {
+ service = new RPCServiceImpl(container.getContext(), params, configManager);
+ assertEquals(RPCServiceImpl.DEFAULT_TIMEOUT, service.getDefaultTimeout());
+ assertEquals(RPCServiceImpl.DEFAULT_RETRY_TIMEOUT, service.getRetryTimeout());
+ assertEquals(true, service.isAllowFailover());
+ assertEquals(RPCServiceImpl.CLUSTER_NAME + "-" + container.getContext().getName(), service.getClusterName());
+ }
+ finally
+ {
+ if (service != null)
+ {
+ service.stop();
+ }
+ }
+ ValueParam paramTimeout = new ValueParam();
+ paramTimeout.setName(RPCServiceImpl.PARAM_DEFAULT_TIMEOUT);
+ paramTimeout.setValue("fakeValue");
+ params.addParameter(paramTimeout);
+ try
+ {
+ new RPCServiceImpl(container.getContext(), params, configManager);
+ fail("We expect a NumberFormatException since the timeout is not properly set");
+ }
+ catch (NumberFormatException e)
+ {
+ // OK
+ }
+ paramTimeout.setValue("60");
+ try
+ {
+ service = new RPCServiceImpl(container.getContext(), params, configManager);
+ assertEquals(60, service.getDefaultTimeout());
+ assertEquals(RPCServiceImpl.DEFAULT_RETRY_TIMEOUT, service.getRetryTimeout());
+ assertEquals(true, service.isAllowFailover());
+ assertEquals(RPCServiceImpl.CLUSTER_NAME + "-" + container.getContext().getName(), service.getClusterName());
+ }
+ finally
+ {
+ if (service != null)
+ {
+ service.stop();
+ }
+ }
+ ValueParam paramRetryTimeout = new ValueParam();
+ paramRetryTimeout.setName(RPCServiceImpl.PARAM_RETRY_TIMEOUT);
+ paramRetryTimeout.setValue("fakeValue");
+ params.addParameter(paramRetryTimeout);
+ try
+ {
+ new RPCServiceImpl(container.getContext(), params, configManager);
+ fail("We expect a NumberFormatException since the retry timeout is not properly set");
+ }
+ catch (NumberFormatException e)
+ {
+ // OK
+ }
+ paramRetryTimeout.setValue("60");
+ try
+ {
+ service = new RPCServiceImpl(container.getContext(), params, configManager);
+ assertEquals(60, service.getDefaultTimeout());
+ assertEquals(60, service.getRetryTimeout());
+ assertEquals(true, service.isAllowFailover());
+ assertEquals(RPCServiceImpl.CLUSTER_NAME + "-" + container.getContext().getName(), service.getClusterName());
+ }
+ finally
+ {
+ if (service != null)
+ {
+ service.stop();
+ }
+ }
+ ValueParam paramAllowFailover = new ValueParam();
+ paramAllowFailover.setName(RPCServiceImpl.PARAM_ALLOW_FAILOVER);
+ paramAllowFailover.setValue("fakeValue");
+ params.addParameter(paramAllowFailover);
+ try
+ {
+ service = new RPCServiceImpl(container.getContext(), params, configManager);
+ assertEquals(60, service.getDefaultTimeout());
+ assertEquals(60, service.getRetryTimeout());
+ assertEquals(false, service.isAllowFailover());
+ assertEquals(RPCServiceImpl.CLUSTER_NAME + "-" + container.getContext().getName(), service.getClusterName());
+ }
+ finally
+ {
+ if (service != null)
+ {
+ service.stop();
+ }
+ }
+ paramAllowFailover.setValue("TRUE");
+ try
+ {
+ service = new RPCServiceImpl(container.getContext(), params, configManager);
+ assertEquals(60, service.getDefaultTimeout());
+ assertEquals(60, service.getRetryTimeout());
+ assertEquals(true, service.isAllowFailover());
+ assertEquals(RPCServiceImpl.CLUSTER_NAME + "-" + container.getContext().getName(), service.getClusterName());
+ }
+ finally
+ {
+ if (service != null)
+ {
+ service.stop();
+ }
+ }
+
+ ValueParam paramClusterName = new ValueParam();
+ paramClusterName.setName(RPCServiceImpl.PARAM_CLUSTER_NAME);
+ paramClusterName.setValue("MyName");
+ params.addParameter(paramClusterName);
+ try
+ {
+ service = new RPCServiceImpl(container.getContext(), params, configManager);
+ assertEquals(60, service.getDefaultTimeout());
+ assertEquals(paramClusterName.getValue() + "-" + container.getContext().getName(), service.getClusterName());
+ }
+ finally
+ {
+ if (service != null)
+ {
+ service.stop();
+ }
+ }
+ }
+
+ public void testStates() throws Exception
+ {
+ InitParams params = new InitParams();
+ ValueParam paramConf = new ValueParam();
+ paramConf.setName(RPCServiceImpl.PARAM_JGROUPS_CONFIG);
+ paramConf.setValue("jar:/conf/portal/udp.xml");
+ params.addParameter(paramConf);
+ RPCServiceImpl service = null;
+ RemoteCommand foo = new RemoteCommand()
+ {
+
+ public String getId()
+ {
+ return "foo";
+ }
+
+ public String execute(Serializable[] args) throws Throwable
+ {
+ return null;
+ }
+ };
+ try
+ {
+ service = new RPCServiceImpl(container.getContext(), params, configManager);
+
+ service.registerCommand(foo);
+ try
+ {
+ service.executeCommandOnAllNodes(foo, true);
+ fail("We expect a RPCException since the current state is not the expected one");
+ }
+ catch (RPCException e)
+ {
+ // OK
+ }
+ try
+ {
+ service.executeCommandOnAllNodes(foo, 10);
+ fail("We expect a RPCException since the current state is not the expected one");
+ }
+ catch (RPCException e)
+ {
+ // OK
+ }
+ try
+ {
+ service.executeCommandOnCoordinator(foo, true);
+ fail("We expect a RPCException since the current state is not the expected one");
+ }
+ catch (RPCException e)
+ {
+ // OK
+ }
+ try
+ {
+ service.executeCommandOnCoordinator(foo, 10);
+ fail("We expect a RPCException since the current state is not the expected one");
+ }
+ catch (RPCException e)
+ {
+ // OK
+ }
+ try
+ {
+ service.isCoordinator();
+ fail("We expect a RPCException since the current state is not the expected one");
+ }
+ catch (RPCException e)
+ {
+ // OK
+ }
+ service.start();
+ assertEquals(true, service.isCoordinator());
+ service.executeCommandOnAllNodes(foo, true);
+ service.executeCommandOnAllNodes(foo, 10);
+ service.executeCommandOnCoordinator(foo, true);
+ service.executeCommandOnCoordinator(foo, 10);
+ }
+ finally
+ {
+ if (service != null)
+ {
+ service.stop();
+ }
+ }
+ try
+ {
+ service.executeCommandOnAllNodes(foo, true);
+ fail("We expect a RPCException since the current state is not the expected one");
+ }
+ catch (RPCException e)
+ {
+ // OK
+ }
+ try
+ {
+ service.executeCommandOnAllNodes(foo, 10);
+ fail("We expect a RPCException since the current state is not the expected one");
+ }
+ catch (RPCException e)
+ {
+ // OK
+ }
+ try
+ {
+ service.executeCommandOnCoordinator(foo, true);
+ fail("We expect a RPCException since the current state is not the expected one");
+ }
+ catch (RPCException e)
+ {
+ // OK
+ }
+ try
+ {
+ service.executeCommandOnCoordinator(foo, 10);
+ fail("We expect a RPCException since the current state is not the expected one");
+ }
+ catch (RPCException e)
+ {
+ // OK
+ }
+ }
+
+ public void testCommands() throws Exception
+ {
+ InitParams params = new InitParams();
+ ValueParam paramConf = new ValueParam();
+ paramConf.setName(RPCServiceImpl.PARAM_JGROUPS_CONFIG);
+ paramConf.setValue("jar:/conf/portal/udp.xml");
+ params.addParameter(paramConf);
+ RPCServiceImpl service = null;
+ try
+ {
+ service = new RPCServiceImpl(container.getContext(), params, configManager);
+ RemoteCommand fake = new RemoteCommand()
+ {
+
+ public String getId()
+ {
+ return "fake";
+ }
+
+ public String execute(Serializable[] args) throws Throwable
+ {
+ return null;
+ }
+ };
+ RemoteCommand fake2 = new RemoteCommand()
+ {
+
+ public String getId()
+ {
+ return "fake2";
+ }
+
+ public String execute(Serializable[] args) throws Throwable
+ {
+ return null;
+ }
+ };
+ RemoteCommand fake2_Unregistered = new RemoteCommand()
+ {
+
+ public String getId()
+ {
+ return "fake2";
+ }
+
+ public String execute(Serializable[] args) throws Throwable
+ {
+ return null;
+ }
+ };
+ service.registerCommand(fake2);
+ RemoteCommand Exception = new RemoteCommand()
+ {
+
+ public String getId()
+ {
+ return "Exception";
+ }
+
+ public String execute(Serializable[] args) throws Throwable
+ {
+ throw new Exception("MyException");
+ }
+ };
+ service.registerCommand(Exception);
+ RemoteCommand Error = new RemoteCommand()
+ {
+
+ public String getId()
+ {
+ return "Error";
+ }
+
+ public String execute(Serializable[] args) throws Throwable
+ {
+ throw new Error("MyError");
+ }
+ } ;
+ service.registerCommand(Error);
+ RemoteCommand StringValue = new RemoteCommand()
+ {
+
+ public String getId()
+ {
+ return "StringValue";
+ }
+
+ public String execute(Serializable[] args) throws Throwable
+ {
+ return "OK";
+ }
+ };
+ service.registerCommand(StringValue);
+ RemoteCommand NullValue = new RemoteCommand()
+ {
+
+ public String getId()
+ {
+ return "NullValue";
+ }
+
+ public String execute(Serializable[] args) throws Throwable
+ {
+ return null;
+ }
+ };
+ service.registerCommand(NullValue);
+ RemoteCommand LongTask = new RemoteCommand()
+ {
+
+ public String getId()
+ {
+ return "LongTask";
+ }
+
+ public String execute(Serializable[] args) throws Throwable
+ {
+ Thread.sleep(2000);
+ return null;
+ }
+ };
+ service.registerCommand(LongTask);
+ service.start();
+ try
+ {
+ service.executeCommandOnAllNodes(fake, true);
+ fail("We expect a RPCException since the command is unknown");
+ }
+ catch (RPCException e)
+ {
+ // OK
+ }
+ try
+ {
+ service.executeCommandOnCoordinator(fake, true);
+ fail("We expect a RPCException since the command is unknown");
+ }
+ catch (RPCException e)
+ {
+ // OK
+ }
+ try
+ {
+ service.executeCommandOnAllNodes(fake2_Unregistered, true);
+ fail("We expect a RPCException since the command is unknown");
+ }
+ catch (RPCException e)
+ {
+ // OK
+ }
+ try
+ {
+ service.executeCommandOnCoordinator(fake2_Unregistered, true);
+ fail("We expect a RPCException since the command is unknown");
+ }
+ catch (RPCException e)
+ {
+ // OK
+ }
+ List<Object> result;
+ result = service.executeCommandOnAllNodes(Exception, true);
+ assertTrue(result != null && result.size() == 1);
+ assertTrue("We expect a RPCException since one node could not execute the command", result.get(0) instanceof RPCException);
+ try
+ {
+ service.executeCommandOnCoordinator(Exception, true);
+ fail("We expect a RPCException since one node could not execute the command");
+ }
+ catch (RPCException e)
+ {
+ // OK
+ }
+ result = service.executeCommandOnAllNodes(Error, true);
+ assertTrue(result != null && result.size() == 1);
+ assertTrue("We expect a RPCException since one node could not execute the command", result.get(0) instanceof RPCException);
+ try
+ {
+ service.executeCommandOnCoordinator(Error, true);
+ fail("We expect a RPCException since one node could not execute the command");
+ }
+ catch (RPCException e)
+ {
+ // OK
+ }
+ result = service.executeCommandOnAllNodes(LongTask, true);
+ assertNotNull(result);
+ assertTrue(result.size() == 1);
+ assertNull(result.get(0));
+ Object o = service.executeCommandOnCoordinator(LongTask, true);
+ assertNull(o);
+ result = service.executeCommandOnAllNodes(LongTask, 1000);
+ assertNotNull(result);
+ assertTrue(result.size() == 1);
+ assertTrue("We expect an RPCException due to a Replication Timeout", result.get(0) instanceof RPCException);
+ try
+ {
+ service.executeCommandOnCoordinator(LongTask, 1000);
+ fail("We expect an RPCException due to a Replication Timeout");
+ }
+ catch (RPCException e)
+ {
+ // OK
+ }
+ result = service.executeCommandOnAllNodes(LongTask, false);
+ assertNotNull(result);
+ assertTrue(result.isEmpty());
+ assertNull(service.executeCommandOnCoordinator(LongTask, false));
+
+ result = service.executeCommandOnAllNodes(StringValue, true);
+ assertNotNull(result);
+ assertTrue(result.size() == 1);
+ assertEquals("OK", result.get(0));
+ o = service.executeCommandOnCoordinator(StringValue, true);
+ assertEquals("OK", o);
+ result = service.executeCommandOnAllNodes(NullValue, true);
+ assertNotNull(result);
+ assertTrue(result.size() == 1);
+ assertNull(result.get(0));
+ o = service.executeCommandOnCoordinator(NullValue, true);
+ assertNull(o);
+ }
+ finally
+ {
+ if (service != null)
+ {
+ service.stop();
+ }
+ }
+ }
+
+ /**
+ * Exercises two {@link RPCServiceImpl} instances created from the same configuration:
+ * command execution on all nodes and on the coordinator, the failure cases produced by the
+ * second instance (unknown command, exception, error, slow command), the topology change
+ * notifications, and what happens once the first instance has been stopped.
+ *
+ * @throws Exception if a service call or a wait fails unexpectedly
+ */
+ public void testSeveralNodes() throws Exception
+ {
+ InitParams params = new InitParams();
+ ValueParam paramConf = new ValueParam();
+ paramConf.setName(RPCServiceImpl.PARAM_JGROUPS_CONFIG);
+ paramConf.setValue("jar:/conf/portal/udp.xml");
+ params.addParameter(paramConf);
+ RPCServiceImpl service1 = null, service2 = null;
+ try
+ {
+ service1 = new RPCServiceImpl(container.getContext(), params, configManager);
+ service2 = new RPCServiceImpl(container.getContext(), params, configManager);
+ // Registered on service1 only: service2 never registers a command with this id.
+ RemoteCommand CmdUnknownOnNode2 = new RemoteCommand()
+ {
+
+ public String getId()
+ {
+ return "CmdUnknownOnNode2";
+ }
+
+ public String execute(Serializable[] args) throws Throwable
+ {
+ return "OK";
+ }
+ };
+ service1.registerCommand(CmdUnknownOnNode2);
+ // service1 variant returns "OK"; the service2 variant with the same id throws an Exception.
+ RemoteCommand ExceptionOnNode2 = new RemoteCommand()
+ {
+
+ public String getId()
+ {
+ return "ExceptionOnNode2";
+ }
+
+ public String execute(Serializable[] args) throws Throwable
+ {
+ return "OK";
+ }
+ };
+ service1.registerCommand(ExceptionOnNode2);
+ // service1 variant returns "OK"; the service2 variant with the same id throws an Error.
+ RemoteCommand ErrorOnNode2 = new RemoteCommand()
+ {
+
+ public String getId()
+ {
+ return "ErrorOnNode2";
+ }
+
+ public String execute(Serializable[] args) throws Throwable
+ {
+ return "OK";
+ }
+ };
+ service1.registerCommand(ErrorOnNode2);
+
+ // service1 variant returns "OK" at once; the service2 variant with the same id sleeps for 2000 ms.
+ RemoteCommand LongTaskOnNode2 = new RemoteCommand()
+ {
+
+ public String getId()
+ {
+ return "LongTaskOnNode2";
+ }
+
+ public String execute(Serializable[] args) throws Throwable
+ {
+ return "OK";
+ }
+ };
+ service1.registerCommand(LongTaskOnNode2);
+ // service1's "LongTask" sleeps for 3000 ms and answers "OldCoordinator"; service2 registers
+ // the same id further down and answers "NewCoordinator" without sleeping.
+ service1.registerCommand(new RemoteCommand()
+ {
+
+ public String getId()
+ {
+ return "LongTask";
+ }
+
+ public String execute(Serializable[] args) throws Throwable
+ {
+ Thread.sleep(3000);
+ return "OldCoordinator";
+ }
+ });
+ service1.registerCommand(new RemoteCommand()
+ {
+
+ public String getId()
+ {
+ return "OK";
+ }
+
+ public String execute(Serializable[] args) throws Throwable
+ {
+ return "OK";
+ }
+ });
+ // Commands registered on service2 under the ids already used on service1.
+ service2.registerCommand(new RemoteCommand()
+ {
+
+ public String getId()
+ {
+ return "ExceptionOnNode2";
+ }
+
+ public String execute(Serializable[] args) throws Throwable
+ {
+ throw new Exception("MyException");
+ }
+ });
+ service2.registerCommand(new RemoteCommand()
+ {
+
+ public String getId()
+ {
+ return "ErrorOnNode2";
+ }
+
+ public String execute(Serializable[] args) throws Throwable
+ {
+ throw new Error("MyError");
+ }
+ });
+ service2.registerCommand(new RemoteCommand()
+ {
+
+ public String getId()
+ {
+ return "LongTaskOnNode2";
+ }
+
+ public String execute(Serializable[] args) throws Throwable
+ {
+ Thread.sleep(2000);
+ return null;
+ }
+ });
+ RemoteCommand OK = new RemoteCommand()
+ {
+
+ public String getId()
+ {
+ return "OK";
+ }
+
+ public String execute(Serializable[] args) throws Throwable
+ {
+ return "OK";
+ }
+ };
+ service2.registerCommand(OK);
+ final RemoteCommand LongTask = new RemoteCommand()
+ {
+
+ public String getId()
+ {
+ return "LongTask";
+ }
+
+ public String execute(Serializable[] args) throws Throwable
+ {
+ return "NewCoordinator";
+ }
+ };
+ service2.registerCommand(LongTask);
+ // Before any service is started, neither listener has been notified.
+ MyListener listener1 = new MyListener();
+ service1.registerTopologyChangeListener(listener1);
+ MyListener listener2 = new MyListener();
+ service2.registerTopologyChangeListener(listener2);
+ assertFalse(listener1.coordinatorHasChanged);
+ assertFalse(listener1.isCoordinator);
+ assertEquals(0, listener1.count);
+ assertFalse(listener2.coordinatorHasChanged);
+ assertFalse(listener2.isCoordinator);
+ assertEquals(0, listener2.count);
+ // Starting service1 alone: listener1 is notified once and reports being the coordinator.
+ service1.start();
+ assertFalse(listener1.coordinatorHasChanged);
+ assertTrue(listener1.isCoordinator);
+ assertEquals(1, listener1.count);
+ assertFalse(listener2.coordinatorHasChanged);
+ assertFalse(listener2.isCoordinator);
+ assertEquals(0, listener2.count);
+ // Starting service2: both listeners are notified and service1 is still expected to be the coordinator.
+ service2.start();
+ assertFalse(listener1.coordinatorHasChanged);
+ assertTrue(listener1.isCoordinator);
+ assertEquals(2, listener1.count);
+ assertFalse(listener2.coordinatorHasChanged);
+ assertFalse(listener2.isCoordinator);
+ assertEquals(1, listener2.count);
+ assertEquals(true, service1.isCoordinator());
+ assertEquals(false, service2.isCoordinator());
+ List<Object> result;
+ Object o;
+ // In each result list below, index 0 is asserted to be service1's "OK" and index 1 the
+ // failure reported for service2; a coordinator-only call must simply return "OK".
+ result = service1.executeCommandOnAllNodes(CmdUnknownOnNode2, true);
+ assertTrue(result != null && result.size() == 2);
+ assertEquals("OK", result.get(0));
+ assertTrue("We expect a RPCException since the command is unknown on node 2", result.get(1) instanceof RPCException);
+ o = service1.executeCommandOnCoordinator(CmdUnknownOnNode2, true);
+ assertEquals("OK", o);
+
+ result = service1.executeCommandOnAllNodes(ExceptionOnNode2, true);
+ assertTrue(result != null && result.size() == 2);
+ assertEquals("OK", result.get(0));
+ assertTrue("We expect a RPCException since the command fails on node 2", result.get(1) instanceof RPCException);
+ o = service1.executeCommandOnCoordinator(ExceptionOnNode2, true);
+ assertEquals("OK", o);
+
+ result = service1.executeCommandOnAllNodes(ErrorOnNode2, true);
+ assertTrue(result != null && result.size() == 2);
+ assertEquals("OK", result.get(0));
+ assertTrue("We expect a RPCException since the command fails on node 2", result.get(1) instanceof RPCException);
+ o = service1.executeCommandOnCoordinator(ErrorOnNode2, true);
+ assertEquals("OK", o);
+
+ // NOTE(review): the second argument (1000) is presumably a timeout in milliseconds - confirm
+ // against RPCServiceImpl. The service2 implementation of this command sleeps for 2000 ms.
+ result = service1.executeCommandOnAllNodes(LongTaskOnNode2, 1000);
+ assertNotNull(result);
+ assertTrue(result.size() == 2);
+ assertEquals("OK", result.get(0));
+ assertTrue("We expect an RPCException due to a Replication Timeout", result.get(1) instanceof RPCException);
+ o = service1.executeCommandOnCoordinator(LongTaskOnNode2, 1000);
+ assertEquals("OK", o);
+
+ // Member list and coordinator address are captured while service1 is still running; they
+ // are used as explicit destinations after service1 has been stopped.
+ List<Address> allMembers = service1.members;
+ List<Address> coordinatorOnly = new ArrayList<Address>(1);
+ coordinatorOnly.add(service1.coordinator);
+
+ final RPCServiceImpl service = service2;
+ // Holds any failure raised in the background thread so that the main thread can report it.
+ final AtomicReference<Throwable> error = new AtomicReference<Throwable>();
+ final CountDownLatch doneSignal = new CountDownLatch(1);
+ // Background call: asks, through service2, for "LongTask" to run on the coordinator while
+ // service1 is being stopped; the answer must be service2's "NewCoordinator".
+ Thread t = new Thread()
+ {
+ @Override
+ public void run()
+ {
+ try
+ {
+ Object o = service.executeCommandOnCoordinator(LongTask, true);
+ assertEquals("NewCoordinator", o);
+ }
+ catch (Throwable e)
+ {
+ error.set(e);
+ e.printStackTrace();
+ }
+ finally
+ {
+ doneSignal.countDown();
+ }
+ }
+ };
+ t.start();
+ service1.stop();
+ // Blocks until listener2 has received two notifications (its latch starts at 2).
+ listener2.waitTopologyChange();
+ // listener1 keeps the state of its last notification; listener2 now reports the coordinator change.
+ assertFalse(listener1.coordinatorHasChanged);
+ assertTrue(listener1.isCoordinator);
+ assertEquals(2, listener1.count);
+ assertTrue(listener2.coordinatorHasChanged);
+ assertTrue(listener2.isCoordinator);
+ assertEquals(2, listener2.count);
+ doneSignal.await();
+ assertNull(error.get() != null ? error.get().getMessage() : "", error.get());
+ // Targeting the addresses captured earlier: the entry for the stopped service1 must be a
+ // MemberHasLeftException, while service2 still answers "OK".
+ result = service2.excecuteCommand(allMembers, OK, true, 0);
+ assertNotNull(result);
+ assertTrue(result.size() == 2);
+ assertTrue("We expect an RPCException due to a member that has left", result.get(0) instanceof MemberHasLeftException);
+ assertEquals("OK", result.get(1));
+ result = service2.excecuteCommand(coordinatorOnly, OK, true, 0);
+ assertNotNull(result);
+ assertTrue(result.size() == 1);
+ assertTrue("We expect an RPCException due to a member that has left", result.get(0) instanceof MemberHasLeftException);
+ // Once stopped, service1 is expected to refuse to answer isCoordinator().
+ try
+ {
+ service1.isCoordinator();
+ fail("We expect a RPCException since the current state is not the expected one");
+ }
+ catch (RPCException e)
+ {
+ // OK
+ }
+ assertEquals(true, service2.isCoordinator());
+ }
+ finally
+ {
+ // NOTE(review): on the success path service1 has already been stopped above, so stop() is
+ // called a second time here; presumably a repeated stop() is tolerated - confirm.
+ if (service1 != null)
+ {
+ service1.stop();
+ }
+ if (service2 != null)
+ {
+ service2.stop();
+ }
+ }
+ }
+
+ /**
+ * Checks the argument validation of the {@link SingleMethodCallCommand} constructor, then
+ * registers commands bound to the public methods of {@link MyService} on a single
+ * {@link RPCServiceImpl} and verifies the values returned when they are executed on the
+ * coordinator and on all nodes.
+ *
+ * @throws Exception if a service call fails unexpectedly
+ */
+ public void testSingleMethodCallCommand() throws Exception
+ {
+ // No target object and no method name.
+ try
+ {
+ new SingleMethodCallCommand(null, null);
+ fail("we expect an IllegalArgumentException");
+ }
+ catch (IllegalArgumentException e)
+ {
+ // OK
+ }
+ MyService myService = new MyService();
+ // Target object given but no method name.
+ try
+ {
+ new SingleMethodCallCommand(myService, null);
+ fail("we expect an IllegalArgumentException");
+ }
+ catch (IllegalArgumentException e)
+ {
+ // OK
+ }
+ // MyService declares no method called "foo".
+ try
+ {
+ new SingleMethodCallCommand(myService, "foo");
+ fail("we expect an NoSuchMethodException");
+ }
+ catch (NoSuchMethodException e)
+ {
+ // OK
+ }
+ // MyService.getPrivateName() exists but is private.
+ try
+ {
+ new SingleMethodCallCommand(myService, "getPrivateName");
+ fail("we expect an IllegalArgumentException since only the public methods are allowed");
+ }
+ catch (IllegalArgumentException e)
+ {
+ // OK
+ }
+ InitParams params = new InitParams();
+ ValueParam paramConf = new ValueParam();
+ paramConf.setName(RPCServiceImpl.PARAM_JGROUPS_CONFIG);
+ paramConf.setValue("jar:/conf/portal/udp.xml");
+ params.addParameter(paramConf);
+ RPCServiceImpl service = null;
+ try
+ {
+ service = new RPCServiceImpl(container.getContext(), params, configManager);
+ // One command per MyService method/overload, selected by name and parameter types.
+ RemoteCommand getName = service.registerCommand(new SingleMethodCallCommand(myService, "getName"));
+ RemoteCommand add = service.registerCommand(new SingleMethodCallCommand(myService, "add", int.class));
+ RemoteCommand evaluate1 = service.registerCommand(new SingleMethodCallCommand(myService, "evaluate", int[].class));
+ RemoteCommand evaluate2 = service.registerCommand(new SingleMethodCallCommand(myService, "evaluate", List.class));
+ RemoteCommand total1 = service.registerCommand(new SingleMethodCallCommand(myService, "total", int.class));
+ RemoteCommand total2 = service.registerCommand(new SingleMethodCallCommand(myService, "total", int.class, int.class));
+ RemoteCommand total3 = service.registerCommand(new SingleMethodCallCommand(myService, "total", int[].class));
+ RemoteCommand total4 = service.registerCommand(new SingleMethodCallCommand(myService, "total", String.class, long.class, int[].class));
+ RemoteCommand testTypes1 = service.registerCommand(new SingleMethodCallCommand(myService, "testTypes", String[].class));
+ RemoteCommand testTypes2 = service.registerCommand(new SingleMethodCallCommand(myService, "testTypes", int[].class));
+ RemoteCommand testTypes3 = service.registerCommand(new SingleMethodCallCommand(myService, "testTypes", long[].class));
+ RemoteCommand testTypes4 = service.registerCommand(new SingleMethodCallCommand(myService, "testTypes", byte[].class));
+ RemoteCommand testTypes5 = service.registerCommand(new SingleMethodCallCommand(myService, "testTypes", short[].class));
+ RemoteCommand testTypes6 = service.registerCommand(new SingleMethodCallCommand(myService, "testTypes", char[].class));
+ RemoteCommand testTypes7 = service.registerCommand(new SingleMethodCallCommand(myService, "testTypes", double[].class));
+ RemoteCommand testTypes8 = service.registerCommand(new SingleMethodCallCommand(myService, "testTypes", float[].class));
+ RemoteCommand testTypes9 = service.registerCommand(new SingleMethodCallCommand(myService, "testTypes", boolean[].class));
+
+ // Only this one service is started here, so every "all nodes" result list is asserted to hold one element.
+ service.start();
+ List<Object> result;
+
+ assertEquals("name", service.executeCommandOnCoordinator(getName, true));
+ result = service.executeCommandOnAllNodes(getName, true);
+ assertTrue(result != null && result.size() == 1);
+ assertEquals("name", result.get(0));
+
+ // MyService.add accumulates into a field: the first call returns 10, the second one 20.
+ assertEquals(10, service.executeCommandOnCoordinator(add, true, 10));
+ result = service.executeCommandOnAllNodes(add, true, 10);
+ assertTrue(result != null && result.size() == 1);
+ assertEquals(20, result.get(0));
+
+ assertEquals(100, service.executeCommandOnCoordinator(evaluate1, true, new int[]{10, 10, 10, 30, 40}));
+ result = service.executeCommandOnAllNodes(evaluate1, true, new int[]{10, 10, 10, 30, 40});
+ assertTrue(result != null && result.size() == 1);
+ assertEquals(100, result.get(0));
+
+ List<Integer> values = new ArrayList<Integer>();
+ values.add(10);
+ values.add(10);
+ values.add(10);
+ values.add(30);
+ values.add(40);
+ assertEquals(100, service.executeCommandOnCoordinator(evaluate2, true, (Serializable)values));
+ result = service.executeCommandOnAllNodes(evaluate2, true, (Serializable)values);
+ assertTrue(result != null && result.size() == 1);
+ assertEquals(100, result.get(0));
+
+ assertEquals(10, service.executeCommandOnCoordinator(total1, true, 10));
+ result = service.executeCommandOnAllNodes(total1, true, 10);
+ assertTrue(result != null && result.size() == 1);
+ assertEquals(10, result.get(0));
+
+ assertEquals(20, service.executeCommandOnCoordinator(total2, true, 10, 10));
+ result = service.executeCommandOnAllNodes(total2, true, 10, 10);
+ assertTrue(result != null && result.size() == 1);
+ assertEquals(20, result.get(0));
+
+ assertEquals(100, service.executeCommandOnCoordinator(total3, true, new int[]{10, 10, 10, 30, 40}));
+ result = service.executeCommandOnAllNodes(total3, true, new int[]{10, 10, 10, 30, 40});
+ assertTrue(result != null && result.size() == 1);
+ assertEquals(100, result.get(0));
+
+ assertEquals(100, service.executeCommandOnCoordinator(total4, true, "foo", 50, new int[]{10, 10, 10, 30, 40}));
+ result = service.executeCommandOnAllNodes(total4, true, "foo", 50, new int[]{10, 10, 10, 30, 40});
+ assertTrue(result != null && result.size() == 1);
+ assertEquals(100, result.get(0));
+
+ // null is passed as the trailing int[]; MyService.total(String, long, int...) returns 0 for a null array.
+ assertEquals(0, service.executeCommandOnCoordinator(total4, true, "foo", 50, null));
+ result = service.executeCommandOnAllNodes(total4, true, "foo", 50, null);
+ assertTrue(result != null && result.size() == 1);
+ assertEquals(0, result.get(0));
+
+ // Only two arguments for a command bound to a three-parameter method: the coordinator call
+ // must throw, while the all-nodes call must return the RPCException as the node's result.
+ try
+ {
+ service.executeCommandOnCoordinator(total4, true, "foo", 50);
+ fail("We expect a RPCException since the list of arguments mismatch with what is expected");
+ }
+ catch (RPCException e)
+ {
+ // OK
+ }
+ result = service.executeCommandOnAllNodes(total4, true, "foo", 50);
+ assertTrue(result != null && result.size() == 1);
+ assertTrue("We expect a RPCException since the list of arguments mismatch with what is expected", result.get(0) instanceof RPCException);
+
+ // The expected values below follow the per-overload offsets of MyService.testTypes
+ // (long +1, byte +2, short +3, double +0.5, float +1.5; the other overloads return the
+ // first element unchanged) - presumably so that a wrongly selected overload is noticed.
+ assertEquals("foo", service.executeCommandOnCoordinator(testTypes1, true, (Serializable)new String[]{"foo"}));
+ result = service.executeCommandOnAllNodes(testTypes1, true, (Serializable)new String[]{"foo"});
+ assertTrue(result != null && result.size() == 1);
+ assertEquals("foo", result.get(0));
+
+ assertEquals(10, service.executeCommandOnCoordinator(testTypes2, true, new int[]{10}));
+ result = service.executeCommandOnAllNodes(testTypes2, true, new int[]{10});
+ assertTrue(result != null && result.size() == 1);
+ assertEquals(10, result.get(0));
+
+ assertEquals(11L, service.executeCommandOnCoordinator(testTypes3, true, new long[]{10}));
+ result = service.executeCommandOnAllNodes(testTypes3, true, new long[]{10});
+ assertTrue(result != null && result.size() == 1);
+ assertEquals(11L, result.get(0));
+
+ assertEquals((byte)12, service.executeCommandOnCoordinator(testTypes4, true, new byte[]{10}));
+ result = service.executeCommandOnAllNodes(testTypes4, true, new byte[]{10});
+ assertTrue(result != null && result.size() == 1);
+ assertEquals((byte)12, result.get(0));
+
+ assertEquals((short)13, service.executeCommandOnCoordinator(testTypes5, true, new short[]{10}));
+ result = service.executeCommandOnAllNodes(testTypes5, true, new short[]{10});
+ assertTrue(result != null && result.size() == 1);
+ assertEquals((short)13, result.get(0));
+
+ assertEquals('a', service.executeCommandOnCoordinator(testTypes6, true, new char[]{'a'}));
+ result = service.executeCommandOnAllNodes(testTypes6, true, new char[]{'a'});
+ assertTrue(result != null && result.size() == 1);
+ assertEquals('a', result.get(0));
+
+ assertEquals(10.5, service.executeCommandOnCoordinator(testTypes7, true, new double[]{10}));
+ result = service.executeCommandOnAllNodes(testTypes7, true, new double[]{10});
+ assertTrue(result != null && result.size() == 1);
+ assertEquals(10.5, result.get(0));
+
+ assertEquals((float)11.5, service.executeCommandOnCoordinator(testTypes8, true, new float[]{10}));
+ result = service.executeCommandOnAllNodes(testTypes8, true, new float[]{10});
+ assertTrue(result != null && result.size() == 1);
+ assertEquals((float)11.5, result.get(0));
+
+ assertEquals(true, service.executeCommandOnCoordinator(testTypes9, true, new boolean[]{true}));
+ result = service.executeCommandOnAllNodes(testTypes9, true, new boolean[]{true});
+ assertTrue(result != null && result.size() == 1);
+ assertEquals(true, result.get(0));
+
+ }
+ finally
+ {
+ // Always stop the service, whether or not the assertions above passed.
+ if (service != null)
+ {
+ service.stop();
+ }
+ }
+ }
+
+ /**
+ * Simple service whose public methods are the targets of the commands built in
+ * testSingleMethodCallCommand. Several overloads deliberately share a name and differ only
+ * by their parameter types.
+ */
+ public static class MyService
+ {
+    /** Running total maintained by {@link #add(int)}. */
+    private int value = 0;
+
+    /**
+     * Adds the given amount to the running total.
+     *
+     * @param i the amount to add
+     * @return the running total after the addition
+     */
+    public int add(int i)
+    {
+       value = value + i;
+       return value;
+    }
+
+    /** Private on purpose: the test expects commands bound to it to be rejected. */
+    @SuppressWarnings("unused")
+    private String getPrivateName()
+    {
+       return "name";
+    }
+
+    /** @return the constant "name" */
+    public String getName()
+    {
+       return "name";
+    }
+
+    /** @return the argument itself */
+    public int total(int i)
+    {
+       return i;
+    }
+
+    /** @return the sum of both arguments */
+    public int total(int i1, int i2)
+    {
+       return i1 + i2;
+    }
+
+    /** @return the sum of all the given values; the array must not be null */
+    public int total(int... values)
+    {
+       return sum(values);
+    }
+
+    /**
+     * Sums the trailing values; the first two parameters are ignored.
+     *
+     * @return the sum of the values, or 0 when the array is null
+     */
+    public int total(String s, long l, int... values)
+    {
+       if (values == null)
+       {
+          return 0;
+       }
+       return sum(values);
+    }
+
+    /** @return the sum of all the elements of the array; the array must not be null */
+    public int evaluate(int[] values)
+    {
+       return sum(values);
+    }
+
+    /** @return the sum of all the elements of the list */
+    public int evaluate(List<Integer> values)
+    {
+       int accumulated = 0;
+       for (Integer item : values)
+       {
+          accumulated += item.intValue();
+       }
+       return accumulated;
+    }
+
+    /** @return the first value, unchanged */
+    public String testTypes(String... values)
+    {
+       String first = values[0];
+       return first;
+    }
+
+    /** @return the first value, unchanged */
+    public boolean testTypes(boolean... values)
+    {
+       boolean first = values[0];
+       return first;
+    }
+
+    /** @return the first value, unchanged */
+    public char testTypes(char... values)
+    {
+       char first = values[0];
+       return first;
+    }
+
+    /** @return the first value plus 0.5 */
+    public double testTypes(double... values)
+    {
+       double first = values[0];
+       return first + 0.5;
+    }
+
+    /** @return the first value plus 1.5, computed as a double and narrowed to float */
+    public float testTypes(float... values)
+    {
+       double shifted = values[0] + 1.5;
+       return (float)shifted;
+    }
+
+    /** @return the first value, unchanged */
+    public int testTypes(int... values)
+    {
+       int first = values[0];
+       return first;
+    }
+
+    /** @return the first value plus 1 */
+    public long testTypes(long... values)
+    {
+       long first = values[0];
+       return first + 1;
+    }
+
+    /** @return the first value plus 2, computed as an int and narrowed to byte */
+    public byte testTypes(byte... values)
+    {
+       int shifted = values[0] + 2;
+       return (byte)shifted;
+    }
+
+    /** @return the first value plus 3, computed as an int and narrowed to short */
+    public short testTypes(short... values)
+    {
+       int shifted = values[0] + 3;
+       return (short)shifted;
+    }
+
+    /** Adds up the elements of the given array; throws NullPointerException for a null array. */
+    private static int sum(int[] numbers)
+    {
+       int accumulated = 0;
+       for (int index = 0; index < numbers.length; index++)
+       {
+          accumulated += numbers[index];
+       }
+       return accumulated;
+    }
+ }
+
+ /**
+ * Starts two services that both register a command with the id "CoordinatorExecutedCommand"
+ * and checks that a call made on the coordinator through the first service returns the first
+ * service's answer and that the command body ran exactly once overall.
+ *
+ * @throws Exception if a service call fails unexpectedly
+ */
+ public void testExecOnCoordinator() throws Exception
+ {
+    // Shared between both commands: one entry is appended per execution.
+    final List<Boolean> invocations = Collections.synchronizedList(new ArrayList<Boolean>());
+
+    InitParams initParams = new InitParams();
+    ValueParam jgroupsConfig = new ValueParam();
+    jgroupsConfig.setName(RPCServiceImpl.PARAM_JGROUPS_CONFIG);
+    jgroupsConfig.setValue("jar:/conf/portal/udp.xml");
+    initParams.addParameter(jgroupsConfig);
+
+    RPCServiceImpl first = null;
+    RPCServiceImpl second = null;
+    try
+    {
+       first = new RPCServiceImpl(container.getContext(), initParams, configManager);
+       RemoteCommand firstCommand = createCoordinatorExecutedCommand("service 1", invocations);
+       first.registerCommand(firstCommand);
+
+       second = new RPCServiceImpl(container.getContext(), initParams, configManager);
+       RemoteCommand secondCommand = createCoordinatorExecutedCommand("service 2", invocations);
+       second.registerCommand(secondCommand);
+
+       // starting services
+       first.start();
+       second.start();
+
+       Object reply = first.executeCommandOnCoordinator(firstCommand, true);
+       assertEquals("service 1", reply);
+
+       // it should be executed once
+       assertEquals(1, invocations.size());
+    }
+    finally
+    {
+       if (first != null)
+       {
+          first.stop();
+       }
+       if (second != null)
+       {
+          second.stop();
+       }
+    }
+ }
+
+ /**
+ * Builds a command with the id "CoordinatorExecutedCommand" that records each execution in
+ * the given list and then returns the given answer.
+ */
+ private static RemoteCommand createCoordinatorExecutedCommand(final String answer, final List<Boolean> invocations)
+ {
+    return new RemoteCommand()
+    {
+       public String getId()
+       {
+          return "CoordinatorExecutedCommand";
+       }
+
+       public String execute(Serializable[] args) throws Throwable
+       {
+          invocations.add(Boolean.TRUE);
+          return answer;
+       }
+    };
+ }
+
+ /**
+ * Topology listener used by the tests: it remembers the content of the last event received,
+ * counts the notifications and lets a test block until two notifications have arrived.
+ */
+ private static class MyListener implements TopologyChangeListener
+ {
+
+    // The three fields below are written in onChange() and read directly by the test code,
+    // in places without going through the latch first; volatile guarantees that such reads
+    // see the latest write even when onChange() runs on another thread.
+    private volatile boolean coordinatorHasChanged;
+    private volatile boolean isCoordinator;
+    private volatile int count;
+
+    // Released once two notifications have been received.
+    private final CountDownLatch lock = new CountDownLatch(2);
+
+    /**
+     * Stores the flags carried by the event, increments the notification counter and counts
+     * the latch down. Synchronized so that the counter increment cannot be lost if two
+     * notifications were ever delivered concurrently.
+     *
+     * @see org.exoplatform.services.rpc.TopologyChangeListener#onChange(org.exoplatform.services.rpc.TopologyChangeEvent)
+     */
+    public synchronized void onChange(TopologyChangeEvent event)
+    {
+       this.coordinatorHasChanged = event.isCoordinatorHasChanged();
+       this.isCoordinator = event.isCoordinator();
+       count++;
+
+       lock.countDown();
+    }
+
+    /**
+     * Blocks the caller until onChange() has been invoked twice on this listener.
+     *
+     * @throws InterruptedException if the waiting thread is interrupted
+     */
+    public void waitTopologyChange() throws InterruptedException
+    {
+       lock.await();
+    }
+ }
+}
Added: kernel/trunk/exo.kernel.component.ext.rpc.impl.jgroups.v3/src/test/resources/conf/portal/udp.xml
===================================================================
--- kernel/trunk/exo.kernel.component.ext.rpc.impl.jgroups.v3/src/test/resources/conf/portal/udp.xml (rev 0)
+++ kernel/trunk/exo.kernel.component.ext.rpc.impl.jgroups.v3/src/test/resources/conf/portal/udp.xml 2011-12-06 13:40:17 UTC (rev 5270)
@@ -0,0 +1,67 @@
+<config xmlns="urn:org:jgroups"
+ xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+ xsi:schemaLocation="urn:org:jgroups http://www.jgroups.org/schema/JGroups-3.0.xsd">
+ <UDP
+ mcast_addr="${jgroups.udp.mcast_addr:228.10.10.10}"
+ mcast_port="${jgroups.udp.mcast_port:45588}"
+ tos="8"
+ ucast_recv_buf_size="20M"
+ ucast_send_buf_size="640K"
+ mcast_recv_buf_size="25M"
+ mcast_send_buf_size="640K"
+ loopback="true"
+ discard_incompatible_packets="true"
+ max_bundle_size="64K"
+ max_bundle_timeout="30"
+ ip_ttl="${jgroups.udp.ip_ttl:8}"
+ enable_bundling="true"
+ enable_diagnostics="true"
+ thread_naming_pattern="cl"
+
+ timer_type="new"
+ timer.min_threads="4"
+ timer.max_threads="10"
+ timer.keep_alive_time="3000"
+ timer.queue_max_size="500"
+
+ thread_pool.enabled="true"
+ thread_pool.min_threads="2"
+ thread_pool.max_threads="8"
+ thread_pool.keep_alive_time="5000"
+ thread_pool.queue_enabled="true"
+ thread_pool.queue_max_size="10000"
+ thread_pool.rejection_policy="discard"
+
+ oob_thread_pool.enabled="true"
+ oob_thread_pool.min_threads="1"
+ oob_thread_pool.max_threads="8"
+ oob_thread_pool.keep_alive_time="5000"
+ oob_thread_pool.queue_enabled="false"
+ oob_thread_pool.queue_max_size="100"
+ oob_thread_pool.rejection_policy="Run"/>
+
+ <PING timeout="2000"
+ num_initial_members="3"/>
+ <MERGE2 max_interval="30000"
+ min_interval="10000"/>
+ <FD_SOCK/>
+ <FD_ALL/>
+ <VERIFY_SUSPECT timeout="1500" />
+ <BARRIER />
+ <pbcast.NAKACK exponential_backoff="300"
+ xmit_stagger_timeout="200"
+ use_mcast_xmit="false"
+ discard_delivered_msgs="true"/>
+ <UNICAST />
+ <pbcast.STABLE stability_delay="1000" desired_avg_gossip="50000"
+ max_bytes="4M"/>
+ <pbcast.GMS print_local_addr="true" join_timeout="3000"
+ view_bundling="true"/>
+ <UFC max_credits="2M"
+ min_threshold="0.4"/>
+ <MFC max_credits="2M"
+ min_threshold="0.4"/>
+ <FRAG2 frag_size="60K" />
+ <pbcast.STATE_TRANSFER />
+ <!-- pbcast.FLUSH /-->
+</config>
\ No newline at end of file
Added: kernel/trunk/exo.kernel.component.ext.rpc.impl.jgroups.v3/src/test/resources/test.policy
===================================================================
--- kernel/trunk/exo.kernel.component.ext.rpc.impl.jgroups.v3/src/test/resources/test.policy (rev 0)
+++ kernel/trunk/exo.kernel.component.ext.rpc.impl.jgroups.v3/src/test/resources/test.policy 2011-12-06 13:40:17 UTC (rev 5270)
@@ -0,0 +1,35 @@
+grant codeBase "@MAVEN_REPO@-"{
+ permission java.security.AllPermission;
+};
+
+grant codeBase "@MAIN_CLASSES@-"{
+ permission java.security.AllPermission;
+};
+
+grant codeBase "@TEST_CLASSES@-"{
+ permission java.lang.RuntimePermission "accessRPCService";
+};
+
+grant codeBase "@TEST_CLASSES@-"{
+ permission java.lang.RuntimePermission "manageContainer";
+};
+
+grant codeBase "@MAIN_CLASSES@../../../exo.kernel.component.common/-"{
+ permission java.security.AllPermission;
+};
+
+grant codeBase "@MAIN_CLASSES@../../../exo.kernel.commons.test/-"{
+ permission java.security.AllPermission;
+};
+
+grant codeBase "@MAIN_CLASSES@../../../exo.kernel.commons/-"{
+ permission java.security.AllPermission;
+};
+
+grant codeBase "@MAIN_CLASSES@../../../exo.kernel.container/-"{
+ permission java.security.AllPermission;
+};
+
+
+
+
Modified: kernel/trunk/pom.xml
===================================================================
--- kernel/trunk/pom.xml 2011-12-06 09:36:07 UTC (rev 5269)
+++ kernel/trunk/pom.xml 2011-12-06 13:40:17 UTC (rev 5270)
@@ -53,6 +53,7 @@
<module>exo.kernel.commons</module>
<module>exo.kernel.commons.test</module>
<module>exo.kernel.component.common</module>
+ <module>exo.kernel.component.ext.rpc.impl.jgroups.v3</module>
<module>exo.kernel.component.cache</module>
<module>exo.kernel.component.ext.cache.impl.jboss.v3</module>
<module>exo.kernel.component.ext.cache.impl.infinispan.v5</module>
@@ -86,7 +87,7 @@
</dependency>
<dependency>
<groupId>org.exoplatform.kernel</groupId>
- <artifactId>exo.kernel.component.remote</artifactId>
+ <artifactId>exo.kernel.component.common</artifactId>
<version>${project.version}</version>
</dependency>
<dependency>
12 years, 5 months
exo-jcr SVN: r5269 - jcr/trunk/exo.jcr.docs/exo.jcr.docs.developer/en/src/main/docbook/en-US/modules/jcr.
by do-not-reply@jboss.org
Author: tolusha
Date: 2011-12-06 04:36:07 -0500 (Tue, 06 Dec 2011)
New Revision: 5269
Modified:
jcr/trunk/exo.jcr.docs/exo.jcr.docs.developer/en/src/main/docbook/en-US/modules/jcr/db-cleaner-service.xml
Log:
EXOJCR-1603: Docbook updated
Modified: jcr/trunk/exo.jcr.docs/exo.jcr.docs.developer/en/src/main/docbook/en-US/modules/jcr/db-cleaner-service.xml
===================================================================
--- jcr/trunk/exo.jcr.docs/exo.jcr.docs.developer/en/src/main/docbook/en-US/modules/jcr/db-cleaner-service.xml 2011-12-06 09:35:46 UTC (rev 5268)
+++ jcr/trunk/exo.jcr.docs/exo.jcr.docs.developer/en/src/main/docbook/en-US/modules/jcr/db-cleaner-service.xml 2011-12-06 09:36:07 UTC (rev 5269)
@@ -7,16 +7,18 @@
<title>DBCleanService</title>
<section>
- <title>API</title>
+ <title>Description</title>
- <para>It is special service for removing data from database.</para>
+ <para>It is a special service for removing data from the database. This
+ article briefly describes how DBCleaner works with all
+ databases.</para>
<note>
<para>Code that invokes methods of DBCleanService must have
JCRRuntimePermissions.MANAGE_REPOSITORY_PERMISSION permission;</para>
</note>
- <para>There are three methods of DBCleanerService:</para>
+ <para>There are several methods of DBCleanerService:</para>
<table>
<title>API</title>
@@ -27,28 +29,201 @@
<entry>public static void cleanWorkspaceData(WorkspaceEntry
wsEntry)</entry>
- <entry>Clean workspace data from database. Tables will be removed
- in case of multiDB, and only records will be removed in case of
- singleDB.</entry>
+ <entry>Clean workspace data from database</entry>
</row>
<row>
<entry>public static void cleanRepositoryData(RepositoryEntry
repoEntry)</entry>
- <entry>Cleanup repository data from database.</entry>
+ <entry>Cleanup repository data from database</entry>
</row>
<row>
- <entry>public static getDBCleaner(Connection jdbcConn,
- WorkspaceEntry wsEntry)</entry>
+ <entry>public static DBCleaner getWorkspaceDBCleaner(Connection
+ jdbcConn, WorkspaceEntry wsEntry)</entry>
- <entry>Returns DBClean object with defined connection that allow
- to manual invoke clean method on it. Note: DBClean doesn't perform
- commit or close connection. It should be done manually.</entry>
+ <entry>Returns database cleaner of workspace.</entry>
</row>
+
+ <row>
+ <entry>public static DBCleaner getRepositoryDBCleaner(Connection
+ jdbcConn, RepositoryEntry repoEntry)</entry>
+
+ <entry>Returns database cleaner of repository. Returns null in
+ case of multi-db configuration.</entry>
+ </row>
</tbody>
</tgroup>
</table>
+
+ <para>The cleaning is a part of restoring from backup and it is used in
+ the following restore phases: </para>
+
+ <table>
+ <title>Relations between restore phases and what is called on
+ DBCleaner</title>
+
+ <tgroup cols="2">
+ <tbody>
+ <row>
+ <entry>clean</entry>
+
+ <entry><programlisting language="java">dbCleaner.executeCleanScripts();</programlisting></entry>
+ </row>
+
+ <row>
+ <entry>restore</entry>
+
+ <entry>does nothing with DBCleaner</entry>
+ </row>
+
+ <row>
+ <entry>commit</entry>
+
+ <entry><programlisting language="java">dbCleaner.executeCommitScripts();
+connection.commit();</programlisting></entry>
+ </row>
+
+ <row>
+ <entry>rollback</entry>
+
+ <entry><programlisting language="java">connection.rollback();
+
+dbCleaner.executeRollbackScripts();
+connection.commit();</programlisting></entry>
+ </row>
+ </tbody>
+ </tgroup>
+ </table>
+
+ <para>Different approaches are used for database cleaning depending on
+ database and JCR configuration.</para>
</section>
+
+ <section>
+ <title>Need to clean only single workspace</title>
+
+ <para>In the case of a single-db configuration, the records are simply
+ removed from the JCR table.</para>
+
+ <table>
+ <title>PostgreSQL, DB2 and MSSQL </title>
+
+ <tgroup cols="2">
+ <tbody>
+ <row>
+ <entry>executeCleanScripts()</entry>
+
+ <entry>removing all records from the database. Foreign key of
+ JCR_SITEM table is also removed</entry>
+ </row>
+
+ <row>
+ <entry>executeCommitScripts()</entry>
+
+ <entry>adding foreign key</entry>
+ </row>
+
+ <row>
+ <entry>executeRollbackScripts()</entry>
+ </row>
+ </tbody>
+ </tgroup>
+ </table>
+
+ <table>
+ <title>Oracle, Sybase, HSQLDB, MySQL</title>
+
+ <tgroup cols="2">
+ <tbody>
+ <row>
+ <entry>executeCleanScripts()</entry>
+
+ <entry>removing all records from the database. Foreign key of
+ JCR_SITEM table is also removed</entry>
+ </row>
+
+ <row>
+ <entry>executeCommitScripts()</entry>
+
+ <entry>adding foreign key</entry>
+ </row>
+
+ <row>
+ <entry>executeRollbackScripts()</entry>
+
+ <entry>adding foreign key</entry>
+ </row>
+ </tbody>
+ </tgroup>
+ </table>
+
+ <para>In the case of a multi-db configuration, the JCR tables are either
+ removed or renamed.</para>
+
+ <table>
+ <title>PostgreSQL, DB2 and MSSQL</title>
+
+ <tgroup cols="2">
+ <tbody>
+ <row>
+ <entry>executeCleanScripts()</entry>
+
+ <entry>removing tables JCR_MVALUE, JCR_MREF, JCR_MITEM,
+ initializing new tables without foreign key of JCR_MITEM table,
+ adding root</entry>
+ </row>
+
+ <row>
+ <entry>executeCommitScripts()</entry>
+
+ <entry>adding foreign key</entry>
+ </row>
+
+ <row>
+ <entry>executeRollbackScripts()</entry>
+ </row>
+ </tbody>
+ </tgroup>
+ </table>
+
+ <table>
+ <title>Oracle, Sybase, HSQLDB, MySQL</title>
+
+ <tgroup cols="2">
+ <tbody>
+ <row>
+ <entry>executeCleanScripts()</entry>
+
+ <entry>renaming current tables, initializing new tables without
+ foreign key of JCR_MITEM table, adding root node, removing indexes
+ for some databases</entry>
+ </row>
+
+ <row>
+ <entry>executeCommitScripts()</entry>
+
+ <entry>renaming tables, adding indexes</entry>
+ </row>
+
+ <row>
+ <entry>executeRollbackScripts()</entry>
+
+ <entry>removing previously renamed tables, adding indexes, adding
+ foreign key</entry>
+ </row>
+ </tbody>
+ </tgroup>
+ </table>
+ </section>
+
+ <section>
+ <title>Need to clean the whole repository</title>
+
+ <para>In the case of single-db, all workspaces are processed
+ simultaneously, in the same way as a single workspace in a multi-db
+ configuration. In the case of multi-db, every workspace is processed
+ separately, in the same way as a single workspace in a multi-db
+ configuration.</para>
+ </section>
</chapter>
12 years, 5 months
exo-jcr SVN: r5268 - jcr/trunk/exo.jcr.component.core/src/main/java/org/exoplatform/services/jcr/impl/clean/rdbms.
by do-not-reply@jboss.org
Author: tolusha
Date: 2011-12-06 04:35:46 -0500 (Tue, 06 Dec 2011)
New Revision: 5268
Modified:
jcr/trunk/exo.jcr.component.core/src/main/java/org/exoplatform/services/jcr/impl/clean/rdbms/DBCleanService.java
Log:
EXOJCR-1603: Java doc updated
Modified: jcr/trunk/exo.jcr.component.core/src/main/java/org/exoplatform/services/jcr/impl/clean/rdbms/DBCleanService.java
===================================================================
--- jcr/trunk/exo.jcr.component.core/src/main/java/org/exoplatform/services/jcr/impl/clean/rdbms/DBCleanService.java 2011-12-06 08:07:06 UTC (rev 5267)
+++ jcr/trunk/exo.jcr.component.core/src/main/java/org/exoplatform/services/jcr/impl/clean/rdbms/DBCleanService.java 2011-12-06 09:35:46 UTC (rev 5268)
@@ -168,7 +168,7 @@
}
/**
- * Returns database cleaner for manual cleaning for repository.
+ * Returns database cleaner of repository.
*
* @param jdbcConn
* database connection which need to use
@@ -834,7 +834,7 @@
}
/**
- * Returns database cleaner for manual cleaning of workspace.
+ * Returns database cleaner of workspace.
*
* @param jdbcConn
* database connection which need to use
12 years, 5 months
exo-jcr SVN: r5267 - jcr/trunk/exo.jcr.component.core/src/main/java/org/exoplatform/services/jcr/impl/clean/rdbms.
by do-not-reply@jboss.org
Author: tolusha
Date: 2011-12-06 03:07:06 -0500 (Tue, 06 Dec 2011)
New Revision: 5267
Modified:
jcr/trunk/exo.jcr.component.core/src/main/java/org/exoplatform/services/jcr/impl/clean/rdbms/DBCleanService.java
Log:
EXOJCR-1603: Code that invokes methods of DBCleanService must have JCRRuntimePermissions.MANAGE_REPOSITORY_PERMISSION permission;
Modified: jcr/trunk/exo.jcr.component.core/src/main/java/org/exoplatform/services/jcr/impl/clean/rdbms/DBCleanService.java
===================================================================
--- jcr/trunk/exo.jcr.component.core/src/main/java/org/exoplatform/services/jcr/impl/clean/rdbms/DBCleanService.java 2011-12-06 08:06:15 UTC (rev 5266)
+++ jcr/trunk/exo.jcr.component.core/src/main/java/org/exoplatform/services/jcr/impl/clean/rdbms/DBCleanService.java 2011-12-06 08:07:06 UTC (rev 5267)
@@ -79,6 +79,13 @@
public static void cleanWorkspaceData(WorkspaceEntry wsEntry) throws RepositoryConfigurationException,
NamingException, SQLException
{
+ // Need privileges to manage repository.
+ SecurityManager security = System.getSecurityManager();
+ if (security != null)
+ {
+ security.checkPermission(JCRRuntimePermissions.MANAGE_REPOSITORY_PERMISSION);
+ }
+
String dsName = wsEntry.getContainer().getParameterValue(JDBCWorkspaceDataContainer.SOURCE_NAME);
final DataSource ds = (DataSource)new InitialContext().lookup(dsName);
@@ -114,6 +121,13 @@
public static void cleanRepositoryData(RepositoryEntry rEntry) throws RepositoryConfigurationException,
NamingException, SQLException
{
+ // Need privileges to manage repository.
+ SecurityManager security = System.getSecurityManager();
+ if (security != null)
+ {
+ security.checkPermission(JCRRuntimePermissions.MANAGE_REPOSITORY_PERMISSION);
+ }
+
if (rEntry.getWorkspaceEntries().size() == 0)
{
// nothing to clean
12 years, 5 months
exo-jcr SVN: r5266 - jcr/trunk/exo.jcr.component.core/src/main/java/org/exoplatform/services/jcr/impl/clean/rdbms.
by do-not-reply@jboss.org
Author: tolusha
Date: 2011-12-06 03:06:15 -0500 (Tue, 06 Dec 2011)
New Revision: 5266
Modified:
jcr/trunk/exo.jcr.component.core/src/main/java/org/exoplatform/services/jcr/impl/clean/rdbms/DBCleanService.java
Log:
EXOJCR-1603: Code that invokes methods of DBCleanService must have JCRRuntimePermissions.MANAGE_REPOSITORY_PERMISSION permission;
Modified: jcr/trunk/exo.jcr.component.core/src/main/java/org/exoplatform/services/jcr/impl/clean/rdbms/DBCleanService.java
===================================================================
--- jcr/trunk/exo.jcr.component.core/src/main/java/org/exoplatform/services/jcr/impl/clean/rdbms/DBCleanService.java 2011-12-06 07:50:45 UTC (rev 5265)
+++ jcr/trunk/exo.jcr.component.core/src/main/java/org/exoplatform/services/jcr/impl/clean/rdbms/DBCleanService.java 2011-12-06 08:06:15 UTC (rev 5266)
@@ -20,6 +20,7 @@
import org.exoplatform.services.jcr.config.RepositoryConfigurationException;
import org.exoplatform.services.jcr.config.RepositoryEntry;
import org.exoplatform.services.jcr.config.WorkspaceEntry;
+import org.exoplatform.services.jcr.core.security.JCRRuntimePermissions;
import org.exoplatform.services.jcr.impl.storage.jdbc.DBConstants;
import org.exoplatform.services.jcr.impl.storage.jdbc.DialectDetecter;
import org.exoplatform.services.jcr.impl.storage.jdbc.JDBCWorkspaceDataContainer;
@@ -166,6 +167,13 @@
public static DBCleaner getRepositoryDBCleaner(Connection jdbcConn, RepositoryEntry repoEntry) throws SQLException,
RepositoryConfigurationException
{
+ // Need privileges to manage repository.
+ SecurityManager security = System.getSecurityManager();
+ if (security != null)
+ {
+ security.checkPermission(JCRRuntimePermissions.MANAGE_REPOSITORY_PERMISSION);
+ }
+
final boolean isMultiDB =
Boolean.parseBoolean(repoEntry.getWorkspaceEntries().get(0).getContainer()
.getParameterValue(JDBCWorkspaceDataContainer.MULTIDB));
@@ -825,6 +833,13 @@
public static DBCleaner getWorkspaceDBCleaner(Connection jdbcConn, WorkspaceEntry wsEntry) throws SQLException,
RepositoryConfigurationException
{
+ // Need privileges to manage repository.
+ SecurityManager security = System.getSecurityManager();
+ if (security != null)
+ {
+ security.checkPermission(JCRRuntimePermissions.MANAGE_REPOSITORY_PERMISSION);
+ }
+
boolean isMultiDB =
Boolean.parseBoolean(wsEntry.getContainer().getParameterValue(JDBCWorkspaceDataContainer.MULTIDB));
12 years, 5 months
exo-jcr SVN: r5265 - jcr/trunk/exo.jcr.component.core/src/main/java/org/exoplatform/services/jcr/impl/clean/rdbms.
by do-not-reply@jboss.org
Author: tolusha
Date: 2011-12-06 02:50:45 -0500 (Tue, 06 Dec 2011)
New Revision: 5265
Modified:
jcr/trunk/exo.jcr.component.core/src/main/java/org/exoplatform/services/jcr/impl/clean/rdbms/DBCleanService.java
Log:
EXOJCR-1603: small refactoring
Modified: jcr/trunk/exo.jcr.component.core/src/main/java/org/exoplatform/services/jcr/impl/clean/rdbms/DBCleanService.java
===================================================================
--- jcr/trunk/exo.jcr.component.core/src/main/java/org/exoplatform/services/jcr/impl/clean/rdbms/DBCleanService.java 2011-12-05 16:04:47 UTC (rev 5264)
+++ jcr/trunk/exo.jcr.component.core/src/main/java/org/exoplatform/services/jcr/impl/clean/rdbms/DBCleanService.java 2011-12-06 07:50:45 UTC (rev 5265)
@@ -98,32 +98,7 @@
jdbcConn.setAutoCommit(false);
DBCleaner dbCleaner = getWorkspaceDBCleaner(jdbcConn, wsEntry);
- try
- {
- dbCleaner.executeCleanScripts();
-
- try
- {
- dbCleaner.executeCommitScripts();
- }
- catch (SQLException e)
- {
- LOG.error("Can't remove temporary objects", e);
- }
-
- jdbcConn.commit();
- }
- catch (SQLException e)
- {
- jdbcConn.rollback();
-
- dbCleaner.executeRollbackScripts();
- jdbcConn.commit();
- }
- finally
- {
- jdbcConn.close();
- }
+ processingClean(dbCleaner, jdbcConn);
}
/**
@@ -138,10 +113,43 @@
public static void cleanRepositoryData(RepositoryEntry rEntry) throws RepositoryConfigurationException,
NamingException, SQLException
{
- for (WorkspaceEntry wsEntry : rEntry.getWorkspaceEntries())
+ if (rEntry.getWorkspaceEntries().size() == 0)
{
- cleanWorkspaceData(wsEntry);
+ // nothing to clean
+ return;
}
+
+ String dsName =
+ rEntry.getWorkspaceEntries().get(0).getContainer().getParameterValue(JDBCWorkspaceDataContainer.SOURCE_NAME);
+
+ final DataSource ds = (DataSource)new InitialContext().lookup(dsName);
+ if (ds == null)
+ {
+ throw new NameNotFoundException("Data source " + dsName + " not found");
+ }
+
+ Connection jdbcConn = SecurityHelper.doPrivilegedSQLExceptionAction(new PrivilegedExceptionAction<Connection>()
+ {
+ public Connection run() throws Exception
+ {
+ return ds.getConnection();
+
+ }
+ });
+
+ jdbcConn.setAutoCommit(false);
+ DBCleaner dbCleaner = getRepositoryDBCleaner(jdbcConn, rEntry);
+ if (dbCleaner != null)
+ {
+ processingClean(dbCleaner, jdbcConn);
+ }
+ else
+ {
+ for (WorkspaceEntry wsEntry : rEntry.getWorkspaceEntries())
+ {
+ cleanWorkspaceData(wsEntry);
+ }
+ }
}
/**
@@ -176,36 +184,31 @@
dialect = DialectDetecter.detect(jdbcConn.getMetaData());
}
- if (dialect.equalsIgnoreCase(DBConstants.DB_DIALECT_ORACLE)
- || dialect.equalsIgnoreCase(DBConstants.DB_DIALECT_ORACLEOCI)
- || dialect.equalsIgnoreCase(DBConstants.DB_DIALECT_MYSQL)
- || dialect.equalsIgnoreCase(DBConstants.DB_DIALECT_MYSQL_UTF8)
- || dialect.equalsIgnoreCase(DBConstants.DB_DIALECT_MYSQL_MYISAM)
- || dialect.equalsIgnoreCase(DBConstants.DB_DIALECT_MYSQL_MYISAM_UTF8)
- || dialect.equalsIgnoreCase(DBConstants.DB_DIALECT_SYBASE)
- || dialect.equalsIgnoreCase(DBConstants.DB_DIALECT_HSQLDB))
+ if (dialect.equalsIgnoreCase(DBConstants.DB_DIALECT_DB2)
+ || dialect.equalsIgnoreCase(DBConstants.DB_DIALECT_DB2V8)
+ || dialect.equalsIgnoreCase(DBConstants.DB_DIALECT_MSSQL)
+ || dialect.equalsIgnoreCase(DBConstants.DB_DIALECT_PGSQL))
{
List<String> dbCleanerScripts = new ArrayList<String>();
- dbCleanerScripts.addAll(getRenameScripts(isMultiDB, dialect));
+ dbCleanerScripts.addAll(getDropTableScripts(isMultiDB, dialect));
dbCleanerScripts.addAll(getInitializationDBScripts(isMultiDB, dialect));
dbCleanerScripts.addAll(getRemoveIndexesScripts(isMultiDB, dialect));
- List<String> afterRestoreScript = new ArrayList<String>();
- afterRestoreScript.addAll(getRemoveOldObjectsScripts(isMultiDB, dialect));
- afterRestoreScript.addAll(getRestoreIndexesScripts(isMultiDB, dialect));
-
- return new DBCleaner(jdbcConn, dbCleanerScripts, getRollbackScripts(isMultiDB, dialect), afterRestoreScript,
- dialect.equalsIgnoreCase(DBConstants.DB_DIALECT_SYBASE));
+ return new DBCleaner(jdbcConn, dbCleanerScripts, new ArrayList<String>(), getRestoreIndexesScripts(isMultiDB,
+ dialect), false);
}
List<String> dbCleanerScripts = new ArrayList<String>();
- dbCleanerScripts.addAll(getDropTableScripts(isMultiDB, dialect));
+ dbCleanerScripts.addAll(getRenameScripts(isMultiDB, dialect));
dbCleanerScripts.addAll(getInitializationDBScripts(isMultiDB, dialect));
dbCleanerScripts.addAll(getRemoveIndexesScripts(isMultiDB, dialect));
- return new DBCleaner(jdbcConn, dbCleanerScripts, new ArrayList<String>(), getRestoreIndexesScripts(isMultiDB,
- dialect), dialect.equalsIgnoreCase(DBConstants.DB_DIALECT_SYBASE));
+ List<String> afterRestoreScript = new ArrayList<String>();
+ afterRestoreScript.addAll(getRemoveOldObjectsScripts(isMultiDB, dialect));
+ afterRestoreScript.addAll(getRestoreIndexesScripts(isMultiDB, dialect));
+ return new DBCleaner(jdbcConn, dbCleanerScripts, getRollbackScripts(isMultiDB, dialect), afterRestoreScript,
+ dialect.equalsIgnoreCase(DBConstants.DB_DIALECT_SYBASE));
}
/**
@@ -393,7 +396,7 @@
}
/**
- * Return the command to drop primary or foreign key.
+ * Return the SQL script for drop primary or foreign key.
*
* @param isPrimaryKey
* boolean
@@ -871,7 +874,6 @@
String constraintName = validateConstraintName("JCR_FK_" + multiDb + "ITEM_PARENT", dialect);
cleanScripts.add("ALTER TABLE JCR_" + multiDb + "ITEM " + dropCommand(false, constraintName, dialect));
- constraintName = validateConstraintName("JCR_FK_" + multiDb + "ITEM_PARENT", dialect);
String constraint =
"CONSTRAINT " + constraintName + " FOREIGN KEY(PARENT_ID) REFERENCES JCR_" + multiDb + "ITEM(ID)";
commitScripts.add("ALTER TABLE JCR_" + multiDb + "ITEM ADD " + constraint);
@@ -941,4 +943,34 @@
}
}
}
+
+ private static void processingClean(DBCleaner dbCleaner, Connection jdbcConn) throws SQLException
+ {
+ try
+ {
+ dbCleaner.executeCleanScripts();
+
+ try
+ {
+ dbCleaner.executeCommitScripts();
+ }
+ catch (SQLException e)
+ {
+ LOG.error("Can't remove temporary objects", e);
+ }
+
+ jdbcConn.commit();
+ }
+ catch (SQLException e)
+ {
+ jdbcConn.rollback();
+
+ dbCleaner.executeRollbackScripts();
+ jdbcConn.commit();
+ }
+ finally
+ {
+ jdbcConn.close();
+ }
+ }
}
12 years, 5 months
exo-jcr SVN: r5264 - in jcr/trunk/exo.jcr.component.core/src: test/java/org/exoplatform/services/jcr/api/nodetypes and 1 other directory.
by do-not-reply@jboss.org
Author: tolusha
Date: 2011-12-05 11:04:47 -0500 (Mon, 05 Dec 2011)
New Revision: 5264
Modified:
jcr/trunk/exo.jcr.component.core/src/main/java/org/exoplatform/services/jcr/impl/core/nodetype/NodeTypeImpl.java
jcr/trunk/exo.jcr.component.core/src/test/java/org/exoplatform/services/jcr/api/nodetypes/TestNodeDefinition.java
Log:
EXOJCR-1670: avoid NPE when checking whether a multi-valued property can be set and the property definition is not found
Modified: jcr/trunk/exo.jcr.component.core/src/main/java/org/exoplatform/services/jcr/impl/core/nodetype/NodeTypeImpl.java
===================================================================
--- jcr/trunk/exo.jcr.component.core/src/main/java/org/exoplatform/services/jcr/impl/core/nodetype/NodeTypeImpl.java 2011-12-05 12:47:30 UTC (rev 5263)
+++ jcr/trunk/exo.jcr.component.core/src/main/java/org/exoplatform/services/jcr/impl/core/nodetype/NodeTypeImpl.java 2011-12-05 16:04:47 UTC (rev 5264)
@@ -206,37 +206,37 @@
InternalQName pname = locationFactory.parseJCRName(propertyName).getInternalName();
PropertyDefinitionDatas pdefs = nodeTypeDataManager.getPropertyDefinitions(pname, nodeTypeData.getName());
- PropertyDefinitionData pd = pdefs.getDefinition(true);
- if (pd != null)
+ if (pdefs != null)
{
- if (pd.isProtected())
+ PropertyDefinitionData pd = pdefs.getDefinition(true);
+ if (pd != null)
{
- // can set (edit)
- return false;
- }
- else if (values != null)
- {
- // can set (add or edit)
- int res = 0;
- for (Value value : values)
+ if (pd.isProtected())
{
- if (canSetPropertyForType(pd.getRequiredType(), value, pd.getValueConstraints()))
+ // can set (edit)
+ return false;
+ }
+ else if (values != null)
+ {
+ // can set (add or edit)
+ int res = 0;
+ for (Value value : values)
{
- res++;
+ if (canSetPropertyForType(pd.getRequiredType(), value, pd.getValueConstraints()))
+ {
+ res++;
+ }
}
+ return res == values.length;
}
- return res == values.length;
+ else
+ {
+ // can remove
+ return !pd.isMandatory();
+ }
}
- else
- {
- // can remove
- return !pd.isMandatory();
- }
}
- else
- {
- return false;
- }
+ return false;
}
catch (RepositoryException e)
{
Modified: jcr/trunk/exo.jcr.component.core/src/test/java/org/exoplatform/services/jcr/api/nodetypes/TestNodeDefinition.java
===================================================================
--- jcr/trunk/exo.jcr.component.core/src/test/java/org/exoplatform/services/jcr/api/nodetypes/TestNodeDefinition.java 2011-12-05 12:47:30 UTC (rev 5263)
+++ jcr/trunk/exo.jcr.component.core/src/test/java/org/exoplatform/services/jcr/api/nodetypes/TestNodeDefinition.java 2011-12-05 16:04:47 UTC (rev 5264)
@@ -24,6 +24,7 @@
import javax.jcr.Node;
import javax.jcr.RepositoryException;
+import javax.jcr.Value;
import javax.jcr.nodetype.ConstraintViolationException;
import javax.jcr.version.Version;
@@ -146,4 +147,16 @@
// ok
}
}
+
+ /**
+ * Testing method canSetProperty for class NodeType.
+ * when we set the multivalue property.......
+ * @throws Exception some exception that occurred in the test.
+ */
+ public void testSetNotAllowedMultiValueProperty() throws Exception
+ {
+ Node n = session.getRootNode().addNode("abc", "nt:folder");
+ session.save();
+ assertFalse(n.getPrimaryNodeType().canSetProperty("test", (Value[])null));
+ }
}
12 years, 5 months
exo-jcr SVN: r5263 - in jcr/branches/1.12.x/patch: 1.12.12-GA and 1 other directories.
by do-not-reply@jboss.org
Author: tolusha
Date: 2011-12-05 07:47:30 -0500 (Mon, 05 Dec 2011)
New Revision: 5263
Added:
jcr/branches/1.12.x/patch/1.12.12-GA/
jcr/branches/1.12.x/patch/1.12.12-GA/JCR-1695/
jcr/branches/1.12.x/patch/1.12.12-GA/JCR-1695/JCR-1695.patch
Log:
JCR-1695: patch proposed
Added: jcr/branches/1.12.x/patch/1.12.12-GA/JCR-1695/JCR-1695.patch
===================================================================
--- jcr/branches/1.12.x/patch/1.12.12-GA/JCR-1695/JCR-1695.patch (rev 0)
+++ jcr/branches/1.12.x/patch/1.12.12-GA/JCR-1695/JCR-1695.patch 2011-12-05 12:47:30 UTC (rev 5263)
@@ -0,0 +1,22 @@
+Index: exo.jcr.component.core/src/test/java/org/exoplatform/services/jcr/impl/storage/value/fs/TestRemoveFromValueStorage.java
+===================================================================
+--- exo.jcr.component.core/src/test/java/org/exoplatform/services/jcr/impl/storage/value/fs/TestRemoveFromValueStorage.java (revision 5258)
++++ exo.jcr.component.core/src/test/java/org/exoplatform/services/jcr/impl/storage/value/fs/TestRemoveFromValueStorage.java (working copy)
+@@ -139,7 +139,7 @@
+ }
+ }
+
+- for (int i = 0; i < count; i++)
++ for (int i = 0; i < channels.size(); i++)
+ {
+ try
+ {
+@@ -155,7 +155,7 @@
+ mySession.save();
+
+ // checking whether values are still in value storage.
+- for (int i = 0; i < count; i++)
++ for (int i = 0; i < channels.size(); i++)
+ {
+ try
+ {
12 years, 5 months