[infinispan-commits] Infinispan SVN: r108 - in trunk/core/src: main/java/org/infinispan/commands/remote and 10 other directories.
infinispan-commits at lists.jboss.org
infinispan-commits at lists.jboss.org
Mon Apr 13 09:58:21 EDT 2009
Author: mircea.markus
Date: 2009-04-13 09:58:20 -0400 (Mon, 13 Apr 2009)
New Revision: 108
Added:
trunk/core/src/main/java/org/infinispan/commands/remote/BaseRpcCommand.java
trunk/core/src/main/java/org/infinispan/commands/remote/CacheRpcCommand.java
trunk/core/src/main/java/org/infinispan/commands/remote/MultipleRpcCommand.java
trunk/core/src/main/java/org/infinispan/commands/remote/SingleRpcCommand.java
Removed:
trunk/core/src/main/java/org/infinispan/commands/CacheRpcCommand.java
trunk/core/src/main/java/org/infinispan/commands/remote/ReplicateCommand.java
Modified:
trunk/core/src/main/java/org/infinispan/commands/CommandsFactory.java
trunk/core/src/main/java/org/infinispan/commands/CommandsFactoryImpl.java
trunk/core/src/main/java/org/infinispan/commands/RemoteCommandFactory.java
trunk/core/src/main/java/org/infinispan/commands/remote/ClusteredGetCommand.java
trunk/core/src/main/java/org/infinispan/commands/tx/PrepareCommand.java
trunk/core/src/main/java/org/infinispan/commands/write/InvalidateCommand.java
trunk/core/src/main/java/org/infinispan/commands/write/PutKeyValueCommand.java
trunk/core/src/main/java/org/infinispan/commands/write/PutMapCommand.java
trunk/core/src/main/java/org/infinispan/commands/write/RemoveCommand.java
trunk/core/src/main/java/org/infinispan/interceptors/base/BaseRpcInterceptor.java
trunk/core/src/main/java/org/infinispan/remoting/InboundInvocationHandler.java
trunk/core/src/main/java/org/infinispan/remoting/InboundInvocationHandlerImpl.java
trunk/core/src/main/java/org/infinispan/remoting/ReplicationQueue.java
trunk/core/src/main/java/org/infinispan/remoting/transport/jgroups/CommandAwareRpcDispatcher.java
trunk/core/src/test/java/org/infinispan/api/MixedModeTest.java
trunk/core/src/test/java/org/infinispan/api/mvcc/PutForExternalReadTest.java
trunk/core/src/test/java/org/infinispan/invalidation/BaseInvalidationTest.java
trunk/core/src/test/java/org/infinispan/replication/AsyncReplTest.java
trunk/core/src/test/java/org/infinispan/replication/BaseReplicatedAPITest.java
trunk/core/src/test/java/org/infinispan/replication/SyncReplTest.java
trunk/core/src/test/java/org/infinispan/test/ReplListener.java
Log:
[ISPN-40] - optimize and cleanup ReplicateCommand
- also some renamings for consistency: RPC -> Rpc amd METHOD_ID -> COMMAND_ID
Deleted: trunk/core/src/main/java/org/infinispan/commands/CacheRpcCommand.java
===================================================================
--- trunk/core/src/main/java/org/infinispan/commands/CacheRpcCommand.java 2009-04-13 12:23:10 UTC (rev 107)
+++ trunk/core/src/main/java/org/infinispan/commands/CacheRpcCommand.java 2009-04-13 13:58:20 UTC (rev 108)
@@ -1,17 +0,0 @@
-package org.infinispan.commands;
-
-/**
- * The RPCManager only replicates commands wrapped in an RPCCommand. As a wrapper, an RPCCommand could contain a single
- * {@link org.infinispan.commands.ReplicableCommand} or a List of them.
- *
- * @author Manik Surtani
- * @since 4.0
- */
-public interface CacheRpcCommand extends ReplicableCommand {
-
- /**
- * @return the name of the cache that produced this command. This will also be the name of the cache this command is
- * intended for.
- */
- String getCacheName();
-}
Modified: trunk/core/src/main/java/org/infinispan/commands/CommandsFactory.java
===================================================================
--- trunk/core/src/main/java/org/infinispan/commands/CommandsFactory.java 2009-04-13 12:23:10 UTC (rev 107)
+++ trunk/core/src/main/java/org/infinispan/commands/CommandsFactory.java 2009-04-13 13:58:20 UTC (rev 108)
@@ -24,7 +24,8 @@
import org.infinispan.commands.control.StateTransferControlCommand;
import org.infinispan.commands.read.GetKeyValueCommand;
import org.infinispan.commands.read.SizeCommand;
-import org.infinispan.commands.remote.ReplicateCommand;
+import org.infinispan.commands.remote.MultipleRpcCommand;
+import org.infinispan.commands.remote.SingleRpcCommand;
import org.infinispan.commands.tx.CommitCommand;
import org.infinispan.commands.tx.PrepareCommand;
import org.infinispan.commands.tx.RollbackCommand;
@@ -86,9 +87,9 @@
*/
void initializeReplicableCommand(ReplicableCommand command);
- ReplicateCommand buildReplicateCommand(List<ReplicableCommand> toReplicate);
+ MultipleRpcCommand buildReplicateCommand(List<ReplicableCommand> toReplicate);
- ReplicateCommand buildReplicateCommand(ReplicableCommand call);
+ SingleRpcCommand buildSingleRpcCommand(ReplicableCommand call);
StateTransferControlCommand buildStateTransferControlCommand(boolean block);
}
Modified: trunk/core/src/main/java/org/infinispan/commands/CommandsFactoryImpl.java
===================================================================
--- trunk/core/src/main/java/org/infinispan/commands/CommandsFactoryImpl.java 2009-04-13 12:23:10 UTC (rev 107)
+++ trunk/core/src/main/java/org/infinispan/commands/CommandsFactoryImpl.java 2009-04-13 13:58:20 UTC (rev 108)
@@ -26,7 +26,8 @@
import org.infinispan.commands.read.GetKeyValueCommand;
import org.infinispan.commands.read.SizeCommand;
import org.infinispan.commands.remote.ClusteredGetCommand;
-import org.infinispan.commands.remote.ReplicateCommand;
+import org.infinispan.commands.remote.MultipleRpcCommand;
+import org.infinispan.commands.remote.SingleRpcCommand;
import org.infinispan.commands.tx.CommitCommand;
import org.infinispan.commands.tx.PrepareCommand;
import org.infinispan.commands.tx.RollbackCommand;
@@ -123,12 +124,12 @@
return new RollbackCommand(gtx);
}
- public ReplicateCommand buildReplicateCommand(List<ReplicableCommand> toReplicate) {
- return new ReplicateCommand(toReplicate, cache.getName());
+ public MultipleRpcCommand buildReplicateCommand(List<ReplicableCommand> toReplicate) {
+ return new MultipleRpcCommand(toReplicate, cache.getName());
}
- public ReplicateCommand buildReplicateCommand(ReplicableCommand call) {
- return new ReplicateCommand(call, cache.getName());
+ public SingleRpcCommand buildSingleRpcCommand(ReplicableCommand call) {
+ return new SingleRpcCommand(cache.getName(), call);
}
public StateTransferControlCommand buildStateTransferControlCommand(boolean block) {
@@ -138,28 +139,35 @@
public void initializeReplicableCommand(ReplicableCommand c) {
if (c == null) return;
switch (c.getCommandId()) {
- case PutKeyValueCommand.METHOD_ID:
+ case PutKeyValueCommand.COMMAND_ID:
((PutKeyValueCommand) c).init(notifier);
break;
- case PutMapCommand.METHOD_ID:
+ case PutMapCommand.COMMAND_ID:
((PutMapCommand) c).init(notifier);
break;
- case RemoveCommand.METHOD_ID:
+ case RemoveCommand.COMMAND_ID:
((RemoveCommand) c).init(notifier);
break;
- case ReplicateCommand.METHOD_ID:
- ReplicateCommand rc = (ReplicateCommand) c;
+ case MultipleRpcCommand.COMMAND_ID:
+ MultipleRpcCommand rc = (MultipleRpcCommand) c;
rc.setInterceptorChain(interceptorChain);
if (rc.getCommands() != null)
for (ReplicableCommand nested : rc.getCommands()) {
initializeReplicableCommand(nested);
}
break;
- case InvalidateCommand.METHOD_ID:
+ case SingleRpcCommand.COMMAND_ID:
+ SingleRpcCommand src = (SingleRpcCommand) c;
+ src.setInterceptorChain(interceptorChain);
+ if (src.getCommand() != null)
+ initializeReplicableCommand(src.getCommand());
+
+ break;
+ case InvalidateCommand.COMMAND_ID:
InvalidateCommand ic = (InvalidateCommand) c;
ic.init(notifier);
break;
- case PrepareCommand.METHOD_ID:
+ case PrepareCommand.COMMAND_ID:
PrepareCommand pc = (PrepareCommand) c;
if (pc.getModifications() != null)
for (ReplicableCommand nested : pc.getModifications()) initializeReplicableCommand(nested);
Modified: trunk/core/src/main/java/org/infinispan/commands/RemoteCommandFactory.java
===================================================================
--- trunk/core/src/main/java/org/infinispan/commands/RemoteCommandFactory.java 2009-04-13 12:23:10 UTC (rev 107)
+++ trunk/core/src/main/java/org/infinispan/commands/RemoteCommandFactory.java 2009-04-13 13:58:20 UTC (rev 108)
@@ -4,7 +4,8 @@
import org.infinispan.commands.control.StateTransferControlCommand;
import org.infinispan.commands.read.GetKeyValueCommand;
import org.infinispan.commands.remote.ClusteredGetCommand;
-import org.infinispan.commands.remote.ReplicateCommand;
+import org.infinispan.commands.remote.MultipleRpcCommand;
+import org.infinispan.commands.remote.SingleRpcCommand;
import org.infinispan.commands.tx.CommitCommand;
import org.infinispan.commands.tx.PrepareCommand;
import org.infinispan.commands.tx.RollbackCommand;
@@ -48,13 +49,13 @@
public ReplicableCommand fromStream(byte id, Object[] parameters) {
ReplicableCommand command;
switch (id) {
- case PutKeyValueCommand.METHOD_ID:
+ case PutKeyValueCommand.COMMAND_ID:
command = new PutKeyValueCommand();
break;
- case PutMapCommand.METHOD_ID:
+ case PutMapCommand.COMMAND_ID:
command = new PutMapCommand();
break;
- case RemoveCommand.METHOD_ID:
+ case RemoveCommand.COMMAND_ID:
command = new RemoveCommand();
break;
case ReplaceCommand.METHOD_ID:
@@ -66,7 +67,7 @@
case ClearCommand.METHOD_ID:
command = new ClearCommand();
break;
- case PrepareCommand.METHOD_ID:
+ case PrepareCommand.COMMAND_ID:
command = new PrepareCommand();
break;
case CommitCommand.METHOD_ID:
@@ -75,10 +76,13 @@
case RollbackCommand.METHOD_ID:
command = new RollbackCommand();
break;
- case ReplicateCommand.METHOD_ID:
- command = new ReplicateCommand();
+ case MultipleRpcCommand.COMMAND_ID:
+ command = new MultipleRpcCommand();
break;
- case InvalidateCommand.METHOD_ID:
+ case SingleRpcCommand.COMMAND_ID:
+ command = new SingleRpcCommand();
+ break;
+ case InvalidateCommand.COMMAND_ID:
command = new InvalidateCommand();
break;
case StateTransferControlCommand.METHOD_ID:
Added: trunk/core/src/main/java/org/infinispan/commands/remote/BaseRpcCommand.java
===================================================================
--- trunk/core/src/main/java/org/infinispan/commands/remote/BaseRpcCommand.java (rev 0)
+++ trunk/core/src/main/java/org/infinispan/commands/remote/BaseRpcCommand.java 2009-04-13 13:58:20 UTC (rev 108)
@@ -0,0 +1,68 @@
+package org.infinispan.commands.remote;
+
+import org.infinispan.commands.ReplicableCommand;
+import org.infinispan.commands.VisitableCommand;
+import org.infinispan.context.InvocationContext;
+import org.infinispan.interceptors.InterceptorChain;
+import org.infinispan.logging.Log;
+import org.infinispan.logging.LogFactory;
+
+/**
+ * Base class for RPC commands.
+ *
+ * @author Mircea.Markus at jboss.com
+ */
+public abstract class BaseRpcCommand implements CacheRpcCommand {
+
+ protected InterceptorChain interceptorChain;
+ protected String cacheName;
+
+ private static final Log log = LogFactory.getLog(BaseRpcCommand.class);
+ private static final boolean trace = log.isTraceEnabled();
+
+ protected BaseRpcCommand(String cacheName) {
+ this.cacheName = cacheName;
+ }
+
+ BaseRpcCommand() {
+ }
+
+ public String getCacheName() {
+ return cacheName;
+ }
+
+ public void setInterceptorChain(InterceptorChain interceptorChain) {
+ this.interceptorChain = interceptorChain;
+ }
+
+
+ protected Object processCommand(InvocationContext ctx, ReplicableCommand cacheCommand) throws Throwable {
+ Object result;
+ try {
+ if (trace) log.trace("Invoking command " + cacheCommand + ", with originLocal flag set to false.");
+ ctx.setOriginLocal(false);
+ if (cacheCommand instanceof VisitableCommand) {
+ Object retVal = interceptorChain.invokeRemote((VisitableCommand) cacheCommand);
+ // we only need to return values for a set of remote calls; not every call.
+ result = null;
+ } else {
+ throw new RuntimeException("Do we still need to deal with non-visitable commands? (" + cacheCommand.getClass().getName() + ")");
+// result = cacheCommand.perform(null);
+ }
+ }
+ catch (Throwable ex) {
+ // TODO deal with PFER
+// if (!(cacheCommand instanceof PutForExternalReadCommand))
+// {
+ throw ex;
+// }
+// else
+// {
+// if (trace)
+// log.trace("Caught an exception, but since this is a putForExternalRead() call, suppressing the exception. Exception is:", ex);
+// result = null;
+// }
+ }
+ return result;
+ }
+}
Property changes on: trunk/core/src/main/java/org/infinispan/commands/remote/BaseRpcCommand.java
___________________________________________________________________
Name: svn:keywords
+ Id Revision
Name: svn:eol-style
+ LF
Copied: trunk/core/src/main/java/org/infinispan/commands/remote/CacheRpcCommand.java (from rev 106, trunk/core/src/main/java/org/infinispan/commands/CacheRpcCommand.java)
===================================================================
--- trunk/core/src/main/java/org/infinispan/commands/remote/CacheRpcCommand.java (rev 0)
+++ trunk/core/src/main/java/org/infinispan/commands/remote/CacheRpcCommand.java 2009-04-13 13:58:20 UTC (rev 108)
@@ -0,0 +1,20 @@
+package org.infinispan.commands.remote;
+
+import org.infinispan.commands.ReplicableCommand;
+
+/**
+ * The RPCManager only replicates commands wrapped in an RPCCommand. As a wrapper, an RPCCommand could contain a single
+ * {@link org.infinispan.commands.ReplicableCommand} or a List of them.
+ *
+ * @author Manik Surtani
+ * @author Mircea.Markus at jboss.com
+ * @since 4.0
+ */
+public interface CacheRpcCommand extends ReplicableCommand {
+
+ /**
+ * @return the name of the cache that produced this command. This will also be the name of the cache this command is
+ * intended for.
+ */
+ String getCacheName();
+}
Property changes on: trunk/core/src/main/java/org/infinispan/commands/remote/CacheRpcCommand.java
___________________________________________________________________
Name: svn:keywords
+ Id Revision
Name: svn:eol-style
+ LF
Modified: trunk/core/src/main/java/org/infinispan/commands/remote/ClusteredGetCommand.java
===================================================================
--- trunk/core/src/main/java/org/infinispan/commands/remote/ClusteredGetCommand.java 2009-04-13 12:23:10 UTC (rev 107)
+++ trunk/core/src/main/java/org/infinispan/commands/remote/ClusteredGetCommand.java 2009-04-13 13:58:20 UTC (rev 108)
@@ -22,7 +22,6 @@
package org.infinispan.commands.remote;
import org.infinispan.CacheException;
-import org.infinispan.commands.CacheRpcCommand;
import org.infinispan.commands.DataCommand;
import org.infinispan.container.DataContainer;
import org.infinispan.container.entries.InternalCacheEntry;
@@ -40,12 +39,10 @@
* @author Mircea.Markus at jboss.com
* @since 4.0
*/
-public class ClusteredGetCommand implements CacheRpcCommand
-{
+public class ClusteredGetCommand implements CacheRpcCommand {
public static final byte COMMAND_ID = 22;
private static final Log log = LogFactory.getLog(ClusteredGetCommand.class);
- private static final boolean trace = log.isTraceEnabled();
private Object key;
private String cacheName;
Copied: trunk/core/src/main/java/org/infinispan/commands/remote/MultipleRpcCommand.java (from rev 106, trunk/core/src/main/java/org/infinispan/commands/remote/ReplicateCommand.java)
===================================================================
--- trunk/core/src/main/java/org/infinispan/commands/remote/MultipleRpcCommand.java (rev 0)
+++ trunk/core/src/main/java/org/infinispan/commands/remote/MultipleRpcCommand.java 2009-04-13 13:58:20 UTC (rev 108)
@@ -0,0 +1,124 @@
+/*
+ * JBoss, Home of Professional Open Source.
+ * Copyright 2000 - 2008, Red Hat Middleware LLC, and individual contributors
+ * as indicated by the @author tags. See the copyright.txt file in the
+ * distribution for a full listing of individual contributors.
+ *
+ * This is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU Lesser General Public License as
+ * published by the Free Software Foundation; either version 2.1 of
+ * the License, or (at your option) any later version.
+ *
+ * This software is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this software; if not, write to the Free
+ * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
+ * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
+ */
+package org.infinispan.commands.remote;
+
+import org.infinispan.commands.ReplicableCommand;
+import org.infinispan.commands.VisitableCommand;
+import org.infinispan.context.InvocationContext;
+import org.infinispan.logging.Log;
+import org.infinispan.logging.LogFactory;
+
+import java.util.Arrays;
+import java.util.List;
+
+/**
+ * Command that implements cluster replication logic.
+ * <p/>
+ * This is not a {@link VisitableCommand} and hence not passed up the {@link org.infinispan.interceptors.base.CommandInterceptor}
+ * chain.
+ * <p/>
+ *
+ * @author Mircea.Markus at jboss.com
+ * @since 4.0
+ */
+public class MultipleRpcCommand extends BaseRpcCommand {
+
+ public static final byte COMMAND_ID = 13;
+
+ private static final Log log = LogFactory.getLog(MultipleRpcCommand.class);
+ private static final boolean trace = log.isTraceEnabled();
+
+ private ReplicableCommand[] commands;
+
+ public MultipleRpcCommand(List<ReplicableCommand> modifications, String cacheName) {
+ super(cacheName);
+ commands = modifications.toArray(new ReplicableCommand[modifications.size()]);
+ }
+
+ public MultipleRpcCommand() {
+ }
+
+ /**
+ * Executes commands replicated to the current cache instance by other cache instances.
+ */
+ public Object perform(InvocationContext ctx) throws Throwable {
+ if (trace) log.trace("Executing remotely originated commands: " + commands.length);
+ for (ReplicableCommand command : commands) processCommand(ctx, command);
+ return null;
+ }
+
+ public byte getCommandId() {
+ return COMMAND_ID;
+ }
+
+ public ReplicableCommand[] getCommands() {
+ return commands;
+ }
+
+ public Object[] getParameters() {
+ int numCommands = commands.length;
+ Object[] retval = new Object[numCommands + 1];
+ retval[0] = cacheName;
+ System.arraycopy(commands, 0, retval, 1, numCommands);
+ return retval;
+ }
+
+ @SuppressWarnings("unchecked")
+ public void setParameters(int commandId, Object[] args) {
+ cacheName = (String) args[0];
+ int numCommands = args.length - 1;
+ commands = new ReplicableCommand[numCommands];
+ System.arraycopy(args, 1, commands, 0, numCommands);
+ }
+
+ @Override
+ public boolean equals(Object o) {
+ if (this == o) return true;
+ if (!(o instanceof MultipleRpcCommand)) return false;
+
+ MultipleRpcCommand that = (MultipleRpcCommand) o;
+
+ if (cacheName != null ? !cacheName.equals(that.cacheName) : that.cacheName != null) return false;
+ if (!Arrays.equals(commands, that.commands)) return false;
+ if (interceptorChain != null ? !interceptorChain.equals(that.interceptorChain) : that.interceptorChain != null)
+ return false;
+
+ return true;
+ }
+
+ @Override
+ public int hashCode() {
+ int result = interceptorChain != null ? interceptorChain.hashCode() : 0;
+ result = 31 * result + (commands != null ? Arrays.hashCode(commands) : 0);
+ result = 31 * result + (cacheName != null ? cacheName.hashCode() : 0);
+ return result;
+ }
+
+ @Override
+ public String toString() {
+ return "MultipleRpcCommand{" +
+ "interceptorChain=" + interceptorChain +
+ ", commands=" + (commands == null ? null : Arrays.asList(commands)) +
+ ", cacheName='" + cacheName + '\'' +
+ '}';
+ }
+}
\ No newline at end of file
Property changes on: trunk/core/src/main/java/org/infinispan/commands/remote/MultipleRpcCommand.java
___________________________________________________________________
Name: svn:keywords
+ Id Revision
Name: svn:eol-style
+ LF
Deleted: trunk/core/src/main/java/org/infinispan/commands/remote/ReplicateCommand.java
===================================================================
--- trunk/core/src/main/java/org/infinispan/commands/remote/ReplicateCommand.java 2009-04-13 12:23:10 UTC (rev 107)
+++ trunk/core/src/main/java/org/infinispan/commands/remote/ReplicateCommand.java 2009-04-13 13:58:20 UTC (rev 108)
@@ -1,221 +0,0 @@
-/*
- * JBoss, Home of Professional Open Source.
- * Copyright 2000 - 2008, Red Hat Middleware LLC, and individual contributors
- * as indicated by the @author tags. See the copyright.txt file in the
- * distribution for a full listing of individual contributors.
- *
- * This is free software; you can redistribute it and/or modify it
- * under the terms of the GNU Lesser General Public License as
- * published by the Free Software Foundation; either version 2.1 of
- * the License, or (at your option) any later version.
- *
- * This software is distributed in the hope that it will be useful,
- * but WITHOUT ANY WARRANTY; without even the implied warranty of
- * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
- * Lesser General Public License for more details.
- *
- * You should have received a copy of the GNU Lesser General Public
- * License along with this software; if not, write to the Free
- * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
- * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
- */
-package org.infinispan.commands.remote;
-
-import org.infinispan.commands.CacheRpcCommand;
-import org.infinispan.commands.ReplicableCommand;
-import org.infinispan.commands.VisitableCommand;
-import org.infinispan.context.InvocationContext;
-import org.infinispan.interceptors.InterceptorChain;
-import org.infinispan.logging.Log;
-import org.infinispan.logging.LogFactory;
-
-import java.util.Arrays;
-import java.util.List;
-
-/**
- * Command that implements cluster replication logic.
- * <p/>
- * This is not a {@link VisitableCommand} and hence not passed up the {@link org.infinispan.interceptors.base.CommandInterceptor}
- * chain.
- * <p/>
- *
- * @author Mircea.Markus at jboss.com
- * @since 4.0
- */
-public class ReplicateCommand implements CacheRpcCommand
-{
- public static final byte METHOD_ID = 13;
-
- private InterceptorChain interceptorChain;
-
- private static final Log log = LogFactory.getLog(ReplicateCommand.class);
- private static final boolean trace = log.isTraceEnabled();
-
- private ReplicableCommand[] commands;
- private String cacheName;
-
- public ReplicateCommand(List<ReplicableCommand> modifications, String cacheName) {
- if (modifications != null && modifications.size() == 1) {
- this.commands = new ReplicableCommand[]{modifications.get(0)};
- } else {
- this.commands = new ReplicableCommand[modifications.size()];
- int i = 0;
- for (ReplicableCommand rc : modifications) commands[i++] = rc;
- }
- this.cacheName = cacheName;
- }
-
- public ReplicateCommand(ReplicableCommand command, String cacheName) {
- commands = new ReplicableCommand[]{command};
- this.cacheName = cacheName;
- }
-
- public ReplicateCommand() {
- }
-
- public void setInterceptorChain(InterceptorChain interceptorChain) {
- this.interceptorChain = interceptorChain;
- }
-
- /**
- * Executes commands replicated to the current cache instance by other cache instances.
- *
- * @param ctx invocation context, ignored.
- * @return null
- * @throws Throwable
- */
- public Object perform(InvocationContext ctx) throws Throwable {
- if (isSingleCommand()) {
- return processCommand(ctx, commands[0]);
- } else {
- for (ReplicableCommand command : commands) processCommand(ctx, command);
- return null;
- }
- }
-
- private Object processCommand(InvocationContext ctx, ReplicableCommand cacheCommand) throws Throwable {
- Object result;
- try {
- if (trace) log.trace("Invoking command " + cacheCommand + ", with originLocal flag set to false.");
- ctx.setOriginLocal(false);
- if (cacheCommand instanceof VisitableCommand) {
- Object retVal = interceptorChain.invokeRemote((VisitableCommand) cacheCommand);
- // we only need to return values for a set of remote calls; not every call.
- if (returnValueForRemoteCall(cacheCommand)) {
- result = retVal;
- } else {
- result = null;
- }
- } else {
- throw new RuntimeException("Do we still need to deal with non-visitable commands? (" + cacheCommand.getClass().getName() + ")");
-// result = cacheCommand.perform(null);
- }
- }
- catch (Throwable ex) {
- // TODO deal with PFER
-// if (!(cacheCommand instanceof PutForExternalReadCommand))
-// {
- throw ex;
-// }
-// else
-// {
-// if (trace)
-// log.trace("Caught an exception, but since this is a putForExternalRead() call, suppressing the exception. Exception is:", ex);
-// result = null;
-// }
- }
- return result;
- }
-
- private boolean returnValueForRemoteCall(ReplicableCommand cacheCommand) {
- return cacheCommand instanceof ClusteredGetCommand;
- }
-
- public byte getCommandId() {
- return METHOD_ID;
- }
-
- public ReplicableCommand[] getCommands() {
- return commands;
- }
-
- public String getCacheName() {
- return cacheName;
- }
-
- public void setCacheName(String name) {
- this.cacheName = name;
- }
-
- public final ReplicableCommand getSingleCommand() {
- return commands == null ? null : commands[0];
- }
-
- public Object[] getParameters() {
- int numCommands = commands == null ? 0 : commands.length;
- Object[] retval = new Object[numCommands + 2];
- retval[0] = cacheName;
- retval[1] = numCommands;
- if (numCommands > 0) System.arraycopy(commands, 0, retval, 2, numCommands);
- return retval;
- }
-
- @SuppressWarnings("unchecked")
- public void setParameters(int commandId, Object[] args) {
- cacheName = (String) args[0];
- int numCommands = (Integer) args[1];
- commands = new ReplicableCommand[numCommands];
- System.arraycopy(args, 2, commands, 0, numCommands);
- }
-
- public final boolean isSingleCommand() {
- return commands != null && commands.length == 1;
- }
-
- @Override
- public boolean equals(Object o) {
- if (this == o) return true;
- if (o == null || getClass() != o.getClass()) return false;
-
- ReplicateCommand that = (ReplicateCommand) o;
-
- return !(commands != null ? !Arrays.equals(commands, that.commands) : that.commands != null);
- }
-
- @Override
- public int hashCode() {
- return commands != null ? commands.hashCode() : 0;
- }
-
- /**
- * Creates a copy of this command, amking a deep copy of any collections but everything else copied shallow.
- *
- * @return a copy
- */
- public ReplicateCommand copy() {
- ReplicateCommand clone;
- clone = new ReplicateCommand();
- clone.interceptorChain = interceptorChain;
- if (commands != null) clone.commands = commands.clone();
- return clone;
- }
-
- public boolean containsCommandType(Class<? extends ReplicableCommand> aClass) {
- if (commands.length == 1) {
- return commands[0].getClass().equals(aClass);
- } else {
- for (ReplicableCommand command : getCommands()) {
- if (command.getClass().equals(aClass)) return true;
- }
- }
- return false;
- }
-
- @Override
- public String toString() {
- return "ReplicateCommand{" +
- "commands=" + (commands == null ? "null" : Arrays.toString(commands)) +
- ", cacheName='" + cacheName + '\'' +
- '}';
- }
-}
\ No newline at end of file
Added: trunk/core/src/main/java/org/infinispan/commands/remote/SingleRpcCommand.java
===================================================================
--- trunk/core/src/main/java/org/infinispan/commands/remote/SingleRpcCommand.java (rev 0)
+++ trunk/core/src/main/java/org/infinispan/commands/remote/SingleRpcCommand.java 2009-04-13 13:58:20 UTC (rev 108)
@@ -0,0 +1,82 @@
+package org.infinispan.commands.remote;
+
+import org.infinispan.commands.ReplicableCommand;
+import org.infinispan.context.InvocationContext;
+import org.infinispan.logging.Log;
+import org.infinispan.logging.LogFactory;
+
+/**
+ * Similar to {@link org.infinispan.commands.remote.MultipleRpcCommand}, but it only aggregates a single command for
+ * replication.
+ *
+ * @author Mircea.Markus at jboss.com
+ */
+public class SingleRpcCommand extends BaseRpcCommand {
+ public static final int COMMAND_ID = 21;
+ private static Log log = LogFactory.getLog(SingleRpcCommand.class);
+ private static boolean trace = log.isTraceEnabled();
+
+ private ReplicableCommand command;
+
+ public SingleRpcCommand(String cacheName, ReplicableCommand command) {
+ super(cacheName);
+ this.command = command;
+ }
+
+ public SingleRpcCommand() {
+ }
+
+ public void setParameters(int commandId, Object[] parameters) {
+ if (commandId != COMMAND_ID) throw new IllegalStateException("Unusupported command id:" + commandId);
+ cacheName = (String) parameters[0];
+ command = (ReplicableCommand) parameters[1];
+ }
+
+ public byte getCommandId() {
+ return COMMAND_ID;
+ }
+
+ public Object[] getParameters() {
+ return new Object[]{cacheName, command};
+ }
+
+ public Object perform(InvocationContext ctx) throws Throwable {
+ return processCommand(ctx, command);
+ }
+
+ @Override
+ public boolean equals(Object o) {
+ if (this == o) return true;
+ if (!(o instanceof SingleRpcCommand)) return false;
+
+ SingleRpcCommand that = (SingleRpcCommand) o;
+
+ if (cacheName != null ? !cacheName.equals(that.cacheName) : that.cacheName != null) return false;
+ if (command != null ? !command.equals(that.command) : that.command != null) return false;
+ if (interceptorChain != null ? !interceptorChain.equals(that.interceptorChain) : that.interceptorChain != null)
+ return false;
+
+ return true;
+ }
+
+ @Override
+ public int hashCode() {
+ int result = interceptorChain != null ? interceptorChain.hashCode() : 0;
+ result = 31 * result + (cacheName != null ? cacheName.hashCode() : 0);
+ result = 31 * result + (command != null ? command.hashCode() : 0);
+ return result;
+ }
+
+ @Override
+ public String toString() {
+ return "SingleRpcCommand{" +
+ "interceptorChain=" + interceptorChain +
+ ", cacheName='" + cacheName + '\'' +
+ ", command=" + command +
+ '}';
+ }
+
+ public ReplicableCommand getCommand() {
+ return command;
+ }
+}
Property changes on: trunk/core/src/main/java/org/infinispan/commands/remote/SingleRpcCommand.java
___________________________________________________________________
Name: svn:keywords
+ Id Revision
Name: svn:eol-style
+ LF
Modified: trunk/core/src/main/java/org/infinispan/commands/tx/PrepareCommand.java
===================================================================
--- trunk/core/src/main/java/org/infinispan/commands/tx/PrepareCommand.java 2009-04-13 12:23:10 UTC (rev 107)
+++ trunk/core/src/main/java/org/infinispan/commands/tx/PrepareCommand.java 2009-04-13 13:58:20 UTC (rev 108)
@@ -41,7 +41,7 @@
* @since 4.0
*/
public class PrepareCommand extends AbstractTransactionBoundaryCommand {
- public static final byte METHOD_ID = 10;
+ public static final byte COMMAND_ID = 10;
protected WriteCommand[] modifications;
protected Address localAddress;
@@ -106,7 +106,7 @@
}
public byte getCommandId() {
- return METHOD_ID;
+ return COMMAND_ID;
}
@Override
Modified: trunk/core/src/main/java/org/infinispan/commands/write/InvalidateCommand.java
===================================================================
--- trunk/core/src/main/java/org/infinispan/commands/write/InvalidateCommand.java 2009-04-13 12:23:10 UTC (rev 107)
+++ trunk/core/src/main/java/org/infinispan/commands/write/InvalidateCommand.java 2009-04-13 13:58:20 UTC (rev 108)
@@ -37,7 +37,7 @@
* @since 4.0
*/
public class InvalidateCommand extends RemoveCommand {
- public static final int METHOD_ID = 47;
+ public static final int COMMAND_ID = 47;
private static final Log log = LogFactory.getLog(InvalidateCommand.class);
private static final boolean trace = log.isTraceEnabled();
private Object[] keys;
@@ -71,7 +71,7 @@
}
public byte getCommandId() {
- return METHOD_ID;
+ return COMMAND_ID;
}
@Override
Modified: trunk/core/src/main/java/org/infinispan/commands/write/PutKeyValueCommand.java
===================================================================
--- trunk/core/src/main/java/org/infinispan/commands/write/PutKeyValueCommand.java 2009-04-13 12:23:10 UTC (rev 107)
+++ trunk/core/src/main/java/org/infinispan/commands/write/PutKeyValueCommand.java 2009-04-13 13:58:20 UTC (rev 108)
@@ -36,7 +36,7 @@
* @since 4.0
*/
public class PutKeyValueCommand extends AbstractDataCommand implements DataWriteCommand {
- public static final byte METHOD_ID = 3;
+ public static final byte COMMAND_ID = 3;
Object value;
boolean putIfAbsent;
@@ -103,7 +103,7 @@
}
public byte getCommandId() {
- return METHOD_ID;
+ return COMMAND_ID;
}
public Object[] getParameters() {
@@ -111,7 +111,7 @@
}
public void setParameters(int commandId, Object[] parameters) {
- if (commandId != METHOD_ID) throw new IllegalStateException("Invalid method id");
+ if (commandId != COMMAND_ID) throw new IllegalStateException("Invalid method id");
key = parameters[0];
value = parameters[1];
lifespanMillis = (Long) parameters[2];
Modified: trunk/core/src/main/java/org/infinispan/commands/write/PutMapCommand.java
===================================================================
--- trunk/core/src/main/java/org/infinispan/commands/write/PutMapCommand.java 2009-04-13 12:23:10 UTC (rev 107)
+++ trunk/core/src/main/java/org/infinispan/commands/write/PutMapCommand.java 2009-04-13 13:58:20 UTC (rev 108)
@@ -34,7 +34,7 @@
* @since 4.0
*/
public class PutMapCommand implements WriteCommand {
- public static final byte METHOD_ID = 121;
+ public static final byte COMMAND_ID = 121;
Map<Object, Object> map;
CacheNotifier notifier;
@@ -85,7 +85,7 @@
}
public byte getCommandId() {
- return METHOD_ID;
+ return COMMAND_ID;
}
public Object[] getParameters() {
Modified: trunk/core/src/main/java/org/infinispan/commands/write/RemoveCommand.java
===================================================================
--- trunk/core/src/main/java/org/infinispan/commands/write/RemoveCommand.java 2009-04-13 12:23:10 UTC (rev 107)
+++ trunk/core/src/main/java/org/infinispan/commands/write/RemoveCommand.java 2009-04-13 13:58:20 UTC (rev 108)
@@ -38,7 +38,7 @@
public class RemoveCommand extends AbstractDataCommand implements DataWriteCommand {
private static final Log log = LogFactory.getLog(RemoveCommand.class);
private static final boolean trace = log.isTraceEnabled();
- public static final byte METHOD_ID = 6;
+ public static final byte COMMAND_ID = 6;
protected CacheNotifier notifier;
boolean successful = true;
@@ -93,7 +93,7 @@
}
public byte getCommandId() {
- return METHOD_ID;
+ return COMMAND_ID;
}
public boolean equals(Object o) {
Modified: trunk/core/src/main/java/org/infinispan/interceptors/base/BaseRpcInterceptor.java
===================================================================
--- trunk/core/src/main/java/org/infinispan/interceptors/base/BaseRpcInterceptor.java 2009-04-13 12:23:10 UTC (rev 107)
+++ trunk/core/src/main/java/org/infinispan/interceptors/base/BaseRpcInterceptor.java 2009-04-13 13:58:20 UTC (rev 108)
@@ -21,18 +21,18 @@
*/
package org.infinispan.interceptors.base;
-import org.infinispan.commands.CacheRpcCommand;
import org.infinispan.commands.CommandsFactory;
import org.infinispan.commands.ReplicableCommand;
-import org.infinispan.commands.remote.ReplicateCommand;
+import org.infinispan.commands.remote.CacheRpcCommand;
+import org.infinispan.commands.remote.SingleRpcCommand;
import org.infinispan.context.InvocationContext;
import org.infinispan.context.TransactionContext;
import org.infinispan.factories.annotations.Inject;
import org.infinispan.factories.annotations.Start;
import org.infinispan.invocation.Flag;
-import org.infinispan.remoting.RpcManager;
import org.infinispan.remoting.ReplicationQueue;
import org.infinispan.remoting.ResponseMode;
+import org.infinispan.remoting.RpcManager;
import org.infinispan.remoting.transport.Address;
import org.infinispan.transaction.GlobalTransaction;
import org.infinispan.transaction.TransactionTable;
@@ -134,7 +134,7 @@
if (trace)
log.trace("Setting call recipients to " + callRecipients + " since the original list of recipients passed in is null.");
}
- ReplicateCommand command = commandsFactory.buildReplicateCommand(call);
+ SingleRpcCommand command = commandsFactory.buildSingleRpcCommand(call);
List rsps = rpcManager.invokeRemotely(callRecipients,
command,
Modified: trunk/core/src/main/java/org/infinispan/remoting/InboundInvocationHandler.java
===================================================================
--- trunk/core/src/main/java/org/infinispan/remoting/InboundInvocationHandler.java 2009-04-13 12:23:10 UTC (rev 107)
+++ trunk/core/src/main/java/org/infinispan/remoting/InboundInvocationHandler.java 2009-04-13 13:58:20 UTC (rev 108)
@@ -1,6 +1,6 @@
package org.infinispan.remoting;
-import org.infinispan.commands.CacheRpcCommand;
+import org.infinispan.commands.remote.CacheRpcCommand;
import org.infinispan.factories.scopes.Scope;
import org.infinispan.factories.scopes.Scopes;
import org.infinispan.statetransfer.StateTransferException;
Modified: trunk/core/src/main/java/org/infinispan/remoting/InboundInvocationHandlerImpl.java
===================================================================
--- trunk/core/src/main/java/org/infinispan/remoting/InboundInvocationHandlerImpl.java 2009-04-13 12:23:10 UTC (rev 107)
+++ trunk/core/src/main/java/org/infinispan/remoting/InboundInvocationHandlerImpl.java 2009-04-13 13:58:20 UTC (rev 108)
@@ -1,6 +1,6 @@
package org.infinispan.remoting;
-import org.infinispan.commands.CacheRpcCommand;
+import org.infinispan.commands.remote.CacheRpcCommand;
import org.infinispan.commands.CommandsFactory;
import org.infinispan.config.Configuration;
import org.infinispan.factories.ComponentRegistry;
Modified: trunk/core/src/main/java/org/infinispan/remoting/ReplicationQueue.java
===================================================================
--- trunk/core/src/main/java/org/infinispan/remoting/ReplicationQueue.java 2009-04-13 12:23:10 UTC (rev 107)
+++ trunk/core/src/main/java/org/infinispan/remoting/ReplicationQueue.java 2009-04-13 13:58:20 UTC (rev 108)
@@ -23,7 +23,7 @@
import org.infinispan.commands.CommandsFactory;
import org.infinispan.commands.ReplicableCommand;
-import org.infinispan.commands.remote.ReplicateCommand;
+import org.infinispan.commands.remote.MultipleRpcCommand;
import org.infinispan.config.Configuration;
import org.infinispan.factories.KnownComponentNames;
import org.infinispan.factories.annotations.ComponentName;
@@ -142,9 +142,9 @@
if (toReplicateSize > 0) {
try {
log.trace("Flushing {0} elements", toReplicateSize);
- ReplicateCommand replicateCommand = commandsFactory.buildReplicateCommand(toReplicate);
+ MultipleRpcCommand multipleRpcCommand = commandsFactory.buildReplicateCommand(toReplicate);
// send to all live caches in the cluster
- rpcManager.invokeRemotely(null, replicateCommand, ResponseMode.ASYNCHRONOUS, configuration.getSyncReplTimeout(), stateTransferEnabled);
+ rpcManager.invokeRemotely(null, multipleRpcCommand, ResponseMode.ASYNCHRONOUS, configuration.getSyncReplTimeout(), stateTransferEnabled);
}
catch (Throwable t) {
log.error("failed replicating " + toReplicate.size() + " elements in replication queue", t);
Modified: trunk/core/src/main/java/org/infinispan/remoting/transport/jgroups/CommandAwareRpcDispatcher.java
===================================================================
--- trunk/core/src/main/java/org/infinispan/remoting/transport/jgroups/CommandAwareRpcDispatcher.java 2009-04-13 12:23:10 UTC (rev 107)
+++ trunk/core/src/main/java/org/infinispan/remoting/transport/jgroups/CommandAwareRpcDispatcher.java 2009-04-13 13:58:20 UTC (rev 108)
@@ -22,7 +22,7 @@
package org.infinispan.remoting.transport.jgroups;
import org.infinispan.CacheException;
-import org.infinispan.commands.CacheRpcCommand;
+import org.infinispan.commands.remote.CacheRpcCommand;
import org.infinispan.commands.ReplicableCommand;
import org.infinispan.commands.remote.ClusteredGetCommand;
import org.infinispan.logging.Log;
Modified: trunk/core/src/test/java/org/infinispan/api/MixedModeTest.java
===================================================================
--- trunk/core/src/test/java/org/infinispan/api/MixedModeTest.java 2009-04-13 12:23:10 UTC (rev 107)
+++ trunk/core/src/test/java/org/infinispan/api/MixedModeTest.java 2009-04-13 13:58:20 UTC (rev 108)
@@ -58,8 +58,8 @@
invalAsyncCache1.put("k", "invalAsync");
localCache1.put("k", "local");
- replListener(replAsyncCache2).waitForRPC();
- replListener(invalAsyncCache2).waitForRPC();
+ replListener(replAsyncCache2).waitForRpc();
+ replListener(invalAsyncCache2).waitForRpc();
assert replSyncCache1.get("k").equals("replSync");
assert replSyncCache2.get("k").equals("replSync");
Modified: trunk/core/src/test/java/org/infinispan/api/mvcc/PutForExternalReadTest.java
===================================================================
--- trunk/core/src/test/java/org/infinispan/api/mvcc/PutForExternalReadTest.java 2009-04-13 12:23:10 UTC (rev 107)
+++ trunk/core/src/test/java/org/infinispan/api/mvcc/PutForExternalReadTest.java 2009-04-13 13:58:20 UTC (rev 108)
@@ -3,7 +3,7 @@
import org.easymock.EasyMock;
import static org.easymock.EasyMock.*;
import org.infinispan.Cache;
-import org.infinispan.commands.CacheRpcCommand;
+import org.infinispan.commands.remote.CacheRpcCommand;
import org.infinispan.commands.write.PutKeyValueCommand;
import org.infinispan.commands.write.RemoveCommand;
import org.infinispan.config.Configuration;
@@ -51,7 +51,7 @@
public void testNoOpWhenKeyPresent() {
replListener2.expect(PutKeyValueCommand.class);
cache1.putForExternalRead(key, value);
- replListener2.waitForRPC();
+ replListener2.waitForRpc();
assertEquals("PFER should have succeeded", value, cache1.get(key));
@@ -60,14 +60,14 @@
// reset
replListener2.expect(RemoveCommand.class);
cache1.remove(key);
- replListener2.waitForRPC();
+ replListener2.waitForRpc();
assert cache1.isEmpty() : "Should have reset";
assert cache2.isEmpty() : "Should have reset";
replListener2.expect(PutKeyValueCommand.class);
cache1.put(key, value);
- replListener2.waitForRPC();
+ replListener2.waitForRpc();
// now this pfer should be a no-op
cache1.putForExternalRead(key, value2);
@@ -120,7 +120,7 @@
public void testTxSuspension() throws Exception {
replListener2.expect(PutKeyValueCommand.class);
cache1.put(key + "0", value);
- replListener2.waitForRPC();
+ replListener2.waitForRpc();
// start a tx and do some stuff.
replListener2.expect(PutKeyValueCommand.class);
@@ -129,7 +129,7 @@
cache1.putForExternalRead(key, value); // should have happened in a separate tx and have committed already.
Transaction t = tm1.suspend();
- replListener2.waitForRPC();
+ replListener2.waitForRpc();
assertEquals("PFER should have completed", value, cache1.get(key));
assertEquals("PFER should have completed", value, cache2.get(key));
@@ -194,7 +194,7 @@
replListener2.expect(PutKeyValueCommand.class);
cache1.putForExternalRead(key, value);
- replListener2.waitForRPC();
+ replListener2.waitForRpc();
assertEquals("PFER updated cache1", value, cache1.get(key));
assertEquals("PFER propagated to cache2 as expected", value, cache2.get(key));
@@ -237,7 +237,7 @@
tm1.begin();
cache1.putForExternalRead(key, value);
tm1.commit();
- replListener2.waitForRPC();
+ replListener2.waitForRpc();
TransactionTable tt1 = getTransactionTable(cache1);
TransactionTable tt2 = getTransactionTable(cache2);
@@ -253,7 +253,7 @@
cache1.putForExternalRead(key, value);
cache1.put(key, value);
tm1.commit();
- replListener2.waitForRPC();
+ replListener2.waitForRpc();
assert tt1.getNumGlobalTransactions() == 0 : "Cache 1 should have no stale global TXs";
assert tt1.getNumLocalTransactions() == 0 : "Cache 1 should have no stale local TXs";
@@ -265,7 +265,7 @@
cache1.put(key, value);
cache1.putForExternalRead(key, value);
tm1.commit();
- replListener2.waitForRPC();
+ replListener2.waitForRpc();
assert tt1.getNumGlobalTransactions() == 0 : "Cache 1 should have no stale global TXs";
assert tt1.getNumLocalTransactions() == 0 : "Cache 1 should have no stale local TXs";
@@ -278,7 +278,7 @@
cache1.putForExternalRead(key, value);
cache1.put(key, value);
tm1.commit();
- replListener2.waitForRPC();
+ replListener2.waitForRpc();
assert tt1.getNumGlobalTransactions() == 0 : "Cache 1 should have no stale global TXs";
assert tt1.getNumLocalTransactions() == 0 : "Cache 1 should have no stale local TXs";
Modified: trunk/core/src/test/java/org/infinispan/invalidation/BaseInvalidationTest.java
===================================================================
--- trunk/core/src/test/java/org/infinispan/invalidation/BaseInvalidationTest.java 2009-04-13 12:23:10 UTC (rev 107)
+++ trunk/core/src/test/java/org/infinispan/invalidation/BaseInvalidationTest.java 2009-04-13 13:58:20 UTC (rev 108)
@@ -3,7 +3,7 @@
import static org.easymock.EasyMock.*;
import org.infinispan.AdvancedCache;
import org.infinispan.Cache;
-import org.infinispan.commands.CacheRpcCommand;
+import org.infinispan.commands.remote.CacheRpcCommand;
import org.infinispan.commands.write.ClearCommand;
import org.infinispan.commands.write.InvalidateCommand;
import org.infinispan.config.Configuration;
@@ -48,7 +48,7 @@
replListener(cache2).expectAny();
assertEquals("value", cache1.remove("key"));
- replListener(cache2).waitForRPC();
+ replListener(cache2).waitForRpc();
assertEquals(false, cache2.containsKey("key"));
}
@@ -56,20 +56,20 @@
public void testResurrectEntry() throws Exception {
replListener(cache2).expect(InvalidateCommand.class);
cache1.put("key", "value");
- replListener(cache2).waitForRPC();
+ replListener(cache2).waitForRpc();
assertEquals("value", cache1.get("key"));
assertEquals(null, cache2.get("key"));
replListener(cache2).expect(InvalidateCommand.class);
cache1.put("key", "newValue");
- replListener(cache2).waitForRPC();
+ replListener(cache2).waitForRpc();
assertEquals("newValue", cache1.get("key"));
assertEquals(null, cache2.get("key"));
replListener(cache2).expect(InvalidateCommand.class);
assertEquals("newValue", cache1.remove("key"));
- replListener(cache2).waitForRPC();
+ replListener(cache2).waitForRpc();
assertEquals(null, cache1.get("key"));
assertEquals(null, cache2.get("key"));
@@ -77,14 +77,14 @@
// Restore locally
replListener(cache2).expect(InvalidateCommand.class);
cache1.put("key", "value");
- replListener(cache2).waitForRPC();
+ replListener(cache2).waitForRpc();
assertEquals("value", cache1.get("key"));
assertEquals(null, cache2.get("key"));
replListener(cache1).expect(InvalidateCommand.class);
cache2.put("key", "value2");
- replListener(cache1).waitForRPC();
+ replListener(cache1).waitForRpc();
assertEquals("value2", cache2.get("key"));
assertEquals(null, cache1.get("key"));
@@ -96,7 +96,7 @@
replListener(cache2).expect(InvalidateCommand.class);
cache1.put("key", "value");
- replListener(cache2).waitForRPC();
+ replListener(cache2).waitForRpc();
assertEquals("value", cache1.get("key"));
assertNull("Should be null", cache2.get("key"));
@@ -108,7 +108,7 @@
// Remove an entry that doesn't exist in cache2
cache2.remove("key");
tm.commit();
- replListener(cache1).waitForRPC();
+ replListener(cache1).waitForRpc();
assert cache1.get("key") == null;
assert cache2.get("key") == null;
@@ -117,7 +117,7 @@
public void testTxSyncUnableToInvalidate() throws Exception {
replListener(cache2).expect(InvalidateCommand.class);
cache1.put("key", "value");
- replListener(cache2).waitForRPC();
+ replListener(cache2).waitForRpc();
assertEquals("value", cache1.get("key"));
assertNull(cache2.get("key"));
@@ -142,7 +142,7 @@
fail("Ought to have failed!");
} else {
assert true : "Ought to have succeeded";
-// replListener(cache2).waitForRPC();
+// replListener(cache2).waitForRpc();
}
}
catch (RollbackException roll) {
@@ -155,8 +155,8 @@
mgr2.resume(tx2);
try {
mgr2.commit();
- replListener(cache1).waitForRPC();
- if (!isSync) replListener(cache2).waitForRPC();
+ replListener(cache1).waitForRpc();
+ if (!isSync) replListener(cache2).waitForRpc();
assertTrue("Ought to have succeeded!", true);
}
catch (RollbackException roll) {
@@ -198,7 +198,7 @@
replListener(cache2).expect(InvalidateCommand.class);
cache1.putIfAbsent("key", "value");
- replListener(cache2).waitForRPC();
+ replListener(cache2).waitForRpc();
assert cache1.get("key").equals("value");
assert cache2.get("key") == null;
@@ -227,7 +227,7 @@
replListener(cache2).expect(InvalidateCommand.class);
cache1.remove("key", "value1");
- replListener(cache2).waitForRPC();
+ replListener(cache2).waitForRpc();
assert cache1.get("key") == null;
assert cache2.get("key") == null;
@@ -241,7 +241,7 @@
replListener(cache2).expect(ClearCommand.class);
cache1.clear();
- replListener(cache2).waitForRPC();
+ replListener(cache2).waitForRpc();
assert cache1.get("key") == null;
assert cache2.get("key") == null;
@@ -261,7 +261,7 @@
replListener(cache2).expect(InvalidateCommand.class);
cache1.replace("key", "value1");
- replListener(cache2).waitForRPC();
+ replListener(cache2).waitForRpc();
assert cache1.get("key").equals("value1");
assert cache2.get("key") == null;
@@ -286,7 +286,7 @@
replListener(cache2).expect(InvalidateCommand.class);
assert cache1.replace("key", "valueN", "value1");
- replListener(cache2).waitForRPC();
+ replListener(cache2).waitForRpc();
assert cache1.get("key").equals("value1");
assert cache2.get("key") == null;
Modified: trunk/core/src/test/java/org/infinispan/replication/AsyncReplTest.java
===================================================================
--- trunk/core/src/test/java/org/infinispan/replication/AsyncReplTest.java 2009-04-13 12:23:10 UTC (rev 107)
+++ trunk/core/src/test/java/org/infinispan/replication/AsyncReplTest.java 2009-04-13 13:58:20 UTC (rev 108)
@@ -40,7 +40,7 @@
replListener(cache2).expectAny();
cache1.put(key, "value1");
// allow for replication
- replListener(cache2).waitForRPC(60, TimeUnit.SECONDS);
+ replListener(cache2).waitForRpc(60, TimeUnit.SECONDS);
assertEquals("value1", cache1.get(key));
assertEquals("value1", cache2.get(key));
@@ -48,7 +48,7 @@
cache1.put(key, "value2");
assertEquals("value2", cache1.get(key));
- replListener(cache2).waitForRPC(60, TimeUnit.SECONDS);
+ replListener(cache2).waitForRpc(60, TimeUnit.SECONDS);
assertEquals("value2", cache1.get(key));
assertEquals("value2", cache2.get(key));
@@ -59,7 +59,7 @@
replListener(cache2).expectAny();
cache1.put(key, "value1");
// allow for replication
- replListener(cache2).waitForRPC(60, TimeUnit.SECONDS);
+ replListener(cache2).waitForRpc(60, TimeUnit.SECONDS);
assertEquals("value1", cache1.get(key));
assertEquals("value1", cache2.get(key));
@@ -73,7 +73,7 @@
mgr.commit();
- replListener(cache2).waitForRPC(60, TimeUnit.SECONDS);
+ replListener(cache2).waitForRpc(60, TimeUnit.SECONDS);
assertEquals("value2", cache1.get(key));
assertEquals("value2", cache2.get(key));
Modified: trunk/core/src/test/java/org/infinispan/replication/BaseReplicatedAPITest.java
===================================================================
--- trunk/core/src/test/java/org/infinispan/replication/BaseReplicatedAPITest.java 2009-04-13 12:23:10 UTC (rev 107)
+++ trunk/core/src/test/java/org/infinispan/replication/BaseReplicatedAPITest.java 2009-04-13 13:58:20 UTC (rev 108)
@@ -37,7 +37,7 @@
replListener(cache2).expect(PutKeyValueCommand.class);
cache1.put("key", "value");
- replListener(cache2).waitForRPC();
+ replListener(cache2).waitForRpc();
assert cache1.get("key").equals("value");
assert cache2.get("key").equals("value");
@@ -48,7 +48,7 @@
replListener(cache2).expect(PutMapCommand.class);
cache1.putAll(map);
- replListener(cache2).waitForRPC();
+ replListener(cache2).waitForRpc();
assert cache1.get("key").equals("value");
assert cache2.get("key").equals("value");
@@ -65,7 +65,7 @@
replListener(cache2).expect(RemoveCommand.class);
cache1.remove("key");
- replListener(cache2).waitForRPC();
+ replListener(cache2).waitForRpc();
assert cache1.get("key") == null;
assert cache2.get("key") == null;
@@ -77,7 +77,7 @@
replListener(cache2).expect(RemoveCommand.class);
cache1.remove("key");
- replListener(cache2).waitForRPC();
+ replListener(cache2).waitForRpc();
assert cache1.get("key") == null;
assert cache2.get("key") == null;
@@ -90,7 +90,7 @@
replListener(cache2).expect(PutKeyValueCommand.class);
cache1.putIfAbsent("key", "value");
- replListener(cache2).waitForRPC();
+ replListener(cache2).waitForRpc();
assert cache1.get("key").equals("value");
assert cache2.get("key").equals("value");
@@ -119,7 +119,7 @@
replListener(cache2).expect(RemoveCommand.class);
cache1.remove("key", "value1");
- replListener(cache2).waitForRPC();
+ replListener(cache2).waitForRpc();
assert cache1.get("key") == null;
assert cache2.get("key") == null;
@@ -133,7 +133,7 @@
replListener(cache2).expect(ClearCommand.class);
cache1.clear();
- replListener(cache2).waitForRPC();
+ replListener(cache2).waitForRpc();
assert cache1.get("key") == null;
assert cache2.get("key") == null;
@@ -153,7 +153,7 @@
replListener(cache2).expect(ReplaceCommand.class);
cache1.replace("key", "value1");
- replListener(cache2).waitForRPC();
+ replListener(cache2).waitForRpc();
assert cache1.get("key").equals("value1");
assert cache2.get("key").equals("value1");
@@ -178,7 +178,7 @@
replListener(cache2).expect(ReplaceCommand.class);
cache1.replace("key", "valueN", "value1");
- replListener(cache2).waitForRPC();
+ replListener(cache2).waitForRpc();
assert cache1.get("key").equals("value1");
assert cache2.get("key").equals("value1");
Modified: trunk/core/src/test/java/org/infinispan/replication/SyncReplTest.java
===================================================================
--- trunk/core/src/test/java/org/infinispan/replication/SyncReplTest.java 2009-04-13 12:23:10 UTC (rev 107)
+++ trunk/core/src/test/java/org/infinispan/replication/SyncReplTest.java 2009-04-13 13:58:20 UTC (rev 108)
@@ -8,7 +8,7 @@
import static org.easymock.EasyMock.*;
import org.infinispan.Cache;
-import org.infinispan.commands.CacheRpcCommand;
+import org.infinispan.commands.remote.CacheRpcCommand;
import org.infinispan.config.Configuration;
import org.infinispan.remoting.RpcManager;
import org.infinispan.remoting.RpcManagerImpl;
Modified: trunk/core/src/test/java/org/infinispan/test/ReplListener.java
===================================================================
--- trunk/core/src/test/java/org/infinispan/test/ReplListener.java 2009-04-13 12:23:10 UTC (rev 107)
+++ trunk/core/src/test/java/org/infinispan/test/ReplListener.java 2009-04-13 13:58:20 UTC (rev 108)
@@ -29,7 +29,7 @@
}
/**
- * Expects any commands. The moment a single command is detected, the {@link #waitForRPC()} command will be
+ * Expects any commands. The moment a single command is detected, the {@link #waitForRpc()} command will be
* unblocked.
*/
public void expectAny() {
@@ -61,7 +61,7 @@
}
/**
- * Expects a specific set of commands. {@link #waitForRPC()} will block until all of these commands are detected.
+ * Expects a specific set of commands. {@link #waitForRpc()} will block until all of these commands are detected.
*
* @param expectedCommands commands to expect
*/
@@ -76,14 +76,14 @@
* Blocks for a predefined amount of time (120 Seconds) until commands defined in any of the expect*() methods have
* been detected. If the commands have not been detected by this time, an exception is thrown.
*/
- public void waitForRPC() {
- waitForRPC(30, TimeUnit.SECONDS);
+ public void waitForRpc() {
+ waitForRpc(30, TimeUnit.SECONDS);
}
/**
- * The same as {@link #waitForRPC()} except that you are allowed to specify the max wait time.
+ * The same as {@link #waitForRpc()} except that you are allowed to specify the max wait time.
*/
- public void waitForRPC(long time, TimeUnit unit) {
+ public void waitForRpc(long time, TimeUnit unit) {
assert expectedCommands != null : "there are no replication expectations; please use ReplListener.expect() before calling this method";
try {
if (!latch.await(time, unit)) {
More information about the infinispan-commits
mailing list