Author: areshetnyak
Date: 2011-01-24 10:15:20 -0500 (Mon, 24 Jan 2011)
New Revision: 3836
Modified:
kernel/trunk/exo.kernel.component.common/src/main/java/org/exoplatform/services/rpc/impl/RPCServiceImpl.java
kernel/trunk/exo.kernel.component.common/src/test/java/org/exoplatform/services/rpc/impl/TestRPCServiceImpl.java
Log:
EXOJCR-1107 : Committed the changes that fix the problem with the method
RPCService.executeCommandOnCoordinator().
Modified:
kernel/trunk/exo.kernel.component.common/src/main/java/org/exoplatform/services/rpc/impl/RPCServiceImpl.java
===================================================================
---
kernel/trunk/exo.kernel.component.common/src/main/java/org/exoplatform/services/rpc/impl/RPCServiceImpl.java 2011-01-24
13:41:35 UTC (rev 3835)
+++
kernel/trunk/exo.kernel.component.common/src/main/java/org/exoplatform/services/rpc/impl/RPCServiceImpl.java 2011-01-24
15:15:20 UTC (rev 3836)
@@ -426,7 +426,7 @@
throw new RPCException("Command " + commandId + " unknown, please
register your command first");
}
final Message msg = new Message();
- msg.setObject(new MessageBody(commandId, args));
+ msg.setObject(new MessageBody(dests.size() == 1 && dests != members ?
dests.get(0) : null, commandId, args));
RspList rsps = AccessController.doPrivileged(new PrivilegedAction<RspList>()
{
public RspList run()
@@ -486,7 +486,15 @@
// Ensure that the service is fully started before trying to execute any
command
startSignal.await();
MessageBody body = (MessageBody)msg.getObject();
- RemoteCommand command = getCommand(commandId = body.getCommandId());
+ commandId = body.getCommandId();
+ if (!body.accept(channel.getLocalAddress()))
+ {
+ if (LOG.isTraceEnabled())
+ LOG.trace("Command : " + commandId + " needs to be executed
on the coordinator " +
+ "only and the local node is not the coordinator, the command
will be ignored");
+ return null;
+ }
+ RemoteCommand command = getCommand(commandId);
if (command == null)
{
return new RPCException("Command " + commandId + " unkown,
please register your command first");
@@ -888,18 +896,29 @@
private String commandId;
/**
- * The list of parameters;
+ * The list of parameters
*/
private Serializable[] args;
+
+ /**
+ * The hash code of the expected destination
+ */
+ private int destination;
public MessageBody()
{
}
- public MessageBody(String commandId, Serializable[] args)
+ /**
+ * @param dest The destination of the message
+ * @param commandId the id of the command to execute
+ * @param args the arguments to use
+ */
+ public MessageBody(Address dest, String commandId, Serializable[] args)
{
this.commandId = commandId;
this.args = args;
+ this.destination = dest == null ? 0 : dest.hashCode();
}
public String getCommandId()
@@ -910,6 +929,17 @@
public Serializable[] getArgs()
{
return args;
+ }
+
+ /**
+ * Indicates whether or not the given message body accepts the given address
+ * @param address the address to check
+ * @return <code>true</code> if the message is for everybody or if the
given address is the expected address,
+ * <code>false</code> otherwise
+ */
+ public boolean accept(Address address)
+ {
+ return destination == 0 || destination == address.hashCode();
}
/**
@@ -917,6 +947,11 @@
*/
public void readExternal(ObjectInput in) throws IOException,
ClassNotFoundException
{
+ boolean unicast = in.readBoolean();
+ if (unicast)
+ {
+ this.destination = in.readInt();
+ }
this.commandId = in.readUTF();
int size = in.readInt();
if (size == -1)
@@ -938,6 +973,12 @@
*/
public void writeExternal(ObjectOutput out) throws IOException
{
+ boolean unicast = destination != 0;
+ out.writeBoolean(unicast);
+ if (unicast)
+ {
+ out.writeInt(destination);
+ }
out.writeUTF(commandId);
if (args == null)
{
Modified:
kernel/trunk/exo.kernel.component.common/src/test/java/org/exoplatform/services/rpc/impl/TestRPCServiceImpl.java
===================================================================
---
kernel/trunk/exo.kernel.component.common/src/test/java/org/exoplatform/services/rpc/impl/TestRPCServiceImpl.java 2011-01-24
13:41:35 UTC (rev 3835)
+++
kernel/trunk/exo.kernel.component.common/src/test/java/org/exoplatform/services/rpc/impl/TestRPCServiceImpl.java 2011-01-24
15:15:20 UTC (rev 3836)
@@ -33,6 +33,7 @@
import java.io.Serializable;
import java.util.ArrayList;
+import java.util.Collections;
import java.util.List;
import java.util.Vector;
import java.util.concurrent.CountDownLatch;
@@ -824,6 +825,7 @@
catch (Throwable e)
{
error.set(e);
+ e.printStackTrace();
}
finally
{
@@ -1182,6 +1184,74 @@
}
}
+ public void testExecOnCoordinator() throws Exception
+ {
+ InitParams params = new InitParams();
+ ValueParam paramConf = new ValueParam();
+ paramConf.setName(RPCServiceImpl.PARAM_JGROUPS_CONFIG);
+ paramConf.setValue("jar:/conf/portal/udp.xml");
+ params.addParameter(paramConf);
+
+ final List<Boolean> calledCommands = Collections.synchronizedList(new
ArrayList<Boolean>());
+
+ RPCServiceImpl service1 = null;
+ RPCServiceImpl service2 = null;
+ try
+ {
+ service1 = new RPCServiceImpl(container.getContext(), params, configManager);
+ RemoteCommand service1Cmd = new RemoteCommand()
+ {
+ public String getId()
+ {
+ return "CoordinatorExecutedCommand";
+ }
+
+ public String execute(Serializable[] args) throws Throwable
+ {
+ calledCommands.add(Boolean.TRUE);
+ return "service 1";
+ }
+ };
+ service1.registerCommand(service1Cmd);
+
+ service2 = new RPCServiceImpl(container.getContext(), params, configManager);
+ RemoteCommand service2Cmd = new RemoteCommand()
+ {
+ public String getId()
+ {
+ return "CoordinatorExecutedCommand";
+ }
+
+ public String execute(Serializable[] args) throws Throwable
+ {
+ calledCommands.add(Boolean.TRUE);
+ return "service 2";
+ }
+ };
+ service2.registerCommand(service2Cmd);
+ // starting services
+ service1.start();
+ service2.start();
+
+ Object o = service1.executeCommandOnCoordinator(service1Cmd, true);
+ assertEquals("service 1", o);
+
+ // it should be executed once
+ assertEquals(1, calledCommands.size());
+ }
+ finally
+ {
+ if (service1 != null)
+ {
+ service1.stop();
+ }
+ if (service2 != null)
+ {
+ service2.stop();
+ }
+ }
+ }
+
private static class MyListener implements TopologyChangeListener
{