[jbosscache-commits] JBoss Cache SVN: r7605 - in core/branches/flat/src: main/java/org/horizon/commands and 15 other directories.
jbosscache-commits at lists.jboss.org
jbosscache-commits at lists.jboss.org
Wed Jan 28 09:34:08 EST 2009
Author: manik.surtani at jboss.com
Date: 2009-01-28 09:34:08 -0500 (Wed, 28 Jan 2009)
New Revision: 7605
Added:
core/branches/flat/src/main/java/org/horizon/commands/RPCCommand.java
core/branches/flat/src/test/java/org/horizon/BaseReplicatedTest.java
Modified:
core/branches/flat/src/main/java/org/horizon/cluster/ReplicationQueue.java
core/branches/flat/src/main/java/org/horizon/commands/CommandsFactoryImpl.java
core/branches/flat/src/main/java/org/horizon/commands/RemoteCommandFactory.java
core/branches/flat/src/main/java/org/horizon/commands/ReplicableCommand.java
core/branches/flat/src/main/java/org/horizon/commands/read/SizeCommand.java
core/branches/flat/src/main/java/org/horizon/commands/remote/ReplicateCommand.java
core/branches/flat/src/main/java/org/horizon/commands/write/PutKeyValueCommand.java
core/branches/flat/src/main/java/org/horizon/commands/write/RemoveCommand.java
core/branches/flat/src/main/java/org/horizon/commands/write/ReplaceCommand.java
core/branches/flat/src/main/java/org/horizon/container/ReadCommittedEntry.java
core/branches/flat/src/main/java/org/horizon/container/UnsortedDataContainer.java
core/branches/flat/src/main/java/org/horizon/factories/ComponentRegistry.java
core/branches/flat/src/main/java/org/horizon/factories/EmptyConstructorFactory.java
core/branches/flat/src/main/java/org/horizon/factories/GlobalComponentRegistry.java
core/branches/flat/src/main/java/org/horizon/interceptors/InvalidationInterceptor.java
core/branches/flat/src/main/java/org/horizon/interceptors/base/BaseRpcInterceptor.java
core/branches/flat/src/main/java/org/horizon/manager/DefaultCacheManager.java
core/branches/flat/src/main/java/org/horizon/marshall/HorizonMarshaller.java
core/branches/flat/src/main/java/org/horizon/marshall/VersionAwareMarshaller.java
core/branches/flat/src/main/java/org/horizon/remoting/InboundInvocationHandler.java
core/branches/flat/src/main/java/org/horizon/remoting/InboundInvocationHandlerImpl.java
core/branches/flat/src/main/java/org/horizon/remoting/RPCManager.java
core/branches/flat/src/main/java/org/horizon/remoting/RPCManagerImpl.java
core/branches/flat/src/main/java/org/horizon/remoting/transport/Transport.java
core/branches/flat/src/test/java/org/horizon/BasicTest.java
core/branches/flat/src/test/java/org/horizon/api/mvcc/PutForExternalReadTest.java
core/branches/flat/src/test/java/org/horizon/replication/SyncReplTest.java
core/branches/flat/src/test/java/org/horizon/util/internals/ReplicationListener.java
Log:
replication code + tests on shared transport
Modified: core/branches/flat/src/main/java/org/horizon/cluster/ReplicationQueue.java
===================================================================
--- core/branches/flat/src/main/java/org/horizon/cluster/ReplicationQueue.java 2009-01-27 23:06:53 UTC (rev 7604)
+++ core/branches/flat/src/main/java/org/horizon/cluster/ReplicationQueue.java 2009-01-28 14:34:08 UTC (rev 7605)
@@ -22,6 +22,7 @@
package org.horizon.cluster;
import org.horizon.commands.CommandsFactory;
+import org.horizon.commands.RPCCommand;
import org.horizon.commands.ReplicableCommand;
import org.horizon.commands.remote.ReplicateCommand;
import org.horizon.config.Configuration;
@@ -60,7 +61,7 @@
/**
* Holds the replication jobs: LinkedList<RPCCommand>
*/
- final List<ReplicableCommand> elements = new LinkedList<ReplicableCommand>();
+ final List<RPCCommand> elements = new LinkedList<RPCCommand>();
/**
* For periodical replication
@@ -128,7 +129,7 @@
/**
* Adds a new method call.
*/
- public void add(ReplicateCommand job) {
+ public void add(RPCCommand job) {
if (job == null)
throw new NullPointerException("job is null");
synchronized (elements) {
Modified: core/branches/flat/src/main/java/org/horizon/commands/CommandsFactoryImpl.java
===================================================================
--- core/branches/flat/src/main/java/org/horizon/commands/CommandsFactoryImpl.java 2009-01-27 23:06:53 UTC (rev 7604)
+++ core/branches/flat/src/main/java/org/horizon/commands/CommandsFactoryImpl.java 2009-01-28 14:34:08 UTC (rev 7605)
@@ -21,6 +21,7 @@
*/
package org.horizon.commands;
+import org.horizon.CacheSPI;
import org.horizon.commands.read.GetKeyValueCommand;
import org.horizon.commands.read.SizeCommand;
import org.horizon.commands.remote.ReplicateCommand;
@@ -36,7 +37,6 @@
import org.horizon.commands.write.ReplaceCommand;
import org.horizon.container.DataContainer;
import org.horizon.factories.annotations.Inject;
-import org.horizon.interceptors.InterceptorChain;
import org.horizon.notifications.CacheNotifier;
import org.horizon.remoting.transport.Address;
import org.horizon.transaction.GlobalTransaction;
@@ -51,13 +51,16 @@
public class CommandsFactoryImpl implements CommandsFactory {
private DataContainer dataContainer;
private CacheNotifier notifier;
- private InterceptorChain interceptorChain;
+ private CacheSPI cache;
+ // some stateless commands can be reused so that they aren't constructed again all the time.
+ SizeCommand cachedSizeCommand;
+
@Inject
- public void setupDependencies(DataContainer container, CacheNotifier notifier, InterceptorChain interceptorChain) {
+ public void setupDependencies(DataContainer container, CacheNotifier notifier, CacheSPI cache) {
this.dataContainer = container;
this.notifier = notifier;
- this.interceptorChain = interceptorChain;
+ this.cache = cache;
}
public PutKeyValueCommand buildPutKeyValueCommand(Object key, Object value) {
@@ -73,7 +76,10 @@
}
public SizeCommand buildSizeCommand() {
- return new SizeCommand(dataContainer);
+ if (cachedSizeCommand == null) {
+ cachedSizeCommand = new SizeCommand(dataContainer);
+ }
+ return cachedSizeCommand;
}
public GetKeyValueCommand buildGetKeyValueCommand(Object key) {
@@ -107,11 +113,11 @@
}
public ReplicateCommand buildReplicateCommand(List<ReplicableCommand> toReplicate) {
- return new ReplicateCommand(toReplicate);
+ return new ReplicateCommand(toReplicate, cache.getName());
}
public ReplicateCommand buildReplicateCommand(ReplicableCommand call) {
- return new ReplicateCommand(call);
+ return new ReplicateCommand(call, cache.getName());
}
public void initializeReplicableCommand(ReplicableCommand c) {
@@ -126,13 +132,10 @@
case RemoveCommand.METHOD_ID:
((RemoveCommand) c).init(notifier);
break;
- case ReplicateCommand.MULTIPLE_METHOD_ID:
- case ReplicateCommand.SINGLE_METHOD_ID:
+ case ReplicateCommand.METHOD_ID:
ReplicateCommand rc = (ReplicateCommand) c;
- if (rc.getModifications() != null)
- for (ReplicableCommand nested : rc.getModifications()) initializeReplicableCommand(nested);
- initializeReplicableCommand(rc.getSingleModification());
- rc.initialize(interceptorChain);
+ if (rc.getCommands() != null)
+ for (ReplicableCommand nested : rc.getCommands()) initializeReplicableCommand(nested);
break;
case PrepareCommand.METHOD_ID:
PrepareCommand pc = (PrepareCommand) c;
Added: core/branches/flat/src/main/java/org/horizon/commands/RPCCommand.java
===================================================================
--- core/branches/flat/src/main/java/org/horizon/commands/RPCCommand.java (rev 0)
+++ core/branches/flat/src/main/java/org/horizon/commands/RPCCommand.java 2009-01-28 14:34:08 UTC (rev 7605)
@@ -0,0 +1,67 @@
+package org.horizon.commands;
+
+import org.horizon.interceptors.InterceptorChain;
+
+import java.util.Collection;
+import java.util.List;
+
+/**
+ * The RPCManager only replicates commands wrapped in an RPCCommand. As a wrapper, an RPCCommand could contain a single
+ * {@link org.horizon.commands.ReplicableCommand} or a List of them.
+ *
+ * @author Manik Surtani
+ * @since 1.0
+ */
+public interface RPCCommand extends ReplicableCommand {
+
+ /**
+ * @return true if this only wraps a single ReplicableCommand. False if it wraps more than one.
+ */
+ boolean isSingleCommand();
+
+ /**
+ * A convenience method if there is only a single command being transported, i.e., if {@link #isSingleCommand()} is
+ * true. If {@link #isSingleCommand()} is false, this method throws a {@link IllegalStateException} so it should
+ * only be used after testing {@link #isSingleCommand()}.
+ *
+ * @return a single ReplicableCommand.
+ */
+ ReplicableCommand getSingleCommand();
+
+ /**
+ * A more generic mechanism to get a hold of the commands wrapped. Even if {@link #isSingleCommand()} is true, this
+ * command returns a valid and usable List.
+ *
+ * @return a list of all commands.
+ */
+ List<ReplicableCommand> getCommands();
+
+ /**
+ * Adds a single command to the list of commands being wrapped
+ *
+ * @param command command to add
+ */
+ void addCommand(ReplicableCommand command);
+
+ /**
+ * Adds a collection of commands to the list of commands being wrapped
+ *
+ * @param commands commands to add
+ */
+ void addCommands(Collection<? extends ReplicableCommand> commands);
+
+ /**
+ * @return the name of the cache that produced this command. This will also be the name of the cache this command is
+ * intended for.
+ */
+ String getCacheName();
+
+ void setCacheName(String name);
+
+ /**
+ * Sets the interceptor chain on which to invoke the command.
+ *
+ * @param interceptorChain chain to invoke command on
+ */
+ void setInterceptorChain(InterceptorChain interceptorChain);
+}
Modified: core/branches/flat/src/main/java/org/horizon/commands/RemoteCommandFactory.java
===================================================================
--- core/branches/flat/src/main/java/org/horizon/commands/RemoteCommandFactory.java 2009-01-27 23:06:53 UTC (rev 7604)
+++ core/branches/flat/src/main/java/org/horizon/commands/RemoteCommandFactory.java 2009-01-28 14:34:08 UTC (rev 7605)
@@ -61,8 +61,7 @@
case RollbackCommand.METHOD_ID:
command = new RollbackCommand();
break;
- case ReplicateCommand.MULTIPLE_METHOD_ID:
- case ReplicateCommand.SINGLE_METHOD_ID:
+ case ReplicateCommand.METHOD_ID:
command = new ReplicateCommand();
break;
Modified: core/branches/flat/src/main/java/org/horizon/commands/ReplicableCommand.java
===================================================================
--- core/branches/flat/src/main/java/org/horizon/commands/ReplicableCommand.java 2009-01-27 23:06:53 UTC (rev 7604)
+++ core/branches/flat/src/main/java/org/horizon/commands/ReplicableCommand.java 2009-01-28 14:34:08 UTC (rev 7605)
@@ -25,7 +25,7 @@
/**
* The core of the command-based cache framework. Commands correspond to specific areas of functionality in the cache,
- * and can be replicated using the {@link org.horizon.marshall.Marshaller} framework.
+ * and can be replicated using the {@link org.horizon.remoting.RPCManager}
*
* @author Mircea.Markus at jboss.com
* @author Manik Surtani
Modified: core/branches/flat/src/main/java/org/horizon/commands/read/SizeCommand.java
===================================================================
--- core/branches/flat/src/main/java/org/horizon/commands/read/SizeCommand.java 2009-01-27 23:06:53 UTC (rev 7604)
+++ core/branches/flat/src/main/java/org/horizon/commands/read/SizeCommand.java 2009-01-28 14:34:08 UTC (rev 7605)
@@ -27,7 +27,7 @@
import org.horizon.context.InvocationContext;
/**
- * // TODO: MANIK: Document this
+ * Command to calculate the size of the cache
*
* @author Manik Surtani (<a href="mailto:manik at jboss.org">manik at jboss.org</a>)
* @since 1.0
@@ -58,4 +58,11 @@
public void setParameters(int commandId, Object[] parameters) {
// no-op
}
+
+ @Override
+ public String toString() {
+ return "SizeCommand{" +
+ "containerSize=" + container.size() +
+ '}';
+ }
}
Modified: core/branches/flat/src/main/java/org/horizon/commands/remote/ReplicateCommand.java
===================================================================
--- core/branches/flat/src/main/java/org/horizon/commands/remote/ReplicateCommand.java 2009-01-27 23:06:53 UTC (rev 7604)
+++ core/branches/flat/src/main/java/org/horizon/commands/remote/ReplicateCommand.java 2009-01-28 14:34:08 UTC (rev 7605)
@@ -21,6 +21,7 @@
*/
package org.horizon.commands.remote;
+import org.horizon.commands.RPCCommand;
import org.horizon.commands.ReplicableCommand;
import org.horizon.commands.VisitableCommand;
import org.horizon.context.InvocationContext;
@@ -29,11 +30,12 @@
import org.horizon.logging.LogFactory;
import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
import java.util.List;
/**
- * Command that implements cluster replication logic. Essentially mimics the replicate() and replicateAll() methods in
- * 2.1.x, we may need to revisit the usefulness of such a command.
+ * Command that implements cluster replication logic.
* <p/>
* This is not a {@link VisitableCommand} and hence not passed up the {@link org.horizon.interceptors.base.CommandInterceptor}
* chain.
@@ -42,52 +44,38 @@
* @author Mircea.Markus at jboss.com
* @since 1.0
*/
-public class ReplicateCommand implements ReplicableCommand {
- public static final byte SINGLE_METHOD_ID = 13;
- public static final byte MULTIPLE_METHOD_ID = 14;
+public class ReplicateCommand implements RPCCommand {
+ public static final byte METHOD_ID = 13;
- private InterceptorChain invoker;
+ private InterceptorChain interceptorChain;
private static final Log log = LogFactory.getLog(ReplicateCommand.class);
private static final boolean trace = log.isTraceEnabled();
- /**
- * optimisation - rather than constructing a new list each for scenarios where a single modification needs to be
- * replicated rather use this instance.
- */
- private ReplicableCommand singleModification;
- private List<ReplicableCommand> modifications;
+ private List<ReplicableCommand> commands;
+ private String cacheName;
- public ReplicateCommand(List<ReplicableCommand> modifications) {
+ public ReplicateCommand(List<ReplicableCommand> modifications, String cacheName) {
if (modifications != null && modifications.size() == 1) {
- singleModification = modifications.get(0);
+ this.commands = Collections.singletonList(modifications.get(0));
} else {
- this.modifications = modifications;
+ this.commands = modifications;
}
+ this.cacheName = cacheName;
}
- public ReplicateCommand(ReplicableCommand command) {
- this.singleModification = command;
+ public ReplicateCommand(ReplicableCommand command, String cacheName) {
+ commands = Collections.singletonList(command);
+ this.cacheName = cacheName;
}
public ReplicateCommand() {
}
- public void initialize(InterceptorChain interceptorChain) {
- this.invoker = interceptorChain;
+ public void setInterceptorChain(InterceptorChain interceptorChain) {
+ this.interceptorChain = interceptorChain;
}
- public void setSingleModification(ReplicableCommand singleModification) {
- this.singleModification = singleModification;
- }
-
- public void setModifications(List<ReplicableCommand> modifications) {
- if (modifications != null && modifications.size() == 1)
- singleModification = modifications.get(0);
- else
- this.modifications = modifications;
- }
-
/**
* Executes commands replicated to the current cache instance by other cache instances.
*
@@ -97,21 +85,21 @@
*/
public Object perform(InvocationContext ctx) throws Throwable {
if (isSingleCommand()) {
- return processSingleCommand(singleModification);
+ return processCommand(ctx, commands.get(0));
} else {
- for (ReplicableCommand command : modifications) processSingleCommand(command);
+ for (ReplicableCommand command : commands) processCommand(ctx, command);
return null;
}
}
- private Object processSingleCommand(ReplicableCommand cacheCommand)
+ private Object processCommand(InvocationContext ctx, ReplicableCommand cacheCommand)
throws Throwable {
Object result;
try {
if (trace) log.trace("Invoking command " + cacheCommand + ", with originLocal flag set to false.");
-
+ ctx.setOriginLocal(false);
if (cacheCommand instanceof VisitableCommand) {
- Object retVal = invoker.invokeRemote((VisitableCommand) cacheCommand);
+ Object retVal = interceptorChain.invokeRemote((VisitableCommand) cacheCommand);
// we only need to return values for a set of remote calls; not every call.
if (returnValueForRemoteCall(cacheCommand)) {
result = retVal;
@@ -119,7 +107,8 @@
result = null;
}
} else {
- result = cacheCommand.perform(null);
+ throw new RuntimeException("Do we still need to deal with non-visitable commands?");
+// result = cacheCommand.perform(null);
}
}
catch (Throwable ex) {
@@ -143,35 +132,57 @@
}
public byte getCommandId() {
- return isSingleCommand() ? SINGLE_METHOD_ID : MULTIPLE_METHOD_ID;
+ return METHOD_ID;
}
- public List<ReplicableCommand> getModifications() {
- return modifications;
+ public List<ReplicableCommand> getCommands() {
+ return commands;
}
- public ReplicableCommand getSingleModification() {
- return singleModification;
+ public void addCommand(ReplicableCommand command) {
+ if (commands == null) {
+ commands = Collections.singletonList(command);
+ } else {
+ upgradeCommandsListIfNeeded();
+ commands.add(command);
+ }
}
+ public void addCommands(Collection<? extends ReplicableCommand> commands) {
+ upgradeCommandsListIfNeeded();
+ this.commands.addAll(commands);
+ }
+
+ /**
+ * Makes sure the wrapped command list is a mutable ArrayList before anything is appended to it. The list may be
+ * null (no-arg constructor) or an immutable singleton list, and neither of those can be added to.
+ */
+ private void upgradeCommandsListIfNeeded() {
+ if (commands == null) {
+ // nothing wrapped yet: start from an empty list, since copying a null collection throws a NullPointerException
+ commands = new ArrayList<ReplicableCommand>();
+ } else if (!(commands instanceof ArrayList)) {
+ commands = new ArrayList<ReplicableCommand>(commands);
+ }
+ }
+
+ public String getCacheName() {
+ return cacheName;
+ }
+
+ /**
+ * Sets the name of the cache this command belongs to.
+ *
+ * @param name name of the cache that produced this command
+ */
+ public void setCacheName(String name) {
+ // store the argument; assigning the field to itself would silently discard the new name
+ this.cacheName = name;
+ }
+
+ /**
+ * @return the only wrapped command
+ * @throws IllegalStateException if this command does not wrap exactly one command, see {@link #isSingleCommand()}
+ */
+ public ReplicableCommand getSingleCommand() {
+ if (!isSingleCommand())
+ throw new IllegalStateException("ReplicateCommand does not wrap exactly one command: " + commands);
+ return commands.get(0);
+ }
+
public Object[] getParameters() {
- if (isSingleCommand())
- return new Object[]{singleModification};
- else
- return new Object[]{modifications};
+ return new Object[]{cacheName, commands};
}
@SuppressWarnings("unchecked")
public void setParameters(int commandId, Object[] args) {
- if (commandId == SINGLE_METHOD_ID) {
- singleModification = (ReplicableCommand) args[0];
- } else {
- modifications = (List<ReplicableCommand>) args[0];
- }
+ cacheName = (String) args[0];
+ commands = (List<ReplicableCommand>) args[1];
}
public boolean isSingleCommand() {
- return singleModification != null;
+ return commands != null && commands.size() == 1;
}
@Override
@@ -181,28 +192,14 @@
ReplicateCommand that = (ReplicateCommand) o;
- if (modifications != null ? !modifications.equals(that.modifications) : that.modifications != null) return false;
- if (singleModification != null ? !singleModification.equals(that.singleModification) : that.singleModification != null)
- return false;
-
- return true;
+ return !(commands != null ? !commands.equals(that.commands) : that.commands != null);
}
@Override
public int hashCode() {
- int result;
- result = (singleModification != null ? singleModification.hashCode() : 0);
- result = 31 * result + (modifications != null ? modifications.hashCode() : 0);
- return result;
+ return commands != null ? commands.hashCode() : 0;
}
- @Override
- public String toString() {
- return "ReplicateCommand{" +
- "cmds=" + (isSingleCommand() ? singleModification : modifications) +
- '}';
- }
-
/**
* Creates a copy of this command, making a deep copy of any collections but everything else copied shallow.
*
@@ -211,20 +208,32 @@
public ReplicateCommand copy() {
ReplicateCommand clone;
clone = new ReplicateCommand();
- clone.invoker = invoker;
- clone.modifications = modifications == null ? null : new ArrayList<ReplicableCommand>(modifications);
- clone.singleModification = singleModification;
+ clone.interceptorChain = interceptorChain;
+ // the cache name is part of this command's state (see getParameters()), so the copy has to carry it as well
+ clone.cacheName = cacheName;
+ if (commands != null) {
+ // copy the list itself so that adding commands to the clone never affects this instance
+ if (commands.size() == 1)
+ clone.commands = Collections.singletonList(commands.get(0));
+ else
+ clone.commands = new ArrayList<ReplicableCommand>(commands);
+ }
return clone;
}
public boolean containsCommandType(Class<? extends ReplicableCommand> aClass) {
- if (isSingleCommand()) {
- return getSingleModification().getClass().equals(aClass);
+ if (commands.size() == 1) {
+ return commands.get(0).getClass().equals(aClass);
} else {
- for (ReplicableCommand command : getModifications()) {
+ for (ReplicableCommand command : getCommands()) {
if (command.getClass().equals(aClass)) return true;
}
}
return false;
}
+
+ @Override
+ public String toString() {
+ return "ReplicateCommand{" +
+ "commands=" + commands +
+ ", cacheName='" + cacheName + '\'' +
+ '}';
+ }
}
\ No newline at end of file
Modified: core/branches/flat/src/main/java/org/horizon/commands/write/PutKeyValueCommand.java
===================================================================
--- core/branches/flat/src/main/java/org/horizon/commands/write/PutKeyValueCommand.java 2009-01-27 23:06:53 UTC (rev 7604)
+++ core/branches/flat/src/main/java/org/horizon/commands/write/PutKeyValueCommand.java 2009-01-28 14:34:08 UTC (rev 7605)
@@ -71,7 +71,9 @@
public Object perform(InvocationContext ctx) throws Throwable {
Object o = null;
MVCCEntry e = ctx.lookupEntry(key);
- if (e.getValue() == null || !putIfAbsent) {
+ if (e.getValue() != null && putIfAbsent) {
+ return e.getValue();
+ } else {
notifier.notifyCacheEntryModified(key, true, ctx);
if (value instanceof Delta) {
Modified: core/branches/flat/src/main/java/org/horizon/commands/write/RemoveCommand.java
===================================================================
--- core/branches/flat/src/main/java/org/horizon/commands/write/RemoveCommand.java 2009-01-27 23:06:53 UTC (rev 7604)
+++ core/branches/flat/src/main/java/org/horizon/commands/write/RemoveCommand.java 2009-01-28 14:34:08 UTC (rev 7605)
@@ -57,13 +57,15 @@
public Object perform(InvocationContext ctx) throws Throwable {
MVCCEntry e = ctx.lookupEntry(key);
- if (e == null || e.isNullEntry()) return null;
+ if (e == null || e.isNullEntry()) return value == null ? null : false;
+ if (value != null && e.getValue() != null && !e.getValue().equals(value))
+ return false;
+
notifier.notifyCacheEntryRemoved(key, true, ctx);
e.setDeleted(true);
e.setValid(false);
notifier.notifyCacheEntryRemoved(key, false, ctx);
- return e.getValue();
-
+ return value == null ? e.getValue() : true;
}
public byte getCommandId() {
Modified: core/branches/flat/src/main/java/org/horizon/commands/write/ReplaceCommand.java
===================================================================
--- core/branches/flat/src/main/java/org/horizon/commands/write/ReplaceCommand.java 2009-01-27 23:06:53 UTC (rev 7604)
+++ core/branches/flat/src/main/java/org/horizon/commands/write/ReplaceCommand.java 2009-01-28 14:34:08 UTC (rev 7605)
@@ -50,14 +50,14 @@
return visitor.visitReplaceCommand(ctx, this);
}
- public Boolean perform(InvocationContext ctx) throws Throwable {
+ public Object perform(InvocationContext ctx) throws Throwable {
MVCCEntry e = ctx.lookupEntry(key);
- if (e == null || e.isNullEntry()) return false;
+ if (e == null || e.isNullEntry()) return oldValue == null ? null : false;
if (oldValue == null || oldValue.equals(e.getValue())) {
- e.setValue(newValue);
- return true;
+ Object old = e.setValue(newValue);
+ return oldValue == null ? old : true;
}
- return false;
+ return oldValue == null ? null : false;
}
public byte getCommandId() {
Modified: core/branches/flat/src/main/java/org/horizon/container/ReadCommittedEntry.java
===================================================================
--- core/branches/flat/src/main/java/org/horizon/container/ReadCommittedEntry.java 2009-01-27 23:06:53 UTC (rev 7604)
+++ core/branches/flat/src/main/java/org/horizon/container/ReadCommittedEntry.java 2009-01-28 14:34:08 UTC (rev 7605)
@@ -59,7 +59,9 @@
}
public Object setValue(Object value) {
- return this.value = value;
+ Object oldValue = this.value;
+ this.value = value;
+ return oldValue;
}
protected static enum Flags {
Modified: core/branches/flat/src/main/java/org/horizon/container/UnsortedDataContainer.java
===================================================================
--- core/branches/flat/src/main/java/org/horizon/container/UnsortedDataContainer.java 2009-01-27 23:06:53 UTC (rev 7604)
+++ core/branches/flat/src/main/java/org/horizon/container/UnsortedDataContainer.java 2009-01-28 14:34:08 UTC (rev 7605)
@@ -21,33 +21,47 @@
*/
package org.horizon.container;
+import java.util.AbstractSet;
+import java.util.Iterator;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
/**
- * // TODO: crappy and inefficient - but just a placeholder for now.
+ * The basic container. Accepts null keys.
*
* @author Manik Surtani (<a href="mailto:manik at jboss.org">manik at jboss.org</a>)
* @since 1.0
*/
public class UnsortedDataContainer<K, V> implements DataContainer<K, V> {
- private final ConcurrentMap<K, V> data = new ConcurrentHashMap<K, V>();
+ private final ConcurrentMap<Object, Object> data = new ConcurrentHashMap<Object, Object>();
+ private static final Object NULL = new Object();
+ /**
+ * Substitutes the NULL placeholder for a null key or value so that it can be handed to the backing map.
+ *
+ * @param o key or value, may be null
+ * @return the NULL placeholder if o is null, otherwise o itself
+ */
+ private Object maskNull(Object o) {
+ // no cast required: the backing map is keyed and valued by Object
+ return o == null ? NULL : o;
+ }
+
+ private Object unmaskNull(Object o) {
+ return (o == NULL) ? null : o;
+ }
+
+ @SuppressWarnings("unchecked")
public V get(K k) {
- return data.get(k);
+ return (V) unmaskNull(data.get(maskNull(k)));
}
public void put(K k, V v) {
- data.put(k, v);
+ data.put(maskNull(k), maskNull(v));
}
public boolean containsKey(K k) {
- return data.containsKey(k);
+ return data.containsKey(maskNull(k));
}
+ @SuppressWarnings("unchecked")
public V remove(K k) {
- return data.remove(k);
+ return (V) unmaskNull(data.remove(maskNull(k)));
}
public int size() {
@@ -59,7 +73,7 @@
}
public Set<K> keySet() {
- return data.keySet();
+ return new KeySet();
}
public boolean evict(Object key) {
@@ -69,4 +83,53 @@
public String toString() {
return data.toString();
}
+
+ private class KeySet extends AbstractSet<K> {
+ Set<Object> realSet;
+
+ public KeySet() {
+ this.realSet = data.keySet();
+ }
+
+ public Iterator<K> iterator() {
+ return new KeyIterator(realSet.iterator());
+ }
+
+ public void clear() {
+ throw new UnsupportedOperationException();
+ }
+
+ public boolean contains(Object o) {
+ return realSet.contains(maskNull(o));
+ }
+
+ public boolean remove(Object o) {
+ throw new UnsupportedOperationException();
+ }
+
+ public int size() {
+ return realSet.size();
+ }
+ }
+
+ private class KeyIterator implements Iterator<K> {
+ Iterator<Object> realIterator;
+
+ private KeyIterator(Iterator<Object> realIterator) {
+ this.realIterator = realIterator;
+ }
+
+ public boolean hasNext() {
+ return realIterator.hasNext();
+ }
+
+ @SuppressWarnings("unchecked")
+ public K next() {
+ return (K) unmaskNull(realIterator.next());
+ }
+
+ public void remove() {
+ throw new UnsupportedOperationException();
+ }
+ }
}
Modified: core/branches/flat/src/main/java/org/horizon/factories/ComponentRegistry.java
===================================================================
--- core/branches/flat/src/main/java/org/horizon/factories/ComponentRegistry.java 2009-01-27 23:06:53 UTC (rev 7604)
+++ core/branches/flat/src/main/java/org/horizon/factories/ComponentRegistry.java 2009-01-28 14:34:08 UTC (rev 7605)
@@ -13,7 +13,7 @@
import java.util.Map;
/**
- * // TODO: Manik: Document this!
+ * Named cache specific components
*
* @author Manik Surtani
* @since 1.0
@@ -43,8 +43,6 @@
registerComponent(this, ComponentRegistry.class);
registerComponent(configuration, Configuration.class);
registerComponent(new BootstrapFactory(cache, configuration, this), BootstrapFactory.class);
-
- globalComponents.registerNamedComponentRegistry(this, cacheName);
}
catch (Exception e) {
throw new CacheException("Unable to construct a ComponentRegistry!", e);
@@ -103,5 +101,12 @@
globalComponents.start();
}
super.start();
+ globalComponents.registerNamedComponentRegistry(this, cacheName);
}
+
+ @Override
+ public void stop() {
+ if (state.stopAllowed()) globalComponents.unregisterNamedComponentRegistry(cacheName);
+ super.stop();
+ }
}
Modified: core/branches/flat/src/main/java/org/horizon/factories/EmptyConstructorFactory.java
===================================================================
--- core/branches/flat/src/main/java/org/horizon/factories/EmptyConstructorFactory.java 2009-01-27 23:06:53 UTC (rev 7604)
+++ core/branches/flat/src/main/java/org/horizon/factories/EmptyConstructorFactory.java 2009-01-28 14:34:08 UTC (rev 7605)
@@ -1,19 +1,23 @@
package org.horizon.factories;
+import org.horizon.commands.RemoteCommandFactory;
import org.horizon.config.ConfigurationException;
import org.horizon.factories.annotations.DefaultFactoryFor;
+import org.horizon.factories.scopes.Scope;
+import org.horizon.factories.scopes.Scopes;
import org.horizon.marshall.Marshaller;
import org.horizon.marshall.VersionAwareMarshaller;
import org.horizon.notifications.CacheManagerNotifier;
import org.horizon.remoting.InboundInvocationHandler;
/**
- * // TODO: Manik: Document this!
+ * Factory for building global-scope components which have default empty constructors
*
* @author Manik Surtani
* @since 1.0
*/
- at DefaultFactoryFor(classes = {InboundInvocationHandler.class, CacheManagerNotifier.class, Marshaller.class})
+ at DefaultFactoryFor(classes = {InboundInvocationHandler.class, CacheManagerNotifier.class, Marshaller.class, RemoteCommandFactory.class})
+ at Scope(Scopes.GLOBAL)
public class EmptyConstructorFactory extends AbstractComponentFactory implements AutoInstantiableFactory {
public <T> T construct(Class<T> componentType) {
try {
Modified: core/branches/flat/src/main/java/org/horizon/factories/GlobalComponentRegistry.java
===================================================================
--- core/branches/flat/src/main/java/org/horizon/factories/GlobalComponentRegistry.java 2009-01-27 23:06:53 UTC (rev 7604)
+++ core/branches/flat/src/main/java/org/horizon/factories/GlobalComponentRegistry.java 2009-01-28 14:34:08 UTC (rev 7605)
@@ -15,7 +15,7 @@
import java.util.ArrayList;
/**
- * // TODO: Manik: Document this!
+ * A global component registry where shared components are stored.
*
* @author Manik Surtani
* @since 1.0
@@ -25,6 +25,7 @@
public class GlobalComponentRegistry extends AbstractComponentRegistry {
private Log log = LogFactory.getLog(GlobalComponentRegistry.class);
+ private static final String NAMED_REGISTRY_PREFIX = "NamedComponentRegistry:";
/**
* Hook to shut down the cache when the JVM exits.
*/
@@ -96,10 +97,14 @@
}
public ComponentRegistry getNamedComponentRegistry(String name) {
- return getComponent(ComponentRegistry.class, name);
+ return getComponent(ComponentRegistry.class, NAMED_REGISTRY_PREFIX + name);
}
public void registerNamedComponentRegistry(ComponentRegistry componentRegistry, String name) {
- registerComponent(componentRegistry, name);
+ registerComponent(componentRegistry, NAMED_REGISTRY_PREFIX + name);
}
+
+ public void unregisterNamedComponentRegistry(String name) {
+ componentLookup.remove(NAMED_REGISTRY_PREFIX + name);
+ }
}
Modified: core/branches/flat/src/main/java/org/horizon/interceptors/InvalidationInterceptor.java
===================================================================
--- core/branches/flat/src/main/java/org/horizon/interceptors/InvalidationInterceptor.java 2009-01-27 23:06:53 UTC (rev 7604)
+++ core/branches/flat/src/main/java/org/horizon/interceptors/InvalidationInterceptor.java 2009-01-28 14:34:08 UTC (rev 7605)
@@ -134,7 +134,7 @@
if (tx == null || !TransactionTable.isValid(tx)) {
// the no-tx case:
//replicate an evict call.
- invalidateAcrossCluster(key, null, isSynchronous(optionOverride), ctx);
+ invalidateAcrossCluster(key, isSynchronous(optionOverride), ctx);
} else {
if (isLocalModeForced(ctx)) ctx.getTransactionContext().addLocalModification(command);
}
@@ -152,7 +152,7 @@
log.debug("Modification list contains a putForExternalRead operation. Not invalidating.");
} else {
try {
- for (Object key : filterVisitor.result) invalidateAcrossCluster(key, null, defaultSynchronous, ctx);
+ for (Object key : filterVisitor.result) invalidateAcrossCluster(key, defaultSynchronous, ctx);
}
catch (Throwable t) {
log.warn("Unable to broadcast evicts as a part of the prepare phase. Rolling back.", t);
@@ -193,7 +193,7 @@
}
- protected void invalidateAcrossCluster(Object fqn, Object workspace, boolean synchronous, InvocationContext ctx) throws Throwable {
+ protected void invalidateAcrossCluster(Object fqn, boolean synchronous, InvocationContext ctx) throws Throwable {
if (!isLocalModeForced(ctx)) {
// increment invalidations counter if statistics maintained
incrementInvalidations();
Modified: core/branches/flat/src/main/java/org/horizon/interceptors/base/BaseRpcInterceptor.java
===================================================================
--- core/branches/flat/src/main/java/org/horizon/interceptors/base/BaseRpcInterceptor.java 2009-01-27 23:06:53 UTC (rev 7604)
+++ core/branches/flat/src/main/java/org/horizon/interceptors/base/BaseRpcInterceptor.java 2009-01-28 14:34:08 UTC (rev 7605)
@@ -23,6 +23,7 @@
import org.horizon.cluster.ReplicationQueue;
import org.horizon.commands.CommandsFactory;
+import org.horizon.commands.RPCCommand;
import org.horizon.commands.ReplicableCommand;
import org.horizon.config.Option;
import org.horizon.context.InvocationContext;
@@ -82,15 +83,23 @@
}
}
- protected void replicateCall(InvocationContext ctx, ReplicableCommand call, boolean sync, Option o, boolean useOutOfBandMessage) throws Throwable {
+ protected void replicateCall(InvocationContext ctx, RPCCommand call, boolean sync, Option o, boolean useOutOfBandMessage) throws Throwable {
replicateCall(ctx, null, call, sync, o, useOutOfBandMessage);
}
- protected void replicateCall(InvocationContext ctx, ReplicableCommand call, boolean sync, Option o) throws Throwable {
+ protected void replicateCall(InvocationContext ctx, ReplicableCommand call, boolean sync, Option o, boolean useOutOfBandMessage) throws Throwable {
+ replicateCall(ctx, null, commandsFactory.buildReplicateCommand(call), sync, o, useOutOfBandMessage);
+ }
+
+ protected void replicateCall(InvocationContext ctx, RPCCommand call, boolean sync, Option o) throws Throwable {
replicateCall(ctx, null, call, sync, o, false);
}
- protected void replicateCall(InvocationContext ctx, List<Address> recipients, ReplicableCommand c, boolean sync, Option o, boolean useOutOfBandMessage) throws Throwable {
+ protected void replicateCall(InvocationContext ctx, ReplicableCommand call, boolean sync, Option o) throws Throwable {
+ replicateCall(ctx, null, commandsFactory.buildReplicateCommand(call), sync, o, false);
+ }
+
+ protected void replicateCall(InvocationContext ctx, List<Address> recipients, RPCCommand c, boolean sync, Option o, boolean useOutOfBandMessage) throws Throwable {
long syncReplTimeout = configuration.getSyncReplTimeout();
// test for option overrides
@@ -111,15 +120,15 @@
}
}
- replicateCall(recipients, c, sync, true, useOutOfBandMessage, false, syncReplTimeout);
+ replicateCall(recipients, c, sync, useOutOfBandMessage, syncReplTimeout);
}
- protected void replicateCall(List<Address> recipients, ReplicableCommand call, boolean sync, boolean wrapCacheCommandInReplicateMethod, boolean useOutOfBandMessage, boolean isBroadcast, long timeout) throws Throwable {
+ protected void replicateCall(List<Address> recipients, RPCCommand call, boolean sync, boolean useOutOfBandMessage, long timeout) throws Throwable {
if (trace) log.trace("Broadcasting call " + call + " to recipient list " + recipients);
if (!sync && replicationQueue != null) {
if (log.isDebugEnabled()) log.debug("Putting call " + call + " on the replication queue.");
- replicationQueue.add(commandsFactory.buildReplicateCommand(call));
+ replicationQueue.add(call);
} else {
List<Address> callRecipients = recipients;
if (callRecipients == null) {
@@ -128,10 +137,8 @@
log.trace("Setting call recipients to " + callRecipients + " since the original list of recipients passed in is null.");
}
- ReplicableCommand toCall = wrapCacheCommandInReplicateMethod ? commandsFactory.buildReplicateCommand(call) : call;
-
List rsps = rpcManager.invokeRemotely(callRecipients,
- toCall,
+ call,
sync ? ResponseMode.SYNCHRONOUS : ResponseMode.ASYNCHRONOUS, // is synchronised?
timeout,
useOutOfBandMessage
Modified: core/branches/flat/src/main/java/org/horizon/manager/DefaultCacheManager.java
===================================================================
--- core/branches/flat/src/main/java/org/horizon/manager/DefaultCacheManager.java 2009-01-27 23:06:53 UTC (rev 7604)
+++ core/branches/flat/src/main/java/org/horizon/manager/DefaultCacheManager.java 2009-01-28 14:34:08 UTC (rev 7605)
@@ -41,7 +41,6 @@
import java.io.IOException;
import java.io.InputStream;
-import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Set;
@@ -265,7 +264,7 @@
throw new NullPointerException("Null arguments not allowed");
if (cacheName.equals(DEFAULT_CACHE_NAME))
throw new IllegalArgumentException("Cache name cannot be used as it is a reserved, internal name");
- if (configurationOverrides.putIfAbsent(cacheName, configurationOverride) != null)
+ if (configurationOverrides.putIfAbsent(cacheName, configurationOverride.clone()) != null)
throw new DuplicateCacheNameException("Cache name [" + cacheName + "] already in use!");
}
@@ -307,11 +306,9 @@
return globalConfiguration.getClusterName();
}
- @SuppressWarnings("unchecked")
public List<Address> getMembers() {
RPCManager rpcManager = globalComponentRegistry.getComponent(RPCManager.class);
- List l = rpcManager == null ? Collections.emptyList() : rpcManager.getMembers();
- return l;
+ return rpcManager == null ? null : rpcManager.getMembers();
}
public Address getAddress() {
Modified: core/branches/flat/src/main/java/org/horizon/marshall/HorizonMarshaller.java
===================================================================
--- core/branches/flat/src/main/java/org/horizon/marshall/HorizonMarshaller.java 2009-01-27 23:06:53 UTC (rev 7604)
+++ core/branches/flat/src/main/java/org/horizon/marshall/HorizonMarshaller.java 2009-01-28 14:34:08 UTC (rev 7605)
@@ -38,7 +38,6 @@
import org.jboss.util.stream.MarshalledValueInputStream;
import java.io.ByteArrayInputStream;
-import java.io.Externalizable;
import java.io.IOException;
import java.io.InputStream;
import java.io.ObjectInputStream;
@@ -55,38 +54,32 @@
*/
public class HorizonMarshaller implements Marshaller {
// magic numbers
- protected static final int MAGICNUMBER_METHODCALL = 1;
- protected static final int MAGICNUMBER_FQN = 2;
- protected static final int MAGICNUMBER_GTX = 3;
- protected static final int MAGICNUMBER_JG_ADDRESS = 4;
- protected static final int MAGICNUMBER_ARRAY_LIST = 5;
- protected static final int MAGICNUMBER_INTEGER = 6;
- protected static final int MAGICNUMBER_LONG = 7;
- protected static final int MAGICNUMBER_BOOLEAN = 8;
- protected static final int MAGICNUMBER_STRING = 9;
- protected static final int MAGICNUMBER_DEFAULT_DATA_VERSION = 10;
- protected static final int MAGICNUMBER_LINKED_LIST = 11;
- protected static final int MAGICNUMBER_HASH_MAP = 12;
- protected static final int MAGICNUMBER_TREE_MAP = 13;
- protected static final int MAGICNUMBER_HASH_SET = 14;
- protected static final int MAGICNUMBER_TREE_SET = 15;
- protected static final int MAGICNUMBER_NODEDATA_MARKER = 16;
- protected static final int MAGICNUMBER_NODEDATA_EXCEPTION_MARKER = 17;
- protected static final int MAGICNUMBER_NODEDATA = 18;
- protected static final int MAGICNUMBER_GRAVITATERESULT = 19;
- protected static final int MAGICNUMBER_SHORT = 20;
- protected static final int MAGICNUMBER_IMMUTABLE_MAPCOPY = 21;
- protected static final int MAGICNUMBER_MARSHALLEDVALUE = 22;
- protected static final int MAGICNUMBER_FASTCOPY_HASHMAP = 23;
- protected static final int MAGICNUMBER_ARRAY = 24;
- protected static final int MAGICNUMBER_BYTE = 25;
- protected static final int MAGICNUMBER_CHAR = 26;
- protected static final int MAGICNUMBER_FLOAT = 27;
- protected static final int MAGICNUMBER_DOUBLE = 28;
- protected static final int MAGICNUMBER_OBJECT = 29;
+ protected static final int MAGICNUMBER_GTX = 1;
+ protected static final int MAGICNUMBER_JG_ADDRESS = 2;
+ protected static final int MAGICNUMBER_ARRAY_LIST = 3;
+ protected static final int MAGICNUMBER_INTEGER = 4;
+ protected static final int MAGICNUMBER_LONG = 5;
+ protected static final int MAGICNUMBER_BOOLEAN = 6;
+ protected static final int MAGICNUMBER_STRING = 7;
+ protected static final int MAGICNUMBER_LINKED_LIST = 8;
+ protected static final int MAGICNUMBER_HASH_MAP = 9;
+ protected static final int MAGICNUMBER_TREE_MAP = 10;
+ protected static final int MAGICNUMBER_HASH_SET = 11;
+ protected static final int MAGICNUMBER_TREE_SET = 12;
+ protected static final int MAGICNUMBER_SHORT = 13;
+ protected static final int MAGICNUMBER_IMMUTABLE_MAPCOPY = 14;
+ protected static final int MAGICNUMBER_MARSHALLEDVALUE = 15;
+ protected static final int MAGICNUMBER_FASTCOPY_HASHMAP = 16;
+ protected static final int MAGICNUMBER_ARRAY = 17;
+ protected static final int MAGICNUMBER_BYTE = 18;
+ protected static final int MAGICNUMBER_CHAR = 19;
+ protected static final int MAGICNUMBER_FLOAT = 20;
+ protected static final int MAGICNUMBER_DOUBLE = 21;
+ protected static final int MAGICNUMBER_OBJECT = 22;
+ protected static final int MAGICNUMBER_SINGLETON_LIST = 23;
+ protected static final int MAGICNUMBER_COMMAND = 24;
protected static final int MAGICNUMBER_NULL = 99;
protected static final int MAGICNUMBER_SERIALIZABLE = 100;
-
protected static final int MAGICNUMBER_REF = 101;
public HorizonMarshaller() {
@@ -101,8 +94,9 @@
protected ClassLoader defaultClassLoader;
protected boolean useRefs = false;
- public void init(ClassLoader defaultClassLoader) {
+ public void init(ClassLoader defaultClassLoader, RemoteCommandFactory remoteCommandFactory) {
this.defaultClassLoader = defaultClassLoader;
+ this.remoteCommandFactory = remoteCommandFactory;
}
protected void initLogger() {
@@ -132,10 +126,10 @@
ReplicableCommand command = (ReplicableCommand) o;
if (command.getCommandId() > -1) {
- out.writeByte(MAGICNUMBER_METHODCALL);
+ out.writeByte(MAGICNUMBER_COMMAND);
marshallCommand(command, out, refMap);
} else {
- throw new IllegalArgumentException("MethodCall does not have a valid method id. Was this method call created with MethodCallFactory?");
+ throw new IllegalArgumentException("Command does not have a valid method id!");
}
} else if (o instanceof MarshalledValue) {
out.writeByte(MAGICNUMBER_MARSHALLEDVALUE);
@@ -158,6 +152,9 @@
} else if (o instanceof LinkedList) {
out.writeByte(MAGICNUMBER_LINKED_LIST);
marshallCollection((Collection) o, out, refMap);
+ } else if (o.getClass().getName().equals("java.util.Collections$SingletonList")) {
+ out.writeByte(MAGICNUMBER_SINGLETON_LIST);
+ marshallObject(((List) o).get(0), out, refMap);
} else if (o.getClass().equals(HashMap.class)) {
out.writeByte(MAGICNUMBER_HASH_MAP);
marshallMap((Map) o, out, refMap);
@@ -192,15 +189,6 @@
out.writeByte(MAGICNUMBER_STRING);
if (useRefs) writeReference(out, createReference(o, refMap));
marshallString((String) o, out);
- } else if (o instanceof NodeDataMarker) {
- out.writeByte(MAGICNUMBER_NODEDATA_MARKER);
- ((Externalizable) o).writeExternal(out);
- } else if (o instanceof NodeDataExceptionMarker) {
- out.writeByte(MAGICNUMBER_NODEDATA_EXCEPTION_MARKER);
- ((Externalizable) o).writeExternal(out);
- } else if (o instanceof NodeData) {
- out.writeByte(MAGICNUMBER_NODEDATA);
- ((Externalizable) o).writeExternal(out);
} else if (o instanceof Serializable) {
if (trace) {
log.trace("Warning: using object serialization for " + o.getClass());
@@ -306,7 +294,7 @@
MarshalledValue mv = new MarshalledValue();
mv.readExternal(in);
return mv;
- case MAGICNUMBER_METHODCALL:
+ case MAGICNUMBER_COMMAND:
retVal = unmarshallCommand(in, refMap);
return retVal;
case MAGICNUMBER_GTX:
@@ -323,6 +311,8 @@
return unmarshallArrayList(in, refMap);
case MAGICNUMBER_LINKED_LIST:
return unmarshallLinkedList(in, refMap);
+ case MAGICNUMBER_SINGLETON_LIST:
+ return unmarshallSingletonList(in, refMap);
case MAGICNUMBER_HASH_MAP:
return unmarshallHashMap(in, refMap);
case MAGICNUMBER_TREE_MAP:
@@ -348,18 +338,6 @@
retVal = unmarshallString(in);
if (useRefs) refMap.putReferencedObject(reference, retVal);
return retVal;
- case MAGICNUMBER_NODEDATA_MARKER:
- retVal = new NodeDataMarker();
- ((NodeDataMarker) retVal).readExternal(in);
- return retVal;
- case MAGICNUMBER_NODEDATA_EXCEPTION_MARKER:
- retVal = new NodeDataExceptionMarker();
- ((NodeDataExceptionMarker) retVal).readExternal(in);
- return retVal;
- case MAGICNUMBER_NODEDATA:
- retVal = new NodeData();
- ((NodeData) retVal).readExternal(in);
- return retVal;
default:
if (log.isErrorEnabled()) {
log.error("Unknown Magic Number " + magicNumber);
@@ -421,6 +399,10 @@
return list;
}
+ private List unmarshallSingletonList(ObjectInputStream in, UnmarshalledReferences refMap) throws Exception {
+ return Collections.singletonList(unmarshallObject(in, refMap));
+ }
+
private Map unmarshallHashMap(ObjectInputStream in, UnmarshalledReferences refMap) throws Exception {
Map map = new HashMap();
populateFromStream(in, refMap, map);
Modified: core/branches/flat/src/main/java/org/horizon/marshall/VersionAwareMarshaller.java
===================================================================
--- core/branches/flat/src/main/java/org/horizon/marshall/VersionAwareMarshaller.java 2009-01-27 23:06:53 UTC (rev 7604)
+++ core/branches/flat/src/main/java/org/horizon/marshall/VersionAwareMarshaller.java 2009-01-28 14:34:08 UTC (rev 7605)
@@ -21,6 +21,7 @@
*/
package org.horizon.marshall;
+import org.horizon.commands.RemoteCommandFactory;
import org.horizon.factories.annotations.Inject;
import org.horizon.io.ByteBuffer;
import org.horizon.io.ExposedByteArrayOutputStream;
@@ -54,9 +55,9 @@
ClassLoader defaultClassLoader;
@Inject
- public void init(ClassLoader loader) {
+ public void init(ClassLoader loader, RemoteCommandFactory remoteCommandFactory) {
defaultMarshaller = new HorizonMarshaller();
- defaultMarshaller.init(loader);
+ defaultMarshaller.init(loader, remoteCommandFactory);
}
protected int getCustomMarshallerVersionInt() {
Modified: core/branches/flat/src/main/java/org/horizon/remoting/InboundInvocationHandler.java
===================================================================
--- core/branches/flat/src/main/java/org/horizon/remoting/InboundInvocationHandler.java 2009-01-27 23:06:53 UTC (rev 7604)
+++ core/branches/flat/src/main/java/org/horizon/remoting/InboundInvocationHandler.java 2009-01-28 14:34:08 UTC (rev 7605)
@@ -1,6 +1,6 @@
package org.horizon.remoting;
-import org.horizon.commands.ReplicableCommand;
+import org.horizon.commands.RPCCommand;
import org.horizon.factories.scopes.Scope;
import org.horizon.factories.scopes.Scopes;
@@ -20,5 +20,5 @@
* @param command command to invoke
* @return results, if any, from the invocation
*/
- Object handle(ReplicableCommand command);
+ Object handle(RPCCommand command) throws Throwable;
}
Modified: core/branches/flat/src/main/java/org/horizon/remoting/InboundInvocationHandlerImpl.java
===================================================================
--- core/branches/flat/src/main/java/org/horizon/remoting/InboundInvocationHandlerImpl.java 2009-01-27 23:06:53 UTC (rev 7604)
+++ core/branches/flat/src/main/java/org/horizon/remoting/InboundInvocationHandlerImpl.java 2009-01-28 14:34:08 UTC (rev 7605)
@@ -1,48 +1,49 @@
package org.horizon.remoting;
-import org.horizon.commands.ReplicableCommand;
+import org.horizon.commands.CommandsFactory;
+import org.horizon.commands.RPCCommand;
import org.horizon.factories.ComponentRegistry;
import org.horizon.factories.GlobalComponentRegistry;
+import org.horizon.factories.annotations.Inject;
+import org.horizon.factories.annotations.NonVolatile;
+import org.horizon.factories.scopes.Scope;
+import org.horizon.factories.scopes.Scopes;
import org.horizon.interceptors.InterceptorChain;
import org.horizon.invocation.InvocationContextContainer;
+import org.horizon.logging.Log;
+import org.horizon.logging.LogFactory;
/**
- * // TODO: Manik: Document this!
+ * Sets the cache interceptor chain on an RPCCommand before calling perform() on it.
*
* @author Manik Surtani
* @since 1.0
*/
+@NonVolatile
+@Scope(Scopes.GLOBAL)
public class InboundInvocationHandlerImpl implements InboundInvocationHandler {
- InvocationContextContainer invocationContextContainer;
- ComponentRegistry componentRegistry;
- InterceptorChain interceptorChain;
GlobalComponentRegistry gcr;
+ private static final Log log = LogFactory.getLog(InboundInvocationHandlerImpl.class);
+ @Inject
public void inject(GlobalComponentRegistry gcr) {
this.gcr = gcr;
}
- private ComponentRegistry getNamedCacheComponentRegistry(String name) {
- return gcr.getNamedComponentRegistry(name);
- }
+ public Object handle(RPCCommand cmd) throws Throwable {
+ String cacheName = cmd.getCacheName();
+ ComponentRegistry cr = gcr.getNamedComponentRegistry(cacheName);
+ if (cr == null) {
+ log.info("Cache named {0} does not exist on this cache manager!", cacheName);
+ return null;
+ }
+ InterceptorChain ic = cr.getComponent(InterceptorChain.class);
+ InvocationContextContainer icc = cr.getComponent(InvocationContextContainer.class);
+ CommandsFactory commandsFactory = cr.getComponent(CommandsFactory.class);
- public Object handle(ReplicableCommand command) {
-
- throw new RuntimeException("Implement me!");
-// if (cmd instanceof VisitableCommand) {
-// InvocationContext ctx = invocationContextContainer.get();
-// ctx.setOriginLocal(false);
-// if (!componentRegistry.invocationsAllowed(false)) {
-// return null;
-// }
-// return interceptorChain.invoke(ctx, (VisitableCommand) command);
-// } else {
-// if (trace) log.trace("This is a non-visitable command - so performing directly and not via the invoker.");
-//
-// // need to check cache status for all except buddy replication commands.
-// if (!componentRegistry.invocationsAllowed(false)) return null;
-//
-// return cmd.perform(null);
-// }
+ cmd.setInterceptorChain(ic);
+ // initialize this command with components specific to the intended cache instance
+ commandsFactory.initializeReplicableCommand(cmd);
+ return cmd.perform(icc.get());
}
}
Modified: core/branches/flat/src/main/java/org/horizon/remoting/RPCManager.java
===================================================================
--- core/branches/flat/src/main/java/org/horizon/remoting/RPCManager.java 2009-01-27 23:06:53 UTC (rev 7604)
+++ core/branches/flat/src/main/java/org/horizon/remoting/RPCManager.java 2009-01-28 14:34:08 UTC (rev 7605)
@@ -21,7 +21,7 @@
*/
package org.horizon.remoting;
-import org.horizon.commands.ReplicableCommand;
+import org.horizon.commands.RPCCommand;
import org.horizon.factories.annotations.NonVolatile;
import org.horizon.factories.scopes.Scope;
import org.horizon.factories.scopes.Scopes;
@@ -49,7 +49,7 @@
*
* @param recipients a list of Addresses to invoke the call on. If this is null, the call is broadcast to the
* entire cluster.
- * @param cacheCommand the cache command to invoke
+ * @param rpcCommand the cache command to invoke
* @param mode the response mode to use
* @param timeout a timeout after which to throw a replication exception.
* @param usePriorityQueue if true, a priority queue is used to deliver messages. May not be supported by all
@@ -58,14 +58,14 @@
* @return a list of responses from each member contacted.
* @throws Exception in the event of problems.
*/
- List<Object> invokeRemotely(List<Address> recipients, ReplicableCommand cacheCommand, ResponseMode mode, long timeout, boolean usePriorityQueue, ResponseFilter responseFilter) throws Exception;
+ List<Object> invokeRemotely(List<Address> recipients, RPCCommand rpcCommand, ResponseMode mode, long timeout, boolean usePriorityQueue, ResponseFilter responseFilter) throws Exception;
/**
* Invokes an RPC call on other caches in the cluster.
*
* @param recipients a list of Addresses to invoke the call on. If this is null, the call is broadcast to the
* entire cluster.
- * @param cacheCommand the cache command to invoke
+ * @param rpcCommand the cache command to invoke
* @param mode the response mode to use
* @param timeout a timeout after which to throw a replication exception.
* @param usePriorityQueue if true, a priority queue is used to deliver messages. May not be supported by all
@@ -73,20 +73,20 @@
* @return a list of responses from each member contacted.
* @throws Exception in the event of problems.
*/
- List<Object> invokeRemotely(List<Address> recipients, ReplicableCommand cacheCommand, ResponseMode mode, long timeout, boolean usePriorityQueue) throws Exception;
+ List<Object> invokeRemotely(List<Address> recipients, RPCCommand rpcCommand, ResponseMode mode, long timeout, boolean usePriorityQueue) throws Exception;
/**
* Invokes an RPC call on other caches in the cluster.
*
- * @param recipients a list of Addresses to invoke the call on. If this is null, the call is broadcast to the
- * entire cluster.
- * @param cacheCommand the cache command to invoke
- * @param mode the response mode to use
- * @param timeout a timeout after which to throw a replication exception.
+ * @param recipients a list of Addresses to invoke the call on. If this is null, the call is broadcast to the entire
+ * cluster.
+ * @param rpcCommand the cache command to invoke
+ * @param mode the response mode to use
+ * @param timeout a timeout after which to throw a replication exception.
* @return a list of responses from each member contacted.
* @throws Exception in the event of problems.
*/
- List<Object> invokeRemotely(List<Address> recipients, ReplicableCommand cacheCommand, ResponseMode mode, long timeout) throws Exception;
+ List<Object> invokeRemotely(List<Address> recipients, RPCCommand rpcCommand, ResponseMode mode, long timeout) throws Exception;
/**
* @return true if the current Channel is the coordinator of the cluster.
Modified: core/branches/flat/src/main/java/org/horizon/remoting/RPCManagerImpl.java
===================================================================
--- core/branches/flat/src/main/java/org/horizon/remoting/RPCManagerImpl.java 2009-01-27 23:06:53 UTC (rev 7604)
+++ core/branches/flat/src/main/java/org/horizon/remoting/RPCManagerImpl.java 2009-01-28 14:34:08 UTC (rev 7605)
@@ -3,7 +3,7 @@
import org.horizon.annotations.MBean;
import org.horizon.annotations.ManagedAttribute;
import org.horizon.annotations.ManagedOperation;
-import org.horizon.commands.ReplicableCommand;
+import org.horizon.commands.RPCCommand;
import org.horizon.config.GlobalConfiguration;
import org.horizon.factories.KnownComponentNames;
import org.horizon.factories.annotations.ComponentName;
@@ -54,16 +54,16 @@
t.stop();
}
- public List<Object> invokeRemotely(List<Address> recipients, ReplicableCommand cacheCommand, ResponseMode mode, long timeout, boolean usePriorityQueue, ResponseFilter responseFilter) throws Exception {
- return t.invokeRemotely(recipients, cacheCommand, mode, timeout, usePriorityQueue, responseFilter);
+ public List<Object> invokeRemotely(List<Address> recipients, RPCCommand rpcCommand, ResponseMode mode, long timeout, boolean usePriorityQueue, ResponseFilter responseFilter) throws Exception {
+ return t.invokeRemotely(recipients, rpcCommand, mode, timeout, usePriorityQueue, responseFilter);
}
- public List<Object> invokeRemotely(List<Address> recipients, ReplicableCommand cacheCommand, ResponseMode mode, long timeout, boolean usePriorityQueue) throws Exception {
- return t.invokeRemotely(recipients, cacheCommand, mode, timeout, usePriorityQueue, null);
+ public List<Object> invokeRemotely(List<Address> recipients, RPCCommand rpcCommand, ResponseMode mode, long timeout, boolean usePriorityQueue) throws Exception {
+ return t.invokeRemotely(recipients, rpcCommand, mode, timeout, usePriorityQueue, null);
}
- public List<Object> invokeRemotely(List<Address> recipients, ReplicableCommand cacheCommand, ResponseMode mode, long timeout) throws Exception {
- return t.invokeRemotely(recipients, cacheCommand, mode, timeout, false, null);
+ public List<Object> invokeRemotely(List<Address> recipients, RPCCommand rpcCommand, ResponseMode mode, long timeout) throws Exception {
+ return t.invokeRemotely(recipients, rpcCommand, mode, timeout, false, null);
}
public boolean isCoordinator() {
@@ -119,4 +119,9 @@
double ration = (double) replicationCount.get() / totalCount * 100d;
return NumberFormat.getInstance().format(ration) + "%";
}
+
+ // mainly for unit testing
+ public void setTransport(Transport transport) {
+ this.t = transport;
+ }
}
Modified: core/branches/flat/src/main/java/org/horizon/remoting/transport/Transport.java
===================================================================
--- core/branches/flat/src/main/java/org/horizon/remoting/transport/Transport.java 2009-01-27 23:06:53 UTC (rev 7604)
+++ core/branches/flat/src/main/java/org/horizon/remoting/transport/Transport.java 2009-01-28 14:34:08 UTC (rev 7605)
@@ -1,6 +1,6 @@
package org.horizon.remoting.transport;
-import org.horizon.commands.ReplicableCommand;
+import org.horizon.commands.RPCCommand;
import org.horizon.config.GlobalConfiguration;
import org.horizon.factories.annotations.NonVolatile;
import org.horizon.factories.scopes.Scope;
@@ -44,7 +44,7 @@
*
* @param recipients a list of Addresses to invoke the call on. If this is null, the call is broadcast to the
* entire cluster.
- * @param cacheCommand the cache command to invoke
+ * @param rpcCommand the cache command to invoke
* @param mode the response mode to use
* @param timeout a timeout after which to throw a replication exception.
* @param usePriorityQueue if true, a priority queue is used to deliver messages. May not be supported by all
@@ -53,7 +53,7 @@
* @return a list of responses from each member contacted.
* @throws Exception in the event of problems.
*/
- List<Object> invokeRemotely(List<Address> recipients, ReplicableCommand cacheCommand, ResponseMode mode, long timeout, boolean usePriorityQueue, ResponseFilter responseFilter) throws Exception;
+ List<Object> invokeRemotely(List<Address> recipients, RPCCommand rpcCommand, ResponseMode mode, long timeout, boolean usePriorityQueue, ResponseFilter responseFilter) throws Exception;
/**
* @return true if the current Channel is the coordinator of the cluster.
Added: core/branches/flat/src/test/java/org/horizon/BaseReplicatedTest.java
===================================================================
--- core/branches/flat/src/test/java/org/horizon/BaseReplicatedTest.java (rev 0)
+++ core/branches/flat/src/test/java/org/horizon/BaseReplicatedTest.java 2009-01-28 14:34:08 UTC (rev 7605)
@@ -0,0 +1,68 @@
+package org.horizon;
+
+import org.horizon.config.Configuration;
+import org.horizon.config.GlobalConfiguration;
+import org.horizon.manager.CacheManager;
+import org.horizon.manager.DefaultCacheManager;
+import org.horizon.util.TestingUtil;
+import org.testng.annotations.AfterMethod;
+
+import java.util.LinkedList;
+import java.util.List;
+
+public abstract class BaseReplicatedTest {
+ ThreadLocal<List<CacheManager>> cacheManagerThreadLocal = new ThreadLocal<List<CacheManager>>() {
+ @Override
+ protected List<CacheManager> initialValue() {
+ return new LinkedList<CacheManager>();
+ }
+ };
+
+ /**
+ * @return a list of registered cache managers on the current thread.
+ */
+ protected List<CacheManager> getCacheManagers() {
+ return cacheManagerThreadLocal.get();
+ }
+
+ /**
+ * Creates a new cache manager, starts it, and adds it to the list of known cache managers on the current thread.
+ * Uses a default clustered cache manager global config.
+ *
+ * @return the new CacheManager
+ */
+ protected CacheManager addCacheManager() {
+ return addCacheManager(GlobalConfiguration.getClusteredDefault());
+ }
+
+ /**
+ * Creates a new cache manager, starts it, and adds it to the list of known cache managers on the current thread.
+ *
+ * @param globalConfig config to use
+ * @return the new CacheManager
+ */
+ protected CacheManager addCacheManager(GlobalConfiguration globalConfig) {
+ CacheManager cm = new DefaultCacheManager(globalConfig);
+ cacheManagerThreadLocal.get().add(cm);
+ return cm;
+ }
+
+ protected void defineCacheOnAllManagers(String cacheName, Configuration c) {
+ for (CacheManager cm : cacheManagerThreadLocal.get()) {
+ cm.defineCache(cacheName, c);
+ }
+ }
+
+ protected void assertClusterSize(String message, int size) {
+ for (CacheManager cm : cacheManagerThreadLocal.get()) {
+ assert cm.getMembers() != null && cm.getMembers().size() == size : message;
+ }
+ }
+
+ @AfterMethod
+ public void cleanupThreadLocals() {
+ TestingUtil.killCacheManagers(cacheManagerThreadLocal.get().toArray(new CacheManager[cacheManagerThreadLocal.get().size()]));
+ cacheManagerThreadLocal.get().clear();
+ }
+
+}
Modified: core/branches/flat/src/test/java/org/horizon/BasicTest.java
===================================================================
--- core/branches/flat/src/test/java/org/horizon/BasicTest.java 2009-01-27 23:06:53 UTC (rev 7604)
+++ core/branches/flat/src/test/java/org/horizon/BasicTest.java 2009-01-28 14:34:08 UTC (rev 7605)
@@ -22,8 +22,10 @@
package org.horizon;
import org.horizon.config.Configuration;
+import org.horizon.config.GlobalConfiguration;
import org.horizon.logging.Log;
import org.horizon.logging.LogFactory;
+import org.horizon.manager.CacheManager;
import org.horizon.manager.DefaultCacheManager;
import org.horizon.manager.NamedCacheNotFoundException;
import org.horizon.util.TestingUtil;
@@ -31,6 +33,8 @@
@Test(groups = "functional")
public class BasicTest {
+ public static final Log log = LogFactory.getLog(BasicTest.class);
+
public void basicTest() throws Exception {
// create a cache manager
Configuration c = new Configuration(); // LOCAL mode
@@ -61,25 +65,18 @@
}
}
- public static final Log log = LogFactory.getLog(BasicTest.class);
-
public void testBasicReplication() throws NamedCacheNotFoundException {
Configuration configuration = new Configuration();
configuration.setCacheMode(Configuration.CacheMode.REPL_SYNC);
- DefaultCacheManager firstManager = new DefaultCacheManager(configuration);
- DefaultCacheManager secondManager = new DefaultCacheManager(configuration);
+ CacheManager firstManager = new DefaultCacheManager(GlobalConfiguration.getClusteredDefault(), configuration);
+ CacheManager secondManager = new DefaultCacheManager(GlobalConfiguration.getClusteredDefault(), configuration);
try {
- firstManager.start();
- secondManager.start();
+ CacheSPI firstCache = (CacheSPI) firstManager.getCache();
+ CacheSPI secondCache = (CacheSPI) secondManager.getCache();
+ TestingUtil.blockUntilViewsReceived(60000, firstManager, secondManager);
- CacheSPI firstCache = (CacheSPI) firstManager.getCache("test");
- CacheSPI secondCache = (CacheSPI) secondManager.getCache("test");
-
- TestingUtil.blockUntilViewReceived(secondCache, 2, 3000);
-
-
firstCache.put("key", "value");
assert secondCache.get("key").equals("value");
@@ -90,16 +87,38 @@
assert secondCache.get("key") == null;
}
finally {
- firstManager.stop();
- secondManager.stop();
+ TestingUtil.killCacheManagers(firstManager, secondManager);
}
}
public void concurrentMapMethodTest() {
+ CacheManager cm = null;
+ Cache<String, String> c = null;
+ try {
+ cm = new DefaultCacheManager();
+ c = cm.getCache();
- }
+ assert c.putIfAbsent("A", "B") == null;
+ assert c.putIfAbsent("A", "C").equals("B");
+ assert c.get("A").equals("B");
- public void transactionalTest() {
+ assert !c.remove("A", "C");
+ assert c.containsKey("A");
+ assert c.remove("A", "B");
+ assert !c.containsKey("A");
+ c.put("A", "B");
+
+ assert !c.replace("A", "D", "C");
+ assert c.get("A").equals("B");
+ assert c.replace("A", "B", "C");
+ assert c.get("A").equals("C");
+
+ assert c.replace("A", "X").equals("C");
+ assert c.replace("X", "A") == null;
+ assert !c.containsKey("X");
+ } finally {
+ TestingUtil.killCacheManagers(cm);
+ }
}
}
Modified: core/branches/flat/src/test/java/org/horizon/api/mvcc/PutForExternalReadTest.java
===================================================================
--- core/branches/flat/src/test/java/org/horizon/api/mvcc/PutForExternalReadTest.java 2009-01-27 23:06:53 UTC (rev 7604)
+++ core/branches/flat/src/test/java/org/horizon/api/mvcc/PutForExternalReadTest.java 2009-01-28 14:34:08 UTC (rev 7605)
@@ -5,7 +5,7 @@
import org.horizon.Cache;
import org.horizon.CacheSPI;
import org.horizon.UnitTestCacheFactory;
-import org.horizon.commands.ReplicableCommand;
+import org.horizon.commands.RPCCommand;
import org.horizon.commands.write.PutKeyValueCommand;
import org.horizon.commands.write.RemoveCommand;
import org.horizon.config.Configuration;
@@ -118,7 +118,7 @@
// specify what we expectWithTx called on the mock Rpc Manager. For params we don't care about, just use ANYTHING.
// setting the mock object to expectWithTx the "sync" param to be false.
- expect(rpcManager.invokeRemotely(anyAddresses(), (ReplicableCommand) anyObject(), eq(ResponseMode.ASYNCHRONOUS), anyLong(), anyBoolean())).andReturn(null);
+ expect(rpcManager.invokeRemotely(anyAddresses(), (RPCCommand) anyObject(), eq(ResponseMode.ASYNCHRONOUS), anyLong(), anyBoolean())).andReturn(null);
replay(rpcManager);
@@ -162,7 +162,7 @@
List<Address> memberList = originalRpcManager.getMembers();
expect(barfingRpcManager.getMembers()).andReturn(memberList).anyTimes();
expect(barfingRpcManager.getAddress()).andReturn(originalRpcManager.getAddress()).anyTimes();
- expect(barfingRpcManager.invokeRemotely(anyAddresses(), (ReplicableCommand) anyObject(), anyResponseMode(), anyLong(), anyBoolean())).andThrow(new RuntimeException("Barf!")).anyTimes();
+ expect(barfingRpcManager.invokeRemotely(anyAddresses(), (RPCCommand) anyObject(), anyResponseMode(), anyLong(), anyBoolean())).andThrow(new RuntimeException("Barf!")).anyTimes();
replay(barfingRpcManager);
TestingUtil.extractComponentRegistry(cache1).registerComponent(barfingRpcManager, RPCManager.class);
Modified: core/branches/flat/src/test/java/org/horizon/replication/SyncReplTest.java
===================================================================
--- core/branches/flat/src/test/java/org/horizon/replication/SyncReplTest.java 2009-01-27 23:06:53 UTC (rev 7604)
+++ core/branches/flat/src/test/java/org/horizon/replication/SyncReplTest.java 2009-01-28 14:34:08 UTC (rev 7605)
@@ -6,79 +6,172 @@
*/
package org.horizon.replication;
+import static org.easymock.EasyMock.*;
+import org.horizon.BaseReplicatedTest;
import org.horizon.Cache;
-import org.horizon.UnitTestCacheManager;
+import org.horizon.commands.RPCCommand;
import org.horizon.config.Configuration;
+import org.horizon.manager.CacheManager;
+import org.horizon.remoting.RPCManager;
+import org.horizon.remoting.RPCManagerImpl;
+import org.horizon.remoting.ResponseFilter;
+import org.horizon.remoting.ResponseMode;
+import org.horizon.remoting.transport.Address;
+import org.horizon.remoting.transport.Transport;
import org.horizon.util.TestingUtil;
import static org.testng.AssertJUnit.assertEquals;
import static org.testng.AssertJUnit.assertNull;
-import org.testng.annotations.AfterMethod;
import org.testng.annotations.BeforeMethod;
import org.testng.annotations.Test;
+import java.util.Collections;
+import java.util.LinkedList;
+import java.util.List;
+
/**
* @author <a href="mailto:manik at jboss.org">Manik Surtani (manik at jboss.org)</a>
*/
-@Test(groups = {"functional", "jgroups"})
-public class SyncReplTest {
- private ThreadLocal<Cache<Object, Object>[]> cachesTL = new ThreadLocal<Cache<Object, Object>[]>();
+@Test(groups = "functional", sequential = true)
+public class SyncReplTest extends BaseReplicatedTest {
+ Cache cache1, cache2;
+ String k = "key", v = "value";
@BeforeMethod(alwaysRun = true)
public void setUp() {
- System.out.println("*** In setUp()");
- Cache<Object, Object>[] caches = new Cache[2];
- Configuration configuration = new Configuration();
- configuration.setCacheMode(Configuration.CacheMode.REPL_SYNC);
- caches[0] = new UnitTestCacheManager(configuration).createCache("test");
- caches[1] = new UnitTestCacheManager(configuration).createCache("test");
- cachesTL.set(caches);
- TestingUtil.blockUntilViewsReceived(caches, 5000);
- System.out.println("*** Finished setUp()");
- }
+ CacheManager cm1 = addCacheManager();
+ CacheManager cm2 = addCacheManager();
- @AfterMethod(alwaysRun = true)
- public void tearDown() {
- Cache<Object, Object>[] caches = cachesTL.get();
- if (caches != null) TestingUtil.killCaches(caches);
- cachesTL.set(null);
+ Configuration replSync = new Configuration();
+ replSync.setCacheMode(Configuration.CacheMode.REPL_SYNC);
+
+ cm1.defineCache("replSync", replSync);
+ cm2.defineCache("replSync", replSync);
+
+ cache1 = cm1.getCache("replSync");
+ cache2 = cm2.getCache("replSync");
+
+ TestingUtil.blockUntilViewsReceived(60000, true, cm1, cm2);
}
public void testBasicOperation() {
- Cache<Object, Object>[] caches = cachesTL.get();
assertClusterSize("Should only be 2 caches in the cluster!!!", 2);
- String k = "key", v = "value";
+ assertNull("Should be null", cache1.get(k));
+ assertNull("Should be null", cache2.get(k));
- assertNull("Should be null", caches[0].get(k));
- assertNull("Should be null", caches[1].get(k));
+ cache1.put(k, v);
- caches[0].put(k, v);
+ assertEquals(v, cache1.get(k));
+ assertEquals("Should have replicated", v, cache2.get(k));
- assertEquals(v, caches[0].get(k));
- assertEquals("Should have replicated", v, caches[1].get(k));
+ cache2.remove(k);
+ assert cache1.isEmpty();
+ assert cache2.isEmpty();
}
- @SuppressWarnings("unchecked")
- public void testSyncRepl() {
- Cache<Object, Object>[] caches = cachesTL.get();
+ public void testMultpleCachesOnSharedTransport() {
assertClusterSize("Should only be 2 caches in the cluster!!!", 2);
+ assert cache1.isEmpty();
+ assert cache2.isEmpty();
- caches[0].getConfiguration().setSyncCommitPhase(true);
- caches[1].getConfiguration().setSyncCommitPhase(true);
+ List<CacheManager> managers = getCacheManagers();
+ Configuration newConf = new Configuration();
+ newConf.setCacheMode(Configuration.CacheMode.REPL_SYNC);
+ defineCacheOnAllManagers("newCache", newConf);
+ Cache altCache1 = managers.get(0).getCache("newCache");
+ Cache altCache2 = managers.get(1).getCache("newCache");
- caches[0].put("age", 38);
- assertEquals("Value should be set", 38, caches[0].get("age"));
- assertEquals("Value should have replicated", 38, caches[1].get("age"));
+ assert altCache1.isEmpty();
+ assert altCache2.isEmpty();
+
+ cache1.put(k, v);
+ assert cache1.get(k).equals(v);
+ assert cache2.get(k).equals(v);
+ assert altCache1.isEmpty();
+ assert altCache2.isEmpty();
+
+ altCache1.put(k, "value2");
+ assert altCache1.get(k).equals("value2");
+ assert altCache2.get(k).equals("value2");
+ assert cache1.get(k).equals(v);
+ assert cache2.get(k).equals(v);
}
- private void assertClusterSize(String message, int size) {
- Cache<Object, Object>[] caches = cachesTL.get();
- for (Cache c : caches) {
- assertClusterSize(message, size, c);
- }
+ public void testReplicateToNonExistentCache() {
+ assertClusterSize("Should only be 2 caches in the cluster!!!", 2);
+ assert cache1.isEmpty();
+ assert cache2.isEmpty();
+
+ List<CacheManager> managers = getCacheManagers();
+ Configuration newConf = new Configuration();
+ newConf.setCacheMode(Configuration.CacheMode.REPL_SYNC);
+ defineCacheOnAllManagers("newCache", newConf);
+ Cache altCache1 = managers.get(0).getCache("newCache");
+
+ assert altCache1.isEmpty();
+
+ cache1.put(k, v);
+ assert cache1.get(k).equals(v);
+ assert cache2.get(k).equals(v);
+ assert altCache1.isEmpty();
+
+ altCache1.put(k, "value2");
+ assert altCache1.get(k).equals("value2");
+ assert cache1.get(k).equals(v);
+ assert cache2.get(k).equals(v);
+
+ managers.get(0).getCache("newCache").get(k).equals("value2");
}
- private void assertClusterSize(String message, int size, Cache c) {
- assertEquals(message, size, c.getCacheManager().getMembers().size());
+ public void testMixingSyncAndAsyncOnSameTransport() throws Exception {
+ List<CacheManager> managers = getCacheManagers();
+ Transport originalTransport = null;
+ RPCManagerImpl rpcManager = null;
+ try {
+ Configuration asyncCache = new Configuration();
+ asyncCache.setCacheMode(Configuration.CacheMode.REPL_ASYNC);
+ defineCacheOnAllManagers("asyncCache", asyncCache);
+ Cache asyncCache1 = managers.get(0).getCache("asyncCache");
+
+ // replace the transport with a mock object
+ Transport mockTransport = createMock(Transport.class);
+ Address mockAddressOne = createNiceMock(Address.class);
+ Address mockAddressTwo = createNiceMock(Address.class);
+ List<Address> addresses = new LinkedList<Address>();
+ addresses.add(mockAddressOne);
+ addresses.add(mockAddressTwo);
+ expect(mockTransport.getAddress()).andReturn(mockAddressOne).anyTimes();
+ expect(mockTransport.getMembers()).andReturn(addresses).anyTimes();
+ replay(mockAddressOne, mockAddressTwo);
+
+ // this is shared by all caches managed by the cache manager
+ originalTransport = TestingUtil.extractComponent(asyncCache1, Transport.class);
+ rpcManager = (RPCManagerImpl) TestingUtil.extractComponent(asyncCache1, RPCManager.class);
+ rpcManager.setTransport(mockTransport);
+
+ expect(mockTransport.invokeRemotely((List<Address>) anyObject(), (RPCCommand) anyObject(), eq(ResponseMode.SYNCHRONOUS),
+ anyLong(), anyBoolean(), (ResponseFilter) anyObject()))
+ .andReturn(Collections.emptyList()).once();
+
+ replay(mockTransport);
+ // check that the replication call was sync
+ cache1.put("k", "v");
+
+ // reset to test for async
+ reset(mockTransport);
+ expect(mockTransport.getAddress()).andReturn(mockAddressOne).anyTimes();
+ expect(mockTransport.getMembers()).andReturn(addresses).anyTimes();
+ expect(mockTransport.invokeRemotely((List<Address>) anyObject(), (RPCCommand) anyObject(), eq(ResponseMode.ASYNCHRONOUS),
+ anyLong(), anyBoolean(), (ResponseFilter) anyObject()))
+ .andReturn(Collections.emptyList()).once();
+
+ replay(mockTransport);
+ asyncCache1.put("k", "v");
+ // check that the replication call was async
+ verify(mockTransport);
+ } finally {
+ // replace original transport
+ if (rpcManager != null) rpcManager.setTransport(originalTransport);
+ }
}
}
Modified: core/branches/flat/src/test/java/org/horizon/util/internals/ReplicationListener.java
===================================================================
--- core/branches/flat/src/test/java/org/horizon/util/internals/ReplicationListener.java 2009-01-27 23:06:53 UTC (rev 7604)
+++ core/branches/flat/src/test/java/org/horizon/util/internals/ReplicationListener.java 2009-01-28 14:34:08 UTC (rev 7605)
@@ -128,9 +128,9 @@
Class<? extends ReplicableCommand> replicableCommandClass = it.next();
if (realOne.containsCommandType(replicableCommandClass)) {
it.remove();
- } else if (realOne.getSingleModification() instanceof PrepareCommand) //explicit transaction
+ } else if (realOne.getSingleCommand() instanceof PrepareCommand) //explicit transaction
{
- PrepareCommand prepareCommand = (PrepareCommand) realOne.getSingleModification();
+ PrepareCommand prepareCommand = (PrepareCommand) realOne.getSingleCommand();
if (prepareCommand.containsModificationType(replicableCommandClass)) {
it.remove();
}
More information about the jbosscache-commits
mailing list