[jbosscache-commits] JBoss Cache SVN: r7605 - in core/branches/flat/src: main/java/org/horizon/commands and 15 other directories.

jbosscache-commits at lists.jboss.org jbosscache-commits at lists.jboss.org
Wed Jan 28 09:34:08 EST 2009


Author: manik.surtani at jboss.com
Date: 2009-01-28 09:34:08 -0500 (Wed, 28 Jan 2009)
New Revision: 7605

Added:
   core/branches/flat/src/main/java/org/horizon/commands/RPCCommand.java
   core/branches/flat/src/test/java/org/horizon/BaseReplicatedTest.java
Modified:
   core/branches/flat/src/main/java/org/horizon/cluster/ReplicationQueue.java
   core/branches/flat/src/main/java/org/horizon/commands/CommandsFactoryImpl.java
   core/branches/flat/src/main/java/org/horizon/commands/RemoteCommandFactory.java
   core/branches/flat/src/main/java/org/horizon/commands/ReplicableCommand.java
   core/branches/flat/src/main/java/org/horizon/commands/read/SizeCommand.java
   core/branches/flat/src/main/java/org/horizon/commands/remote/ReplicateCommand.java
   core/branches/flat/src/main/java/org/horizon/commands/write/PutKeyValueCommand.java
   core/branches/flat/src/main/java/org/horizon/commands/write/RemoveCommand.java
   core/branches/flat/src/main/java/org/horizon/commands/write/ReplaceCommand.java
   core/branches/flat/src/main/java/org/horizon/container/ReadCommittedEntry.java
   core/branches/flat/src/main/java/org/horizon/container/UnsortedDataContainer.java
   core/branches/flat/src/main/java/org/horizon/factories/ComponentRegistry.java
   core/branches/flat/src/main/java/org/horizon/factories/EmptyConstructorFactory.java
   core/branches/flat/src/main/java/org/horizon/factories/GlobalComponentRegistry.java
   core/branches/flat/src/main/java/org/horizon/interceptors/InvalidationInterceptor.java
   core/branches/flat/src/main/java/org/horizon/interceptors/base/BaseRpcInterceptor.java
   core/branches/flat/src/main/java/org/horizon/manager/DefaultCacheManager.java
   core/branches/flat/src/main/java/org/horizon/marshall/HorizonMarshaller.java
   core/branches/flat/src/main/java/org/horizon/marshall/VersionAwareMarshaller.java
   core/branches/flat/src/main/java/org/horizon/remoting/InboundInvocationHandler.java
   core/branches/flat/src/main/java/org/horizon/remoting/InboundInvocationHandlerImpl.java
   core/branches/flat/src/main/java/org/horizon/remoting/RPCManager.java
   core/branches/flat/src/main/java/org/horizon/remoting/RPCManagerImpl.java
   core/branches/flat/src/main/java/org/horizon/remoting/transport/Transport.java
   core/branches/flat/src/test/java/org/horizon/BasicTest.java
   core/branches/flat/src/test/java/org/horizon/api/mvcc/PutForExternalReadTest.java
   core/branches/flat/src/test/java/org/horizon/replication/SyncReplTest.java
   core/branches/flat/src/test/java/org/horizon/util/internals/ReplicationListener.java
Log:
replication code + tests on shared transport

Modified: core/branches/flat/src/main/java/org/horizon/cluster/ReplicationQueue.java
===================================================================
--- core/branches/flat/src/main/java/org/horizon/cluster/ReplicationQueue.java	2009-01-27 23:06:53 UTC (rev 7604)
+++ core/branches/flat/src/main/java/org/horizon/cluster/ReplicationQueue.java	2009-01-28 14:34:08 UTC (rev 7605)
@@ -22,6 +22,7 @@
 package org.horizon.cluster;
 
 import org.horizon.commands.CommandsFactory;
+import org.horizon.commands.RPCCommand;
 import org.horizon.commands.ReplicableCommand;
 import org.horizon.commands.remote.ReplicateCommand;
 import org.horizon.config.Configuration;
@@ -60,7 +61,7 @@
    /**
     * Holds the replication jobs: LinkedList<MethodCall>
     */
-   final List<ReplicableCommand> elements = new LinkedList<ReplicableCommand>();
+   final List<RPCCommand> elements = new LinkedList<RPCCommand>();
 
    /**
     * For periodical replication
@@ -128,7 +129,7 @@
    /**
     * Adds a new method call.
     */
-   public void add(ReplicateCommand job) {
+   public void add(RPCCommand job) {
       if (job == null)
          throw new NullPointerException("job is null");
       synchronized (elements) {

Modified: core/branches/flat/src/main/java/org/horizon/commands/CommandsFactoryImpl.java
===================================================================
--- core/branches/flat/src/main/java/org/horizon/commands/CommandsFactoryImpl.java	2009-01-27 23:06:53 UTC (rev 7604)
+++ core/branches/flat/src/main/java/org/horizon/commands/CommandsFactoryImpl.java	2009-01-28 14:34:08 UTC (rev 7605)
@@ -21,6 +21,7 @@
  */
 package org.horizon.commands;
 
+import org.horizon.CacheSPI;
 import org.horizon.commands.read.GetKeyValueCommand;
 import org.horizon.commands.read.SizeCommand;
 import org.horizon.commands.remote.ReplicateCommand;
@@ -36,7 +37,6 @@
 import org.horizon.commands.write.ReplaceCommand;
 import org.horizon.container.DataContainer;
 import org.horizon.factories.annotations.Inject;
-import org.horizon.interceptors.InterceptorChain;
 import org.horizon.notifications.CacheNotifier;
 import org.horizon.remoting.transport.Address;
 import org.horizon.transaction.GlobalTransaction;
@@ -51,13 +51,16 @@
 public class CommandsFactoryImpl implements CommandsFactory {
    private DataContainer dataContainer;
    private CacheNotifier notifier;
-   private InterceptorChain interceptorChain;
+   private CacheSPI cache;
 
+   // some stateless commands can be reused so that they aren't constructed again all the time.
+   SizeCommand cachedSizeCommand;
+
    @Inject
-   public void setupDependencies(DataContainer container, CacheNotifier notifier, InterceptorChain interceptorChain) {
+   public void setupDependencies(DataContainer container, CacheNotifier notifier, CacheSPI cache) {
       this.dataContainer = container;
       this.notifier = notifier;
-      this.interceptorChain = interceptorChain;
+      this.cache = cache;
    }
 
    public PutKeyValueCommand buildPutKeyValueCommand(Object key, Object value) {
@@ -73,7 +76,10 @@
    }
 
    public SizeCommand buildSizeCommand() {
-      return new SizeCommand(dataContainer);
+      if (cachedSizeCommand == null) {
+         cachedSizeCommand = new SizeCommand(dataContainer);
+      }
+      return cachedSizeCommand;
    }
 
    public GetKeyValueCommand buildGetKeyValueCommand(Object key) {
@@ -107,11 +113,11 @@
    }
 
    public ReplicateCommand buildReplicateCommand(List<ReplicableCommand> toReplicate) {
-      return new ReplicateCommand(toReplicate);
+      return new ReplicateCommand(toReplicate, cache.getName());
    }
 
    public ReplicateCommand buildReplicateCommand(ReplicableCommand call) {
-      return new ReplicateCommand(call);
+      return new ReplicateCommand(call, cache.getName());
    }
 
    public void initializeReplicableCommand(ReplicableCommand c) {
@@ -126,13 +132,10 @@
          case RemoveCommand.METHOD_ID:
             ((RemoveCommand) c).init(notifier);
             break;
-         case ReplicateCommand.MULTIPLE_METHOD_ID:
-         case ReplicateCommand.SINGLE_METHOD_ID:
+         case ReplicateCommand.METHOD_ID:
             ReplicateCommand rc = (ReplicateCommand) c;
-            if (rc.getModifications() != null)
-               for (ReplicableCommand nested : rc.getModifications()) initializeReplicableCommand(nested);
-            initializeReplicableCommand(rc.getSingleModification());
-            rc.initialize(interceptorChain);
+            if (rc.getCommands() != null)
+               for (ReplicableCommand nested : rc.getCommands()) initializeReplicableCommand(nested);
             break;
          case PrepareCommand.METHOD_ID:
             PrepareCommand pc = (PrepareCommand) c;

Added: core/branches/flat/src/main/java/org/horizon/commands/RPCCommand.java
===================================================================
--- core/branches/flat/src/main/java/org/horizon/commands/RPCCommand.java	                        (rev 0)
+++ core/branches/flat/src/main/java/org/horizon/commands/RPCCommand.java	2009-01-28 14:34:08 UTC (rev 7605)
@@ -0,0 +1,67 @@
+package org.horizon.commands;
+
+import org.horizon.interceptors.InterceptorChain;
+
+import java.util.Collection;
+import java.util.List;
+
+/**
+ * The RPCManager only replicates commands wrapped in an RPCCommand.  As a wrapper, an RPCCommand could contain a single
+ * {@link org.horizon.commands.ReplicableCommand} or a List of them.
+ *
+ * @author Manik Surtani
+ * @since 1.0
+ */
+public interface RPCCommand extends ReplicableCommand {
+
+   /**
+    * @return true if this only wraps a single ReplicableCommand.  False if it wraps more than one.
+    */
+   boolean isSingleCommand();
+
+   /**
+    * A convenience method if there is only a single command being transported, i.e., if {@link #isSingleCommand()} is
+    * true.  If {@link #isSingleCommand()} is false, this method throws a {@link IllegalStateException} so it should
+    * only be used after testing {@link #isSingleCommand()}.
+    *
+    * @return a single ReplicableCommand.
+    */
+   ReplicableCommand getSingleCommand();
+
+   /**
+    * A more generic mechanism to get a hold of the commands wrapped.  Even if {@link #isSingleCommand()} is true, this
+    * command returns a valid and usable List.
+    *
+    * @return a list of all commands.
+    */
+   List<ReplicableCommand> getCommands();
+
+   /**
+    * Adds a single command to the list of commands being wrapped
+    *
+    * @param command command to add
+    */
+   void addCommand(ReplicableCommand command);
+
+   /**
+    * Adds a collection of commands to the list of commands being wrapped
+    *
+    * @param commands commands to add
+    */
+   void addCommands(Collection<? extends ReplicableCommand> commands);
+
+   /**
+    * @return the name of the cache that produced this command.  This will also be the name of the cache this command is
+    *         intended for.
+    */
+   String getCacheName();
+
+   void setCacheName(String name);
+
+   /**
+    * Sets the interceptor chain on which to invoke the command.
+    *
+    * @param interceptorChain chain to invoke command on
+    */
+   void setInterceptorChain(InterceptorChain interceptorChain);
+}

Modified: core/branches/flat/src/main/java/org/horizon/commands/RemoteCommandFactory.java
===================================================================
--- core/branches/flat/src/main/java/org/horizon/commands/RemoteCommandFactory.java	2009-01-27 23:06:53 UTC (rev 7604)
+++ core/branches/flat/src/main/java/org/horizon/commands/RemoteCommandFactory.java	2009-01-28 14:34:08 UTC (rev 7605)
@@ -61,8 +61,7 @@
          case RollbackCommand.METHOD_ID:
             command = new RollbackCommand();
             break;
-         case ReplicateCommand.MULTIPLE_METHOD_ID:
-         case ReplicateCommand.SINGLE_METHOD_ID:
+         case ReplicateCommand.METHOD_ID:
             command = new ReplicateCommand();
             break;
 

Modified: core/branches/flat/src/main/java/org/horizon/commands/ReplicableCommand.java
===================================================================
--- core/branches/flat/src/main/java/org/horizon/commands/ReplicableCommand.java	2009-01-27 23:06:53 UTC (rev 7604)
+++ core/branches/flat/src/main/java/org/horizon/commands/ReplicableCommand.java	2009-01-28 14:34:08 UTC (rev 7605)
@@ -25,7 +25,7 @@
 
 /**
  * The core of the command-based cache framework.  Commands correspond to specific areas of functionality in the cache,
- * and can be replicated using the {@link org.horizon.marshall.Marshaller} framework.
+ * and can be replicated using the {@link org.horizon.remoting.RPCManager}
  *
  * @author Mircea.Markus at jboss.com
  * @author Manik Surtani

Modified: core/branches/flat/src/main/java/org/horizon/commands/read/SizeCommand.java
===================================================================
--- core/branches/flat/src/main/java/org/horizon/commands/read/SizeCommand.java	2009-01-27 23:06:53 UTC (rev 7604)
+++ core/branches/flat/src/main/java/org/horizon/commands/read/SizeCommand.java	2009-01-28 14:34:08 UTC (rev 7605)
@@ -27,7 +27,7 @@
 import org.horizon.context.InvocationContext;
 
 /**
- * // TODO: MANIK: Document this
+ * Command to calculate the size of the cache
  *
  * @author Manik Surtani (<a href="mailto:manik at jboss.org">manik at jboss.org</a>)
  * @since 1.0
@@ -58,4 +58,11 @@
    public void setParameters(int commandId, Object[] parameters) {
       // no-op
    }
+
+   @Override
+   public String toString() {
+      return "SizeCommand{" +
+            "containerSize=" + container.size() +
+            '}';
+   }
 }

Modified: core/branches/flat/src/main/java/org/horizon/commands/remote/ReplicateCommand.java
===================================================================
--- core/branches/flat/src/main/java/org/horizon/commands/remote/ReplicateCommand.java	2009-01-27 23:06:53 UTC (rev 7604)
+++ core/branches/flat/src/main/java/org/horizon/commands/remote/ReplicateCommand.java	2009-01-28 14:34:08 UTC (rev 7605)
@@ -21,6 +21,7 @@
  */
 package org.horizon.commands.remote;
 
+import org.horizon.commands.RPCCommand;
 import org.horizon.commands.ReplicableCommand;
 import org.horizon.commands.VisitableCommand;
 import org.horizon.context.InvocationContext;
@@ -29,11 +30,12 @@
 import org.horizon.logging.LogFactory;
 
 import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
 import java.util.List;
 
 /**
- * Command that implements cluster replication logic.  Essentially mimics the replicate() and replicateAll() methods in
- * 2.1.x, we may need to revisit the usefulness of such a command.
+ * Command that implements cluster replication logic.
  * <p/>
  * This is not a {@link VisitableCommand} and hence not passed up the {@link org.horizon.interceptors.base.CommandInterceptor}
  * chain.
@@ -42,52 +44,38 @@
  * @author Mircea.Markus at jboss.com
  * @since 1.0
  */
-public class ReplicateCommand implements ReplicableCommand {
-   public static final byte SINGLE_METHOD_ID = 13;
-   public static final byte MULTIPLE_METHOD_ID = 14;
+public class ReplicateCommand implements RPCCommand {
+   public static final byte METHOD_ID = 13;
 
-   private InterceptorChain invoker;
+   private InterceptorChain interceptorChain;
 
    private static final Log log = LogFactory.getLog(ReplicateCommand.class);
    private static final boolean trace = log.isTraceEnabled();
 
-   /**
-    * optimisation - rather than constructing a new list each for scenarios where a single modification needs to be
-    * replicated rather use this instance.
-    */
-   private ReplicableCommand singleModification;
-   private List<ReplicableCommand> modifications;
+   private List<ReplicableCommand> commands;
+   private String cacheName;
 
-   public ReplicateCommand(List<ReplicableCommand> modifications) {
+   public ReplicateCommand(List<ReplicableCommand> modifications, String cacheName) {
       if (modifications != null && modifications.size() == 1) {
-         singleModification = modifications.get(0);
+         this.commands = Collections.singletonList(modifications.get(0));
       } else {
-         this.modifications = modifications;
+         this.commands = modifications;
       }
+      this.cacheName = cacheName;
    }
 
-   public ReplicateCommand(ReplicableCommand command) {
-      this.singleModification = command;
+   public ReplicateCommand(ReplicableCommand command, String cacheName) {
+      commands = Collections.singletonList(command);
+      this.cacheName = cacheName;
    }
 
    public ReplicateCommand() {
    }
 
-   public void initialize(InterceptorChain interceptorChain) {
-      this.invoker = interceptorChain;
+   public void setInterceptorChain(InterceptorChain interceptorChain) {
+      this.interceptorChain = interceptorChain;
    }
 
-   public void setSingleModification(ReplicableCommand singleModification) {
-      this.singleModification = singleModification;
-   }
-
-   public void setModifications(List<ReplicableCommand> modifications) {
-      if (modifications != null && modifications.size() == 1)
-         singleModification = modifications.get(0);
-      else
-         this.modifications = modifications;
-   }
-
    /**
     * Executes commands replicated to the current cache instance by other cache instances.
     *
@@ -97,21 +85,21 @@
     */
    public Object perform(InvocationContext ctx) throws Throwable {
       if (isSingleCommand()) {
-         return processSingleCommand(singleModification);
+         return processCommand(ctx, commands.get(0));
       } else {
-         for (ReplicableCommand command : modifications) processSingleCommand(command);
+         for (ReplicableCommand command : commands) processCommand(ctx, command);
          return null;
       }
    }
 
-   private Object processSingleCommand(ReplicableCommand cacheCommand)
+   private Object processCommand(InvocationContext ctx, ReplicableCommand cacheCommand)
          throws Throwable {
       Object result;
       try {
          if (trace) log.trace("Invoking command " + cacheCommand + ", with originLocal flag set to false.");
-
+         ctx.setOriginLocal(false);
          if (cacheCommand instanceof VisitableCommand) {
-            Object retVal = invoker.invokeRemote((VisitableCommand) cacheCommand);
+            Object retVal = interceptorChain.invokeRemote((VisitableCommand) cacheCommand);
             // we only need to return values for a set of remote calls; not every call.
             if (returnValueForRemoteCall(cacheCommand)) {
                result = retVal;
@@ -119,7 +107,8 @@
                result = null;
             }
          } else {
-            result = cacheCommand.perform(null);
+            throw new RuntimeException("Do we still need to deal with non-visitable commands?");
+//            result = cacheCommand.perform(null);
          }
       }
       catch (Throwable ex) {
@@ -143,35 +132,57 @@
    }
 
    public byte getCommandId() {
-      return isSingleCommand() ? SINGLE_METHOD_ID : MULTIPLE_METHOD_ID;
+      return METHOD_ID;
    }
 
-   public List<ReplicableCommand> getModifications() {
-      return modifications;
+   public List<ReplicableCommand> getCommands() {
+      return commands;
    }
 
-   public ReplicableCommand getSingleModification() {
-      return singleModification;
+   public void addCommand(ReplicableCommand command) {
+      if (commands == null) {
+         commands = Collections.singletonList(command);
+      } else {
+         upgradeCommandsListIfNeeded();
+         commands.add(command);
+      }
    }
 
+   public void addCommands(Collection<? extends ReplicableCommand> commands) {
+      upgradeCommandsListIfNeeded();
+      this.commands.addAll(commands);
+   }
+
+   private void upgradeCommandsListIfNeeded() {
+      if (!(commands instanceof ArrayList)) {
+         commands = new ArrayList<ReplicableCommand>(commands);
+      }
+   }
+
+   public String getCacheName() {
+      return cacheName;
+   }
+
+   public void setCacheName(String name) {
+      this.cacheName = name;
+   }
+
+   public ReplicableCommand getSingleCommand() {
+      return commands.get(0);
+   }
+
    public Object[] getParameters() {
-      if (isSingleCommand())
-         return new Object[]{singleModification};
-      else
-         return new Object[]{modifications};
+      return new Object[]{cacheName, commands};
    }
 
    @SuppressWarnings("unchecked")
    public void setParameters(int commandId, Object[] args) {
-      if (commandId == SINGLE_METHOD_ID) {
-         singleModification = (ReplicableCommand) args[0];
-      } else {
-         modifications = (List<ReplicableCommand>) args[0];
-      }
+      cacheName = (String) args[0];
+      commands = (List<ReplicableCommand>) args[1];
    }
 
    public boolean isSingleCommand() {
-      return singleModification != null;
+      return commands != null && commands.size() == 1;
    }
 
    @Override
@@ -181,28 +192,14 @@
 
       ReplicateCommand that = (ReplicateCommand) o;
 
-      if (modifications != null ? !modifications.equals(that.modifications) : that.modifications != null) return false;
-      if (singleModification != null ? !singleModification.equals(that.singleModification) : that.singleModification != null)
-         return false;
-
-      return true;
+      return !(commands != null ? !commands.equals(that.commands) : that.commands != null);
    }
 
    @Override
    public int hashCode() {
-      int result;
-      result = (singleModification != null ? singleModification.hashCode() : 0);
-      result = 31 * result + (modifications != null ? modifications.hashCode() : 0);
-      return result;
+      return commands != null ? commands.hashCode() : 0;
    }
 
-   @Override
-   public String toString() {
-      return "ReplicateCommand{" +
-            "cmds=" + (isSingleCommand() ? singleModification : modifications) +
-            '}';
-   }
-
    /**
     * Creates a copy of this command, amking a deep copy of any collections but everything else copied shallow.
     *
@@ -211,20 +208,32 @@
    public ReplicateCommand copy() {
       ReplicateCommand clone;
       clone = new ReplicateCommand();
-      clone.invoker = invoker;
-      clone.modifications = modifications == null ? null : new ArrayList<ReplicableCommand>(modifications);
-      clone.singleModification = singleModification;
+      clone.interceptorChain = interceptorChain;
+      if (commands != null) {
+         if (commands.size() == 1)
+            clone.commands = Collections.singletonList(commands.get(0));
+         else
+            clone.commands = new ArrayList<ReplicableCommand>(commands);
+      }
       return clone;
    }
 
    public boolean containsCommandType(Class<? extends ReplicableCommand> aClass) {
-      if (isSingleCommand()) {
-         return getSingleModification().getClass().equals(aClass);
+      if (commands.size() == 1) {
+         return commands.get(0).getClass().equals(aClass);
       } else {
-         for (ReplicableCommand command : getModifications()) {
+         for (ReplicableCommand command : getCommands()) {
             if (command.getClass().equals(aClass)) return true;
          }
       }
       return false;
    }
+
+   @Override
+   public String toString() {
+      return "ReplicateCommand{" +
+            "commands=" + commands +
+            ", cacheName='" + cacheName + '\'' +
+            '}';
+   }
 }
\ No newline at end of file

Modified: core/branches/flat/src/main/java/org/horizon/commands/write/PutKeyValueCommand.java
===================================================================
--- core/branches/flat/src/main/java/org/horizon/commands/write/PutKeyValueCommand.java	2009-01-27 23:06:53 UTC (rev 7604)
+++ core/branches/flat/src/main/java/org/horizon/commands/write/PutKeyValueCommand.java	2009-01-28 14:34:08 UTC (rev 7605)
@@ -71,7 +71,9 @@
    public Object perform(InvocationContext ctx) throws Throwable {
       Object o = null;
       MVCCEntry e = ctx.lookupEntry(key);
-      if (e.getValue() == null || !putIfAbsent) {
+      if (e.getValue() != null && putIfAbsent) {
+         return e.getValue();
+      } else {
          notifier.notifyCacheEntryModified(key, true, ctx);
 
          if (value instanceof Delta) {

Modified: core/branches/flat/src/main/java/org/horizon/commands/write/RemoveCommand.java
===================================================================
--- core/branches/flat/src/main/java/org/horizon/commands/write/RemoveCommand.java	2009-01-27 23:06:53 UTC (rev 7604)
+++ core/branches/flat/src/main/java/org/horizon/commands/write/RemoveCommand.java	2009-01-28 14:34:08 UTC (rev 7605)
@@ -57,13 +57,15 @@
 
    public Object perform(InvocationContext ctx) throws Throwable {
       MVCCEntry e = ctx.lookupEntry(key);
-      if (e == null || e.isNullEntry()) return null;
+      if (e == null || e.isNullEntry()) return value == null ? null : false;
+      if (value != null && e.getValue() != null && !e.getValue().equals(value))
+         return false;
+
       notifier.notifyCacheEntryRemoved(key, true, ctx);
       e.setDeleted(true);
       e.setValid(false);
       notifier.notifyCacheEntryRemoved(key, false, ctx);
-      return e.getValue();
-
+      return value == null ? e.getValue() : true;
    }
 
    public byte getCommandId() {

Modified: core/branches/flat/src/main/java/org/horizon/commands/write/ReplaceCommand.java
===================================================================
--- core/branches/flat/src/main/java/org/horizon/commands/write/ReplaceCommand.java	2009-01-27 23:06:53 UTC (rev 7604)
+++ core/branches/flat/src/main/java/org/horizon/commands/write/ReplaceCommand.java	2009-01-28 14:34:08 UTC (rev 7605)
@@ -50,14 +50,14 @@
       return visitor.visitReplaceCommand(ctx, this);
    }
 
-   public Boolean perform(InvocationContext ctx) throws Throwable {
+   public Object perform(InvocationContext ctx) throws Throwable {
       MVCCEntry e = ctx.lookupEntry(key);
-      if (e == null || e.isNullEntry()) return false;
+      if (e == null || e.isNullEntry()) return oldValue == null ? null : false;
       if (oldValue == null || oldValue.equals(e.getValue())) {
-         e.setValue(newValue);
-         return true;
+         Object old = e.setValue(newValue);
+         return oldValue == null ? old : true;
       }
-      return false;
+      return oldValue == null ? null : false;
    }
 
    public byte getCommandId() {

Modified: core/branches/flat/src/main/java/org/horizon/container/ReadCommittedEntry.java
===================================================================
--- core/branches/flat/src/main/java/org/horizon/container/ReadCommittedEntry.java	2009-01-27 23:06:53 UTC (rev 7604)
+++ core/branches/flat/src/main/java/org/horizon/container/ReadCommittedEntry.java	2009-01-28 14:34:08 UTC (rev 7605)
@@ -59,7 +59,9 @@
    }
 
    public Object setValue(Object value) {
-      return this.value = value;
+      Object oldValue = this.value;
+      this.value = value;
+      return oldValue;
    }
 
    protected static enum Flags {

Modified: core/branches/flat/src/main/java/org/horizon/container/UnsortedDataContainer.java
===================================================================
--- core/branches/flat/src/main/java/org/horizon/container/UnsortedDataContainer.java	2009-01-27 23:06:53 UTC (rev 7604)
+++ core/branches/flat/src/main/java/org/horizon/container/UnsortedDataContainer.java	2009-01-28 14:34:08 UTC (rev 7605)
@@ -21,33 +21,47 @@
  */
 package org.horizon.container;
 
+import java.util.AbstractSet;
+import java.util.Iterator;
 import java.util.Set;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ConcurrentMap;
 
 /**
- * // TODO: crappy and inefficient - but just a placeholder for now.
+ * The basic container.  Accepts null keys.
  *
  * @author Manik Surtani (<a href="mailto:manik at jboss.org">manik at jboss.org</a>)
  * @since 1.0
  */
 public class UnsortedDataContainer<K, V> implements DataContainer<K, V> {
-   private final ConcurrentMap<K, V> data = new ConcurrentHashMap<K, V>();
+   private final ConcurrentMap<Object, Object> data = new ConcurrentHashMap<Object, Object>();
+   private static final Object NULL = new Object();
 
+   @SuppressWarnings("unchecked")
+   private Object maskNull(Object o) {
+      return o == null ? (K) NULL : (K) o;
+   }
+
+   private Object unmaskNull(Object o) {
+      return (o == NULL) ? null : o;
+   }
+
+   @SuppressWarnings("unchecked")
    public V get(K k) {
-      return data.get(k);
+      return (V) unmaskNull(data.get(maskNull(k)));
    }
 
    public void put(K k, V v) {
-      data.put(k, v);
+      data.put(maskNull(k), maskNull(v));
    }
 
    public boolean containsKey(K k) {
-      return data.containsKey(k);
+      return data.containsKey(maskNull(k));
    }
 
+   @SuppressWarnings("unchecked")
    public V remove(K k) {
-      return data.remove(k);
+      return (V) unmaskNull(data.remove(maskNull(k)));
    }
 
    public int size() {
@@ -59,7 +73,7 @@
    }
 
    public Set<K> keySet() {
-      return data.keySet();
+      return new KeySet();
    }
 
    public boolean evict(Object key) {
@@ -69,4 +83,53 @@
    public String toString() {
       return data.toString();
    }
+
+   private class KeySet extends AbstractSet<K> {
+      Set<Object> realSet;
+
+      public KeySet() {
+         this.realSet = data.keySet();
+      }
+
+      public Iterator<K> iterator() {
+         return new KeyIterator(realSet.iterator());
+      }
+
+      public void clear() {
+         throw new UnsupportedOperationException();
+      }
+
+      public boolean contains(Object o) {
+         return realSet.contains(maskNull(o));
+      }
+
+      public boolean remove(Object o) {
+         throw new UnsupportedOperationException();
+      }
+
+      public int size() {
+         return realSet.size();
+      }
+   }
+
+   private class KeyIterator implements Iterator<K> {
+      Iterator<Object> realIterator;
+
+      private KeyIterator(Iterator<Object> realIterator) {
+         this.realIterator = realIterator;
+      }
+
+      public boolean hasNext() {
+         return realIterator.hasNext();
+      }
+
+      @SuppressWarnings("unchecked")
+      public K next() {
+         return (K) unmaskNull(realIterator.next());
+      }
+
+      public void remove() {
+         throw new UnsupportedOperationException();
+      }
+   }
 }

Modified: core/branches/flat/src/main/java/org/horizon/factories/ComponentRegistry.java
===================================================================
--- core/branches/flat/src/main/java/org/horizon/factories/ComponentRegistry.java	2009-01-27 23:06:53 UTC (rev 7604)
+++ core/branches/flat/src/main/java/org/horizon/factories/ComponentRegistry.java	2009-01-28 14:34:08 UTC (rev 7605)
@@ -13,7 +13,7 @@
 import java.util.Map;
 
 /**
- * // TODO: Manik: Document this!
+ * Named cache specific components
  *
  * @author Manik Surtani
  * @since 1.0
@@ -43,8 +43,6 @@
          registerComponent(this, ComponentRegistry.class);
          registerComponent(configuration, Configuration.class);
          registerComponent(new BootstrapFactory(cache, configuration, this), BootstrapFactory.class);
-
-         globalComponents.registerNamedComponentRegistry(this, cacheName);
       }
       catch (Exception e) {
          throw new CacheException("Unable to construct a ComponentRegistry!", e);
@@ -103,5 +101,12 @@
          globalComponents.start();
       }
       super.start();
+      globalComponents.registerNamedComponentRegistry(this, cacheName);
    }
+
+   @Override
+   public void stop() {
+      if (state.stopAllowed()) globalComponents.unregisterNamedComponentRegistry(cacheName);
+      super.stop();
+   }
 }

Modified: core/branches/flat/src/main/java/org/horizon/factories/EmptyConstructorFactory.java
===================================================================
--- core/branches/flat/src/main/java/org/horizon/factories/EmptyConstructorFactory.java	2009-01-27 23:06:53 UTC (rev 7604)
+++ core/branches/flat/src/main/java/org/horizon/factories/EmptyConstructorFactory.java	2009-01-28 14:34:08 UTC (rev 7605)
@@ -1,19 +1,23 @@
 package org.horizon.factories;
 
+import org.horizon.commands.RemoteCommandFactory;
 import org.horizon.config.ConfigurationException;
 import org.horizon.factories.annotations.DefaultFactoryFor;
+import org.horizon.factories.scopes.Scope;
+import org.horizon.factories.scopes.Scopes;
 import org.horizon.marshall.Marshaller;
 import org.horizon.marshall.VersionAwareMarshaller;
 import org.horizon.notifications.CacheManagerNotifier;
 import org.horizon.remoting.InboundInvocationHandler;
 
 /**
- * // TODO: Manik: Document this!
+ * Factory for building global-scope components which have default empty constructors
  *
  * @author Manik Surtani
  * @since 1.0
  */
-@DefaultFactoryFor(classes = {InboundInvocationHandler.class, CacheManagerNotifier.class, Marshaller.class})
+@DefaultFactoryFor(classes = {InboundInvocationHandler.class, CacheManagerNotifier.class, Marshaller.class, RemoteCommandFactory.class})
+@Scope(Scopes.GLOBAL)
 public class EmptyConstructorFactory extends AbstractComponentFactory implements AutoInstantiableFactory {
    public <T> T construct(Class<T> componentType) {
       try {

Modified: core/branches/flat/src/main/java/org/horizon/factories/GlobalComponentRegistry.java
===================================================================
--- core/branches/flat/src/main/java/org/horizon/factories/GlobalComponentRegistry.java	2009-01-27 23:06:53 UTC (rev 7604)
+++ core/branches/flat/src/main/java/org/horizon/factories/GlobalComponentRegistry.java	2009-01-28 14:34:08 UTC (rev 7605)
@@ -15,7 +15,7 @@
 import java.util.ArrayList;
 
 /**
- * // TODO: Manik: Document this!
+ * A global component registry where shared components are stored.
  *
  * @author Manik Surtani
  * @since 1.0
@@ -25,6 +25,7 @@
 public class GlobalComponentRegistry extends AbstractComponentRegistry {
 
    private Log log = LogFactory.getLog(GlobalComponentRegistry.class);
+   private static final String NAMED_REGISTRY_PREFIX = "NamedComponentRegistry:";
    /**
     * Hook to shut down the cache when the JVM exits.
     */
@@ -96,10 +97,14 @@
    }
 
    public ComponentRegistry getNamedComponentRegistry(String name) {
-      return getComponent(ComponentRegistry.class, name);
+      return getComponent(ComponentRegistry.class, NAMED_REGISTRY_PREFIX + name);
    }
 
    public void registerNamedComponentRegistry(ComponentRegistry componentRegistry, String name) {
-      registerComponent(componentRegistry, name);
+      registerComponent(componentRegistry, NAMED_REGISTRY_PREFIX + name);
    }
+
+   public void unregisterNamedComponentRegistry(String name) {
+      componentLookup.remove(NAMED_REGISTRY_PREFIX + name);
+   }
 }

Modified: core/branches/flat/src/main/java/org/horizon/interceptors/InvalidationInterceptor.java
===================================================================
--- core/branches/flat/src/main/java/org/horizon/interceptors/InvalidationInterceptor.java	2009-01-27 23:06:53 UTC (rev 7604)
+++ core/branches/flat/src/main/java/org/horizon/interceptors/InvalidationInterceptor.java	2009-01-28 14:34:08 UTC (rev 7605)
@@ -134,7 +134,7 @@
          if (tx == null || !TransactionTable.isValid(tx)) {
             // the no-tx case:
             //replicate an evict call.
-            invalidateAcrossCluster(key, null, isSynchronous(optionOverride), ctx);
+            invalidateAcrossCluster(key, isSynchronous(optionOverride), ctx);
          } else {
             if (isLocalModeForced(ctx)) ctx.getTransactionContext().addLocalModification(command);
          }
@@ -152,7 +152,7 @@
             log.debug("Modification list contains a putForExternalRead operation.  Not invalidating.");
          } else {
             try {
-               for (Object key : filterVisitor.result) invalidateAcrossCluster(key, null, defaultSynchronous, ctx);
+               for (Object key : filterVisitor.result) invalidateAcrossCluster(key, defaultSynchronous, ctx);
             }
             catch (Throwable t) {
                log.warn("Unable to broadcast evicts as a part of the prepare phase.  Rolling back.", t);
@@ -193,7 +193,7 @@
    }
 
 
-   protected void invalidateAcrossCluster(Object fqn, Object workspace, boolean synchronous, InvocationContext ctx) throws Throwable {
+   protected void invalidateAcrossCluster(Object fqn, boolean synchronous, InvocationContext ctx) throws Throwable {
       if (!isLocalModeForced(ctx)) {
          // increment invalidations counter if statistics maintained
          incrementInvalidations();

Modified: core/branches/flat/src/main/java/org/horizon/interceptors/base/BaseRpcInterceptor.java
===================================================================
--- core/branches/flat/src/main/java/org/horizon/interceptors/base/BaseRpcInterceptor.java	2009-01-27 23:06:53 UTC (rev 7604)
+++ core/branches/flat/src/main/java/org/horizon/interceptors/base/BaseRpcInterceptor.java	2009-01-28 14:34:08 UTC (rev 7605)
@@ -23,6 +23,7 @@
 
 import org.horizon.cluster.ReplicationQueue;
 import org.horizon.commands.CommandsFactory;
+import org.horizon.commands.RPCCommand;
 import org.horizon.commands.ReplicableCommand;
 import org.horizon.config.Option;
 import org.horizon.context.InvocationContext;
@@ -82,15 +83,23 @@
       }
    }
 
-   protected void replicateCall(InvocationContext ctx, ReplicableCommand call, boolean sync, Option o, boolean useOutOfBandMessage) throws Throwable {
+   protected void replicateCall(InvocationContext ctx, RPCCommand call, boolean sync, Option o, boolean useOutOfBandMessage) throws Throwable {
       replicateCall(ctx, null, call, sync, o, useOutOfBandMessage);
    }
 
-   protected void replicateCall(InvocationContext ctx, ReplicableCommand call, boolean sync, Option o) throws Throwable {
+   protected void replicateCall(InvocationContext ctx, ReplicableCommand call, boolean sync, Option o, boolean useOutOfBandMessage) throws Throwable {
+      replicateCall(ctx, null, commandsFactory.buildReplicateCommand(call), sync, o, useOutOfBandMessage);
+   }
+
+   protected void replicateCall(InvocationContext ctx, RPCCommand call, boolean sync, Option o) throws Throwable {
       replicateCall(ctx, null, call, sync, o, false);
    }
 
-   protected void replicateCall(InvocationContext ctx, List<Address> recipients, ReplicableCommand c, boolean sync, Option o, boolean useOutOfBandMessage) throws Throwable {
+   protected void replicateCall(InvocationContext ctx, ReplicableCommand call, boolean sync, Option o) throws Throwable {
+      replicateCall(ctx, null, commandsFactory.buildReplicateCommand(call), sync, o, false);
+   }
+
+   protected void replicateCall(InvocationContext ctx, List<Address> recipients, RPCCommand c, boolean sync, Option o, boolean useOutOfBandMessage) throws Throwable {
       long syncReplTimeout = configuration.getSyncReplTimeout();
 
       // test for option overrides
@@ -111,15 +120,15 @@
          }
       }
 
-      replicateCall(recipients, c, sync, true, useOutOfBandMessage, false, syncReplTimeout);
+      replicateCall(recipients, c, sync, useOutOfBandMessage, syncReplTimeout);
    }
 
-   protected void replicateCall(List<Address> recipients, ReplicableCommand call, boolean sync, boolean wrapCacheCommandInReplicateMethod, boolean useOutOfBandMessage, boolean isBroadcast, long timeout) throws Throwable {
+   protected void replicateCall(List<Address> recipients, RPCCommand call, boolean sync, boolean useOutOfBandMessage, long timeout) throws Throwable {
       if (trace) log.trace("Broadcasting call " + call + " to recipient list " + recipients);
 
       if (!sync && replicationQueue != null) {
          if (log.isDebugEnabled()) log.debug("Putting call " + call + " on the replication queue.");
-         replicationQueue.add(commandsFactory.buildReplicateCommand(call));
+         replicationQueue.add(call);
       } else {
          List<Address> callRecipients = recipients;
          if (callRecipients == null) {
@@ -128,10 +137,8 @@
                log.trace("Setting call recipients to " + callRecipients + " since the original list of recipients passed in is null.");
          }
 
-         ReplicableCommand toCall = wrapCacheCommandInReplicateMethod ? commandsFactory.buildReplicateCommand(call) : call;
-
          List rsps = rpcManager.invokeRemotely(callRecipients,
-                                               toCall,
+                                               call,
                                                sync ? ResponseMode.SYNCHRONOUS : ResponseMode.ASYNCHRONOUS, // is synchronised?
                                                timeout,
                                                useOutOfBandMessage

Modified: core/branches/flat/src/main/java/org/horizon/manager/DefaultCacheManager.java
===================================================================
--- core/branches/flat/src/main/java/org/horizon/manager/DefaultCacheManager.java	2009-01-27 23:06:53 UTC (rev 7604)
+++ core/branches/flat/src/main/java/org/horizon/manager/DefaultCacheManager.java	2009-01-28 14:34:08 UTC (rev 7605)
@@ -41,7 +41,6 @@
 
 import java.io.IOException;
 import java.io.InputStream;
-import java.util.Collections;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
@@ -265,7 +264,7 @@
          throw new NullPointerException("Null arguments not allowed");
       if (cacheName.equals(DEFAULT_CACHE_NAME))
          throw new IllegalArgumentException("Cache name cannot be used as it is a reserved, internal name");
-      if (configurationOverrides.putIfAbsent(cacheName, configurationOverride) != null)
+      if (configurationOverrides.putIfAbsent(cacheName, configurationOverride.clone()) != null)
          throw new DuplicateCacheNameException("Cache name [" + cacheName + "] already in use!");
    }
 
@@ -307,11 +306,9 @@
       return globalConfiguration.getClusterName();
    }
 
-   @SuppressWarnings("unchecked")
    public List<Address> getMembers() {
       RPCManager rpcManager = globalComponentRegistry.getComponent(RPCManager.class);
-      List l = rpcManager == null ? Collections.emptyList() : rpcManager.getMembers();
-      return l;
+      return rpcManager == null ? null : rpcManager.getMembers();
    }
 
    public Address getAddress() {

Modified: core/branches/flat/src/main/java/org/horizon/marshall/HorizonMarshaller.java
===================================================================
--- core/branches/flat/src/main/java/org/horizon/marshall/HorizonMarshaller.java	2009-01-27 23:06:53 UTC (rev 7604)
+++ core/branches/flat/src/main/java/org/horizon/marshall/HorizonMarshaller.java	2009-01-28 14:34:08 UTC (rev 7605)
@@ -38,7 +38,6 @@
 import org.jboss.util.stream.MarshalledValueInputStream;
 
 import java.io.ByteArrayInputStream;
-import java.io.Externalizable;
 import java.io.IOException;
 import java.io.InputStream;
 import java.io.ObjectInputStream;
@@ -55,38 +54,32 @@
  */
 public class HorizonMarshaller implements Marshaller {
    // magic numbers
-   protected static final int MAGICNUMBER_METHODCALL = 1;
-   protected static final int MAGICNUMBER_FQN = 2;
-   protected static final int MAGICNUMBER_GTX = 3;
-   protected static final int MAGICNUMBER_JG_ADDRESS = 4;
-   protected static final int MAGICNUMBER_ARRAY_LIST = 5;
-   protected static final int MAGICNUMBER_INTEGER = 6;
-   protected static final int MAGICNUMBER_LONG = 7;
-   protected static final int MAGICNUMBER_BOOLEAN = 8;
-   protected static final int MAGICNUMBER_STRING = 9;
-   protected static final int MAGICNUMBER_DEFAULT_DATA_VERSION = 10;
-   protected static final int MAGICNUMBER_LINKED_LIST = 11;
-   protected static final int MAGICNUMBER_HASH_MAP = 12;
-   protected static final int MAGICNUMBER_TREE_MAP = 13;
-   protected static final int MAGICNUMBER_HASH_SET = 14;
-   protected static final int MAGICNUMBER_TREE_SET = 15;
-   protected static final int MAGICNUMBER_NODEDATA_MARKER = 16;
-   protected static final int MAGICNUMBER_NODEDATA_EXCEPTION_MARKER = 17;
-   protected static final int MAGICNUMBER_NODEDATA = 18;
-   protected static final int MAGICNUMBER_GRAVITATERESULT = 19;
-   protected static final int MAGICNUMBER_SHORT = 20;
-   protected static final int MAGICNUMBER_IMMUTABLE_MAPCOPY = 21;
-   protected static final int MAGICNUMBER_MARSHALLEDVALUE = 22;
-   protected static final int MAGICNUMBER_FASTCOPY_HASHMAP = 23;
-   protected static final int MAGICNUMBER_ARRAY = 24;
-   protected static final int MAGICNUMBER_BYTE = 25;
-   protected static final int MAGICNUMBER_CHAR = 26;
-   protected static final int MAGICNUMBER_FLOAT = 27;
-   protected static final int MAGICNUMBER_DOUBLE = 28;
-   protected static final int MAGICNUMBER_OBJECT = 29;
+   protected static final int MAGICNUMBER_GTX = 1;
+   protected static final int MAGICNUMBER_JG_ADDRESS = 2;
+   protected static final int MAGICNUMBER_ARRAY_LIST = 3;
+   protected static final int MAGICNUMBER_INTEGER = 4;
+   protected static final int MAGICNUMBER_LONG = 5;
+   protected static final int MAGICNUMBER_BOOLEAN = 6;
+   protected static final int MAGICNUMBER_STRING = 7;
+   protected static final int MAGICNUMBER_LINKED_LIST = 8;
+   protected static final int MAGICNUMBER_HASH_MAP = 9;
+   protected static final int MAGICNUMBER_TREE_MAP = 10;
+   protected static final int MAGICNUMBER_HASH_SET = 11;
+   protected static final int MAGICNUMBER_TREE_SET = 12;
+   protected static final int MAGICNUMBER_SHORT = 13;
+   protected static final int MAGICNUMBER_IMMUTABLE_MAPCOPY = 14;
+   protected static final int MAGICNUMBER_MARSHALLEDVALUE = 15;
+   protected static final int MAGICNUMBER_FASTCOPY_HASHMAP = 16;
+   protected static final int MAGICNUMBER_ARRAY = 17;
+   protected static final int MAGICNUMBER_BYTE = 18;
+   protected static final int MAGICNUMBER_CHAR = 19;
+   protected static final int MAGICNUMBER_FLOAT = 20;
+   protected static final int MAGICNUMBER_DOUBLE = 21;
+   protected static final int MAGICNUMBER_OBJECT = 22;
+   protected static final int MAGICNUMBER_SINGLETON_LIST = 23;
+   protected static final int MAGICNUMBER_COMMAND = 24;
    protected static final int MAGICNUMBER_NULL = 99;
    protected static final int MAGICNUMBER_SERIALIZABLE = 100;
-
    protected static final int MAGICNUMBER_REF = 101;
 
    public HorizonMarshaller() {
@@ -101,8 +94,9 @@
    protected ClassLoader defaultClassLoader;
    protected boolean useRefs = false;
 
-   public void init(ClassLoader defaultClassLoader) {
+   public void init(ClassLoader defaultClassLoader, RemoteCommandFactory remoteCommandFactory) {
       this.defaultClassLoader = defaultClassLoader;
+      this.remoteCommandFactory = remoteCommandFactory;
    }
 
    protected void initLogger() {
@@ -132,10 +126,10 @@
             ReplicableCommand command = (ReplicableCommand) o;
 
             if (command.getCommandId() > -1) {
-               out.writeByte(MAGICNUMBER_METHODCALL);
+               out.writeByte(MAGICNUMBER_COMMAND);
                marshallCommand(command, out, refMap);
             } else {
-               throw new IllegalArgumentException("MethodCall does not have a valid method id.  Was this method call created with MethodCallFactory?");
+               throw new IllegalArgumentException("Command does not have a valid method id!");
             }
          } else if (o instanceof MarshalledValue) {
             out.writeByte(MAGICNUMBER_MARSHALLEDVALUE);
@@ -158,6 +152,9 @@
          } else if (o instanceof LinkedList) {
             out.writeByte(MAGICNUMBER_LINKED_LIST);
             marshallCollection((Collection) o, out, refMap);
+         } else if (o.getClass().getName().equals("java.util.Collections$SingletonList")) {
+            out.writeByte(MAGICNUMBER_SINGLETON_LIST);
+            marshallObject(((List) o).get(0), out, refMap);
          } else if (o.getClass().equals(HashMap.class)) {
             out.writeByte(MAGICNUMBER_HASH_MAP);
             marshallMap((Map) o, out, refMap);
@@ -192,15 +189,6 @@
             out.writeByte(MAGICNUMBER_STRING);
             if (useRefs) writeReference(out, createReference(o, refMap));
             marshallString((String) o, out);
-         } else if (o instanceof NodeDataMarker) {
-            out.writeByte(MAGICNUMBER_NODEDATA_MARKER);
-            ((Externalizable) o).writeExternal(out);
-         } else if (o instanceof NodeDataExceptionMarker) {
-            out.writeByte(MAGICNUMBER_NODEDATA_EXCEPTION_MARKER);
-            ((Externalizable) o).writeExternal(out);
-         } else if (o instanceof NodeData) {
-            out.writeByte(MAGICNUMBER_NODEDATA);
-            ((Externalizable) o).writeExternal(out);
          } else if (o instanceof Serializable) {
             if (trace) {
                log.trace("Warning: using object serialization for " + o.getClass());
@@ -306,7 +294,7 @@
             MarshalledValue mv = new MarshalledValue();
             mv.readExternal(in);
             return mv;
-         case MAGICNUMBER_METHODCALL:
+         case MAGICNUMBER_COMMAND:
             retVal = unmarshallCommand(in, refMap);
             return retVal;
          case MAGICNUMBER_GTX:
@@ -323,6 +311,8 @@
             return unmarshallArrayList(in, refMap);
          case MAGICNUMBER_LINKED_LIST:
             return unmarshallLinkedList(in, refMap);
+         case MAGICNUMBER_SINGLETON_LIST:
+            return unmarshallSingletonList(in, refMap);
          case MAGICNUMBER_HASH_MAP:
             return unmarshallHashMap(in, refMap);
          case MAGICNUMBER_TREE_MAP:
@@ -348,18 +338,6 @@
             retVal = unmarshallString(in);
             if (useRefs) refMap.putReferencedObject(reference, retVal);
             return retVal;
-         case MAGICNUMBER_NODEDATA_MARKER:
-            retVal = new NodeDataMarker();
-            ((NodeDataMarker) retVal).readExternal(in);
-            return retVal;
-         case MAGICNUMBER_NODEDATA_EXCEPTION_MARKER:
-            retVal = new NodeDataExceptionMarker();
-            ((NodeDataExceptionMarker) retVal).readExternal(in);
-            return retVal;
-         case MAGICNUMBER_NODEDATA:
-            retVal = new NodeData();
-            ((NodeData) retVal).readExternal(in);
-            return retVal;
          default:
             if (log.isErrorEnabled()) {
                log.error("Unknown Magic Number " + magicNumber);
@@ -421,6 +399,10 @@
       return list;
    }
 
+   private List unmarshallSingletonList(ObjectInputStream in, UnmarshalledReferences refMap) throws Exception {
+      return Collections.singletonList(unmarshallObject(in, refMap));
+   }
+
    private Map unmarshallHashMap(ObjectInputStream in, UnmarshalledReferences refMap) throws Exception {
       Map map = new HashMap();
       populateFromStream(in, refMap, map);

Modified: core/branches/flat/src/main/java/org/horizon/marshall/VersionAwareMarshaller.java
===================================================================
--- core/branches/flat/src/main/java/org/horizon/marshall/VersionAwareMarshaller.java	2009-01-27 23:06:53 UTC (rev 7604)
+++ core/branches/flat/src/main/java/org/horizon/marshall/VersionAwareMarshaller.java	2009-01-28 14:34:08 UTC (rev 7605)
@@ -21,6 +21,7 @@
  */
 package org.horizon.marshall;
 
+import org.horizon.commands.RemoteCommandFactory;
 import org.horizon.factories.annotations.Inject;
 import org.horizon.io.ByteBuffer;
 import org.horizon.io.ExposedByteArrayOutputStream;
@@ -54,9 +55,9 @@
    ClassLoader defaultClassLoader;
 
    @Inject
-   public void init(ClassLoader loader) {
+   public void init(ClassLoader loader, RemoteCommandFactory remoteCommandFactory) {
       defaultMarshaller = new HorizonMarshaller();
-      defaultMarshaller.init(loader);
+      defaultMarshaller.init(loader, remoteCommandFactory);
    }
 
    protected int getCustomMarshallerVersionInt() {

Modified: core/branches/flat/src/main/java/org/horizon/remoting/InboundInvocationHandler.java
===================================================================
--- core/branches/flat/src/main/java/org/horizon/remoting/InboundInvocationHandler.java	2009-01-27 23:06:53 UTC (rev 7604)
+++ core/branches/flat/src/main/java/org/horizon/remoting/InboundInvocationHandler.java	2009-01-28 14:34:08 UTC (rev 7605)
@@ -1,6 +1,6 @@
 package org.horizon.remoting;
 
-import org.horizon.commands.ReplicableCommand;
+import org.horizon.commands.RPCCommand;
 import org.horizon.factories.scopes.Scope;
 import org.horizon.factories.scopes.Scopes;
 
@@ -20,5 +20,5 @@
     * @param command command to invoke
     * @return results, if any, from the invocation
     */
-   Object handle(ReplicableCommand command);
+   Object handle(RPCCommand command) throws Throwable;
 }

Modified: core/branches/flat/src/main/java/org/horizon/remoting/InboundInvocationHandlerImpl.java
===================================================================
--- core/branches/flat/src/main/java/org/horizon/remoting/InboundInvocationHandlerImpl.java	2009-01-27 23:06:53 UTC (rev 7604)
+++ core/branches/flat/src/main/java/org/horizon/remoting/InboundInvocationHandlerImpl.java	2009-01-28 14:34:08 UTC (rev 7605)
@@ -1,48 +1,49 @@
 package org.horizon.remoting;
 
-import org.horizon.commands.ReplicableCommand;
+import org.horizon.commands.CommandsFactory;
+import org.horizon.commands.RPCCommand;
 import org.horizon.factories.ComponentRegistry;
 import org.horizon.factories.GlobalComponentRegistry;
+import org.horizon.factories.annotations.Inject;
+import org.horizon.factories.annotations.NonVolatile;
+import org.horizon.factories.scopes.Scope;
+import org.horizon.factories.scopes.Scopes;
 import org.horizon.interceptors.InterceptorChain;
 import org.horizon.invocation.InvocationContextContainer;
+import org.horizon.logging.Log;
+import org.horizon.logging.LogFactory;
 
 /**
- * // TODO: Manik: Document this!
+ * Sets the cache interceptor chain on an RPCCommand before calling it to perform
  *
  * @author Manik Surtani
  * @since 1.0
  */
+@NonVolatile
+@Scope(Scopes.GLOBAL)
 public class InboundInvocationHandlerImpl implements InboundInvocationHandler {
-   InvocationContextContainer invocationContextContainer;
-   ComponentRegistry componentRegistry;
-   InterceptorChain interceptorChain;
    GlobalComponentRegistry gcr;
+   private static final Log log = LogFactory.getLog(InboundInvocationHandlerImpl.class);
 
+   @Inject
    public void inject(GlobalComponentRegistry gcr) {
       this.gcr = gcr;
    }
 
-   private ComponentRegistry getNamedCacheComponentRegistry(String name) {
-      return gcr.getNamedComponentRegistry(name);
-   }
+   public Object handle(RPCCommand cmd) throws Throwable {
+      String cacheName = cmd.getCacheName();
+      ComponentRegistry cr = gcr.getNamedComponentRegistry(cacheName);
+      if (cr == null) {
+         log.info("Cache named {0} does not exist on this cache manager!", cacheName);
+         return null;
+      }
+      InterceptorChain ic = cr.getComponent(InterceptorChain.class);
+      InvocationContextContainer icc = cr.getComponent(InvocationContextContainer.class);
+      CommandsFactory commandsFactory = cr.getComponent(CommandsFactory.class);
 
-   public Object handle(ReplicableCommand command) {
-
-      throw new RuntimeException("Implement me!");
-//      if (cmd instanceof VisitableCommand) {
-//         InvocationContext ctx = invocationContextContainer.get();
-//         ctx.setOriginLocal(false);
-//         if (!componentRegistry.invocationsAllowed(false)) {
-//            return null;
-//         }
-//         return interceptorChain.invoke(ctx, (VisitableCommand) command);
-//      } else {
-//         if (trace) log.trace("This is a non-visitable command - so performing directly and not via the invoker.");
-//
-//         // need to check cache status for all except buddy replication commands.
-//         if (!componentRegistry.invocationsAllowed(false)) return null;
-//
-//         return cmd.perform(null);
-//      }
+      cmd.setInterceptorChain(ic);
+      // initialize this command with components specific to the intended cache instance
+      commandsFactory.initializeReplicableCommand(cmd);
+      return cmd.perform(icc.get());
    }
 }

Modified: core/branches/flat/src/main/java/org/horizon/remoting/RPCManager.java
===================================================================
--- core/branches/flat/src/main/java/org/horizon/remoting/RPCManager.java	2009-01-27 23:06:53 UTC (rev 7604)
+++ core/branches/flat/src/main/java/org/horizon/remoting/RPCManager.java	2009-01-28 14:34:08 UTC (rev 7605)
@@ -21,7 +21,7 @@
  */
 package org.horizon.remoting;
 
-import org.horizon.commands.ReplicableCommand;
+import org.horizon.commands.RPCCommand;
 import org.horizon.factories.annotations.NonVolatile;
 import org.horizon.factories.scopes.Scope;
 import org.horizon.factories.scopes.Scopes;
@@ -49,7 +49,7 @@
     *
     * @param recipients       a list of Addresses to invoke the call on.  If this is null, the call is broadcast to the
     *                         entire cluster.
-    * @param cacheCommand     the cache command to invoke
+    * @param rpcCommand       the cache command to invoke
     * @param mode             the response mode to use
     * @param timeout          a timeout after which to throw a replication exception.
     * @param usePriorityQueue if true, a priority queue is used to deliver messages.  May not be supported by all
@@ -58,14 +58,14 @@
     * @return a list of responses from each member contacted.
     * @throws Exception in the event of problems.
     */
-   List<Object> invokeRemotely(List<Address> recipients, ReplicableCommand cacheCommand, ResponseMode mode, long timeout, boolean usePriorityQueue, ResponseFilter responseFilter) throws Exception;
+   List<Object> invokeRemotely(List<Address> recipients, RPCCommand rpcCommand, ResponseMode mode, long timeout, boolean usePriorityQueue, ResponseFilter responseFilter) throws Exception;
 
    /**
     * Invokes an RPC call on other caches in the cluster.
     *
     * @param recipients       a list of Addresses to invoke the call on.  If this is null, the call is broadcast to the
     *                         entire cluster.
-    * @param cacheCommand     the cache command to invoke
+    * @param rpcCommand       the cache command to invoke
     * @param mode             the response mode to use
     * @param timeout          a timeout after which to throw a replication exception.
     * @param usePriorityQueue if true, a priority queue is used to deliver messages.  May not be supported by all
@@ -73,20 +73,20 @@
     * @return a list of responses from each member contacted.
     * @throws Exception in the event of problems.
     */
-   List<Object> invokeRemotely(List<Address> recipients, ReplicableCommand cacheCommand, ResponseMode mode, long timeout, boolean usePriorityQueue) throws Exception;
+   List<Object> invokeRemotely(List<Address> recipients, RPCCommand rpcCommand, ResponseMode mode, long timeout, boolean usePriorityQueue) throws Exception;
 
    /**
     * Invokes an RPC call on other caches in the cluster.
     *
-    * @param recipients   a list of Addresses to invoke the call on.  If this is null, the call is broadcast to the
-    *                     entire cluster.
-    * @param cacheCommand the cache command to invoke
-    * @param mode         the response mode to use
-    * @param timeout      a timeout after which to throw a replication exception.
+    * @param recipients a list of Addresses to invoke the call on.  If this is null, the call is broadcast to the entire
+    *                   cluster.
+    * @param rpcCommand the cache command to invoke
+    * @param mode       the response mode to use
+    * @param timeout    a timeout after which to throw a replication exception.
     * @return a list of responses from each member contacted.
     * @throws Exception in the event of problems.
     */
-   List<Object> invokeRemotely(List<Address> recipients, ReplicableCommand cacheCommand, ResponseMode mode, long timeout) throws Exception;
+   List<Object> invokeRemotely(List<Address> recipients, RPCCommand rpcCommand, ResponseMode mode, long timeout) throws Exception;
 
    /**
     * @return true if the current Channel is the coordinator of the cluster.

Modified: core/branches/flat/src/main/java/org/horizon/remoting/RPCManagerImpl.java
===================================================================
--- core/branches/flat/src/main/java/org/horizon/remoting/RPCManagerImpl.java	2009-01-27 23:06:53 UTC (rev 7604)
+++ core/branches/flat/src/main/java/org/horizon/remoting/RPCManagerImpl.java	2009-01-28 14:34:08 UTC (rev 7605)
@@ -3,7 +3,7 @@
 import org.horizon.annotations.MBean;
 import org.horizon.annotations.ManagedAttribute;
 import org.horizon.annotations.ManagedOperation;
-import org.horizon.commands.ReplicableCommand;
+import org.horizon.commands.RPCCommand;
 import org.horizon.config.GlobalConfiguration;
 import org.horizon.factories.KnownComponentNames;
 import org.horizon.factories.annotations.ComponentName;
@@ -54,16 +54,16 @@
       t.stop();
    }
 
-   public List<Object> invokeRemotely(List<Address> recipients, ReplicableCommand cacheCommand, ResponseMode mode, long timeout, boolean usePriorityQueue, ResponseFilter responseFilter) throws Exception {
-      return t.invokeRemotely(recipients, cacheCommand, mode, timeout, usePriorityQueue, responseFilter);
+   public List<Object> invokeRemotely(List<Address> recipients, RPCCommand rpcCommand, ResponseMode mode, long timeout, boolean usePriorityQueue, ResponseFilter responseFilter) throws Exception {
+      return t.invokeRemotely(recipients, rpcCommand, mode, timeout, usePriorityQueue, responseFilter);
    }
 
-   public List<Object> invokeRemotely(List<Address> recipients, ReplicableCommand cacheCommand, ResponseMode mode, long timeout, boolean usePriorityQueue) throws Exception {
-      return t.invokeRemotely(recipients, cacheCommand, mode, timeout, usePriorityQueue, null);
+   public List<Object> invokeRemotely(List<Address> recipients, RPCCommand rpcCommand, ResponseMode mode, long timeout, boolean usePriorityQueue) throws Exception {
+      return t.invokeRemotely(recipients, rpcCommand, mode, timeout, usePriorityQueue, null);
    }
 
-   public List<Object> invokeRemotely(List<Address> recipients, ReplicableCommand cacheCommand, ResponseMode mode, long timeout) throws Exception {
-      return t.invokeRemotely(recipients, cacheCommand, mode, timeout, false, null);
+   public List<Object> invokeRemotely(List<Address> recipients, RPCCommand rpcCommand, ResponseMode mode, long timeout) throws Exception {
+      return t.invokeRemotely(recipients, rpcCommand, mode, timeout, false, null);
    }
 
    public boolean isCoordinator() {
@@ -119,4 +119,9 @@
       double ration = (double) replicationCount.get() / totalCount * 100d;
       return NumberFormat.getInstance().format(ration) + "%";
    }
+
+   // mainly for unit testing
+   public void setTransport(Transport transport) {
+      this.t = transport;
+   }
 }

Modified: core/branches/flat/src/main/java/org/horizon/remoting/transport/Transport.java
===================================================================
--- core/branches/flat/src/main/java/org/horizon/remoting/transport/Transport.java	2009-01-27 23:06:53 UTC (rev 7604)
+++ core/branches/flat/src/main/java/org/horizon/remoting/transport/Transport.java	2009-01-28 14:34:08 UTC (rev 7605)
@@ -1,6 +1,6 @@
 package org.horizon.remoting.transport;
 
-import org.horizon.commands.ReplicableCommand;
+import org.horizon.commands.RPCCommand;
 import org.horizon.config.GlobalConfiguration;
 import org.horizon.factories.annotations.NonVolatile;
 import org.horizon.factories.scopes.Scope;
@@ -44,7 +44,7 @@
     *
     * @param recipients       a list of Addresses to invoke the call on.  If this is null, the call is broadcast to the
     *                         entire cluster.
-    * @param cacheCommand     the cache command to invoke
+    * @param rpcCommand       the cache command to invoke
     * @param mode             the response mode to use
     * @param timeout          a timeout after which to throw a replication exception.
     * @param usePriorityQueue if true, a priority queue is used to deliver messages.  May not be supported by all
@@ -53,7 +53,7 @@
     * @return a list of responses from each member contacted.
     * @throws Exception in the event of problems.
     */
-   List<Object> invokeRemotely(List<Address> recipients, ReplicableCommand cacheCommand, ResponseMode mode, long timeout, boolean usePriorityQueue, ResponseFilter responseFilter) throws Exception;
+   List<Object> invokeRemotely(List<Address> recipients, RPCCommand rpcCommand, ResponseMode mode, long timeout, boolean usePriorityQueue, ResponseFilter responseFilter) throws Exception;
 
    /**
     * @return true if the current Channel is the coordinator of the cluster.

Added: core/branches/flat/src/test/java/org/horizon/BaseReplicatedTest.java
===================================================================
--- core/branches/flat/src/test/java/org/horizon/BaseReplicatedTest.java	                        (rev 0)
+++ core/branches/flat/src/test/java/org/horizon/BaseReplicatedTest.java	2009-01-28 14:34:08 UTC (rev 7605)
@@ -0,0 +1,68 @@
+package org.horizon;
+
+import org.horizon.config.Configuration;
+import org.horizon.config.GlobalConfiguration;
+import org.horizon.manager.CacheManager;
+import org.horizon.manager.DefaultCacheManager;
+import org.horizon.util.TestingUtil;
+import org.testng.annotations.AfterMethod;
+
+import java.util.LinkedList;
+import java.util.List;
+
+/**
+ * Base class for tests that work with several cache managers.  Cache managers created through
+ * {@link #addCacheManager()} are tracked in a per-thread list and are killed after every test method.
+ */
+public abstract class BaseReplicatedTest {
+   /**
+    * Cache managers registered by the current thread.  Each thread lazily receives its own, initially empty, list.
+    */
+   ThreadLocal<List<CacheManager>> cacheManagerThreadLocal = new ThreadLocal<List<CacheManager>>() {
+      @Override
+      protected List<CacheManager> initialValue() {
+         return new LinkedList<CacheManager>();
+      }
+   };
+
+   /**
+    * @return a list of registered cache managers on the current thread.
+    */
+   protected List<CacheManager> getCacheManagers() {
+      return cacheManagerThreadLocal.get();
+   }
+
+   /**
+    * Creates a new cache manager and adds it to the list of known cache managers on the current thread.  Uses a
+    * default clustered cache manager global config.
+    *
+    * @return the new CacheManager
+    */
+   protected CacheManager addCacheManager() {
+      return addCacheManager(GlobalConfiguration.getClusteredDefault());
+   }
+
+   /**
+    * Creates a new cache manager and adds it to the list of known cache managers on the current thread.
+    * <p/>
+    * NOTE(review): no explicit start() call is made here; presumably the cache manager starts itself on construction
+    * or on first use - confirm.
+    *
+    * @param globalConfig config to use
+    * @return the new CacheManager
+    */
+   protected CacheManager addCacheManager(GlobalConfiguration globalConfig) {
+      CacheManager cm = new DefaultCacheManager(globalConfig);
+      cacheManagerThreadLocal.get().add(cm);
+      return cm;
+   }
+
+   /**
+    * Defines a named cache, using the same configuration, on every cache manager registered on the current thread.
+    *
+    * @param cacheName name of the cache to define
+    * @param c         configuration to define the cache with
+    */
+   protected void defineCacheOnAllManagers(String cacheName, Configuration c) {
+      for (CacheManager cm : cacheManagerThreadLocal.get()) {
+         cm.defineCache(cacheName, c);
+      }
+   }
+
+   /**
+    * Asserts that every cache manager registered on the current thread reports a non-null member list of exactly the
+    * given size.
+    *
+    * @param message assertion message to use on failure
+    * @param size    expected number of members
+    */
+   protected void assertClusterSize(String message, int size) {
+      for (CacheManager cm : cacheManagerThreadLocal.get()) {
+         assert cm.getMembers() != null && cm.getMembers().size() == size : message;
+      }
+   }
+
+   /**
+    * Kills every cache manager registered on the current thread and empties the list.  Declared with
+    * <tt>alwaysRun = true</tt> so that cache managers are still cleaned up when a preceding configuration method
+    * (e.g. a subclass's set-up) has failed or been skipped.
+    */
+   @AfterMethod(alwaysRun = true)
+   public void cleanupThreadLocals() {
+      List<CacheManager> managers = cacheManagerThreadLocal.get();
+      try {
+         TestingUtil.killCacheManagers(managers.toArray(new CacheManager[managers.size()]));
+      } finally {
+         // always forget the managers, even if killing them failed, so they do not leak into the next test
+         managers.clear();
+      }
+   }
+
+}

Modified: core/branches/flat/src/test/java/org/horizon/BasicTest.java
===================================================================
--- core/branches/flat/src/test/java/org/horizon/BasicTest.java	2009-01-27 23:06:53 UTC (rev 7604)
+++ core/branches/flat/src/test/java/org/horizon/BasicTest.java	2009-01-28 14:34:08 UTC (rev 7605)
@@ -22,8 +22,10 @@
 package org.horizon;
 
 import org.horizon.config.Configuration;
+import org.horizon.config.GlobalConfiguration;
 import org.horizon.logging.Log;
 import org.horizon.logging.LogFactory;
+import org.horizon.manager.CacheManager;
 import org.horizon.manager.DefaultCacheManager;
 import org.horizon.manager.NamedCacheNotFoundException;
 import org.horizon.util.TestingUtil;
@@ -31,6 +33,8 @@
 
 @Test(groups = "functional")
 public class BasicTest {
+   public static final Log log = LogFactory.getLog(BasicTest.class);
+
    public void basicTest() throws Exception {
       // create a cache manager
       Configuration c = new Configuration(); // LOCAL mode
@@ -61,25 +65,18 @@
       }
    }
 
-   public static final Log log = LogFactory.getLog(BasicTest.class);
-
    public void testBasicReplication() throws NamedCacheNotFoundException {
       Configuration configuration = new Configuration();
       configuration.setCacheMode(Configuration.CacheMode.REPL_SYNC);
 
-      DefaultCacheManager firstManager = new DefaultCacheManager(configuration);
-      DefaultCacheManager secondManager = new DefaultCacheManager(configuration);
+      CacheManager firstManager = new DefaultCacheManager(GlobalConfiguration.getClusteredDefault(), configuration);
+      CacheManager secondManager = new DefaultCacheManager(GlobalConfiguration.getClusteredDefault(), configuration);
 
       try {
-         firstManager.start();
-         secondManager.start();
+         CacheSPI firstCache = (CacheSPI) firstManager.getCache();
+         CacheSPI secondCache = (CacheSPI) secondManager.getCache();
+         TestingUtil.blockUntilViewsReceived(60000, firstManager, secondManager);
 
-         CacheSPI firstCache = (CacheSPI) firstManager.getCache("test");
-         CacheSPI secondCache = (CacheSPI) secondManager.getCache("test");
-
-         TestingUtil.blockUntilViewReceived(secondCache, 2, 3000);
-
-
          firstCache.put("key", "value");
 
          assert secondCache.get("key").equals("value");
@@ -90,16 +87,38 @@
          assert secondCache.get("key") == null;
       }
       finally {
-         firstManager.stop();
-         secondManager.stop();
+         TestingUtil.killCacheManagers(firstManager, secondManager);
       }
    }
 
    public void concurrentMapMethodTest() {
+      // Exercises putIfAbsent, remove(key, value), replace(key, old, new) and replace(key, value) on the cache
+      // returned by a DefaultCacheManager created with its no-arg constructor.
+      CacheManager cm = null;
+      Cache<String, String> c = null;
+      try {
+         cm = new DefaultCacheManager();
+         c = cm.getCache();
 
-   }
+         // putIfAbsent returns null when it stores the value and the existing value when it does not
+         assert c.putIfAbsent("A", "B") == null;
+         assert c.putIfAbsent("A", "C").equals("B");
+         assert c.get("A").equals("B");
 
-   public void transactionalTest() {
+         // conditional remove only succeeds when the supplied value matches the stored one
+         assert !c.remove("A", "C");
+         assert c.containsKey("A");
+         assert c.remove("A", "B");
+         assert !c.containsKey("A");
 
+         c.put("A", "B");
+
+         // conditional replace only succeeds when the supplied old value matches the stored one
+         assert !c.replace("A", "D", "C");
+         assert c.get("A").equals("B");
+         assert c.replace("A", "B", "C");
+         assert c.get("A").equals("C");
+
+         // unconditional replace returns the previous value and must not create a mapping for an absent key
+         assert c.replace("A", "X").equals("C");
+         assert c.replace("X", "A") == null;
+         assert !c.containsKey("X");
+      } finally {
+         // NOTE(review): cm is still null here if the constructor threw - confirm killCacheManagers tolerates null
+         TestingUtil.killCacheManagers(cm);
+      }
    }
 }

Modified: core/branches/flat/src/test/java/org/horizon/api/mvcc/PutForExternalReadTest.java
===================================================================
--- core/branches/flat/src/test/java/org/horizon/api/mvcc/PutForExternalReadTest.java	2009-01-27 23:06:53 UTC (rev 7604)
+++ core/branches/flat/src/test/java/org/horizon/api/mvcc/PutForExternalReadTest.java	2009-01-28 14:34:08 UTC (rev 7605)
@@ -5,7 +5,7 @@
 import org.horizon.Cache;
 import org.horizon.CacheSPI;
 import org.horizon.UnitTestCacheFactory;
-import org.horizon.commands.ReplicableCommand;
+import org.horizon.commands.RPCCommand;
 import org.horizon.commands.write.PutKeyValueCommand;
 import org.horizon.commands.write.RemoveCommand;
 import org.horizon.config.Configuration;
@@ -118,7 +118,7 @@
 
       // specify what we expectWithTx called on the mock Rpc Manager.  For params we don't care about, just use ANYTHING.
       // setting the mock object to expectWithTx the "sync" param to be false.
-      expect(rpcManager.invokeRemotely(anyAddresses(), (ReplicableCommand) anyObject(), eq(ResponseMode.ASYNCHRONOUS), anyLong(), anyBoolean())).andReturn(null);
+      expect(rpcManager.invokeRemotely(anyAddresses(), (RPCCommand) anyObject(), eq(ResponseMode.ASYNCHRONOUS), anyLong(), anyBoolean())).andReturn(null);
 
       replay(rpcManager);
 
@@ -162,7 +162,7 @@
          List<Address> memberList = originalRpcManager.getMembers();
          expect(barfingRpcManager.getMembers()).andReturn(memberList).anyTimes();
          expect(barfingRpcManager.getAddress()).andReturn(originalRpcManager.getAddress()).anyTimes();
-         expect(barfingRpcManager.invokeRemotely(anyAddresses(), (ReplicableCommand) anyObject(), anyResponseMode(), anyLong(), anyBoolean())).andThrow(new RuntimeException("Barf!")).anyTimes();
+         expect(barfingRpcManager.invokeRemotely(anyAddresses(), (RPCCommand) anyObject(), anyResponseMode(), anyLong(), anyBoolean())).andThrow(new RuntimeException("Barf!")).anyTimes();
          replay(barfingRpcManager);
 
          TestingUtil.extractComponentRegistry(cache1).registerComponent(barfingRpcManager, RPCManager.class);

Modified: core/branches/flat/src/test/java/org/horizon/replication/SyncReplTest.java
===================================================================
--- core/branches/flat/src/test/java/org/horizon/replication/SyncReplTest.java	2009-01-27 23:06:53 UTC (rev 7604)
+++ core/branches/flat/src/test/java/org/horizon/replication/SyncReplTest.java	2009-01-28 14:34:08 UTC (rev 7605)
@@ -6,79 +6,172 @@
  */
 package org.horizon.replication;
 
+import static org.easymock.EasyMock.*;
+import org.horizon.BaseReplicatedTest;
 import org.horizon.Cache;
-import org.horizon.UnitTestCacheManager;
+import org.horizon.commands.RPCCommand;
 import org.horizon.config.Configuration;
+import org.horizon.manager.CacheManager;
+import org.horizon.remoting.RPCManager;
+import org.horizon.remoting.RPCManagerImpl;
+import org.horizon.remoting.ResponseFilter;
+import org.horizon.remoting.ResponseMode;
+import org.horizon.remoting.transport.Address;
+import org.horizon.remoting.transport.Transport;
 import org.horizon.util.TestingUtil;
 import static org.testng.AssertJUnit.assertEquals;
 import static org.testng.AssertJUnit.assertNull;
-import org.testng.annotations.AfterMethod;
 import org.testng.annotations.BeforeMethod;
 import org.testng.annotations.Test;
 
+import java.util.Collections;
+import java.util.LinkedList;
+import java.util.List;
+
 /**
  * @author <a href="mailto:manik at jboss.org">Manik Surtani (manik at jboss.org)</a>
  */
-@Test(groups = {"functional", "jgroups"})
-public class SyncReplTest {
-   private ThreadLocal<Cache<Object, Object>[]> cachesTL = new ThreadLocal<Cache<Object, Object>[]>();
+@Test(groups = "functional", sequential = true)
+public class SyncReplTest extends BaseReplicatedTest {
+   Cache cache1, cache2;
+   String k = "key", v = "value";
 
    @BeforeMethod(alwaysRun = true)
    public void setUp() {
-      System.out.println("*** In setUp()");
-      Cache<Object, Object>[] caches = new Cache[2];
-      Configuration configuration = new Configuration();
-      configuration.setCacheMode(Configuration.CacheMode.REPL_SYNC);
-      caches[0] = new UnitTestCacheManager(configuration).createCache("test");
-      caches[1] = new UnitTestCacheManager(configuration).createCache("test");
-      cachesTL.set(caches);
-      TestingUtil.blockUntilViewsReceived(caches, 5000);
-      System.out.println("*** Finished setUp()");
-   }
+      // two cache managers, registered with the base class so that they are cleaned up after each test method
+      CacheManager cm1 = addCacheManager();
+      CacheManager cm2 = addCacheManager();
 
-   @AfterMethod(alwaysRun = true)
-   public void tearDown() {
-      Cache<Object, Object>[] caches = cachesTL.get();
-      if (caches != null) TestingUtil.killCaches(caches);
-      cachesTL.set(null);
+      // the same REPL_SYNC configuration is defined under the name "replSync" on both managers
+      Configuration replSync = new Configuration();
+      replSync.setCacheMode(Configuration.CacheMode.REPL_SYNC);
+
+      cm1.defineCache("replSync", replSync);
+      cm2.defineCache("replSync", replSync);
+
+      cache1 = cm1.getCache("replSync");
+      cache2 = cm2.getCache("replSync");
+
+      // presumably waits (60000, likely milliseconds) until both managers see each other - confirm in TestingUtil
+      TestingUtil.blockUntilViewsReceived(60000, true, cm1, cm2);
    }
 
    public void testBasicOperation() {
-      Cache<Object, Object>[] caches = cachesTL.get();
+      // Checks that a put on cache1 is visible on cache2, and that a remove on cache2 leaves both caches empty.
       assertClusterSize("Should only be 2  caches in the cluster!!!", 2);
 
-      String k = "key", v = "value";
+      assertNull("Should be null", cache1.get(k));
+      assertNull("Should be null", cache2.get(k));
 
-      assertNull("Should be null", caches[0].get(k));
-      assertNull("Should be null", caches[1].get(k));
+      cache1.put(k, v);
 
-      caches[0].put(k, v);
+      assertEquals(v, cache1.get(k));
+      assertEquals("Should have replicated", v, cache2.get(k));
 
-      assertEquals(v, caches[0].get(k));
-      assertEquals("Should have replicated", v, caches[1].get(k));
+      // the removal is issued on the second cache and is expected to empty the first one as well
+      cache2.remove(k);
+      assert cache1.isEmpty();
+      assert cache2.isEmpty();
    }
 
-   @SuppressWarnings("unchecked")
-   public void testSyncRepl() {
-      Cache<Object, Object>[] caches = cachesTL.get();
+   /**
+    * Defines a second REPL_SYNC cache, "newCache", on both cache managers and checks that writes to the "replSync"
+    * caches and writes to the "newCache" caches each show up on the other manager's cache of the same name without
+    * affecting the cache of the other name.
+    */
+   public void testMultpleCachesOnSharedTransport() {
       assertClusterSize("Should only be 2  caches in the cluster!!!", 2);
+      assert cache1.isEmpty();
+      assert cache2.isEmpty();
 
-      caches[0].getConfiguration().setSyncCommitPhase(true);
-      caches[1].getConfiguration().setSyncCommitPhase(true);
+      List<CacheManager> managers = getCacheManagers();
+      Configuration newConf = new Configuration();
+      newConf.setCacheMode(Configuration.CacheMode.REPL_SYNC);
+      defineCacheOnAllManagers("newCache", newConf);
+      Cache altCache1 = managers.get(0).getCache("newCache");
+      Cache altCache2 = managers.get(1).getCache("newCache");
 
-      caches[0].put("age", 38);
-      assertEquals("Value should be set", 38, caches[0].get("age"));
-      assertEquals("Value should have replicated", 38, caches[1].get("age"));
+      assert altCache1.isEmpty();
+      assert altCache2.isEmpty();
+
+      // a write to "replSync" must not leak into "newCache"
+      cache1.put(k, v);
+      assert cache1.get(k).equals(v);
+      assert cache2.get(k).equals(v);
+      assert altCache1.isEmpty();
+      assert altCache2.isEmpty();
+
+      // same key, different value: "newCache" holds its own value while "replSync" keeps the original one
+      altCache1.put(k, "value2");
+      assert altCache1.get(k).equals("value2");
+      assert altCache2.get(k).equals("value2");
+      assert cache1.get(k).equals(v);
+      assert cache2.get(k).equals(v);
    }
 
-   private void assertClusterSize(String message, int size) {
-      Cache<Object, Object>[] caches = cachesTL.get();
-      for (Cache c : caches) {
-         assertClusterSize(message, size, c);
-      }
+   /**
+    * Defines "newCache" on every cache manager but only obtains it from the first one, then checks that writing to
+    * that cache succeeds, that the value can be read back from it, and that the "replSync" caches are unaffected.
+    * <p/>
+    * NOTE(review): "non-existent" presumably refers to "newCache" never being retrieved on the second manager -
+    * confirm.
+    */
+   public void testReplicateToNonExistentCache() {
+      assertClusterSize("Should only be 2  caches in the cluster!!!", 2);
+      assert cache1.isEmpty();
+      assert cache2.isEmpty();
+
+      List<CacheManager> managers = getCacheManagers();
+      Configuration newConf = new Configuration();
+      newConf.setCacheMode(Configuration.CacheMode.REPL_SYNC);
+      defineCacheOnAllManagers("newCache", newConf);
+      Cache altCache1 = managers.get(0).getCache("newCache");
+
+      assert altCache1.isEmpty();
+
+      // a write to "replSync" must not leak into "newCache"
+      cache1.put(k, v);
+      assert cache1.get(k).equals(v);
+      assert cache2.get(k).equals(v);
+      assert altCache1.isEmpty();
+
+      altCache1.put(k, "value2");
+      assert altCache1.get(k).equals("value2");
+      assert cache1.get(k).equals(v);
+      assert cache2.get(k).equals(v);
+
+      // looking the cache up again by name must return the value just written; the comparison has to be asserted,
+      // otherwise its result is silently discarded and the check can never fail
+      assert managers.get(0).getCache("newCache").get(k).equals("value2");
    }
 
-   private void assertClusterSize(String message, int size, Cache c) {
-      assertEquals(message, size, c.getCacheManager().getMembers().size());
+   /**
+    * Defines a REPL_ASYNC cache next to the REPL_SYNC cache created in set-up, swaps the transport of the RPC manager
+    * extracted from the async cache for a strict EasyMock mock, and checks that a put on the sync cache results in
+    * exactly one {@link ResponseMode#SYNCHRONOUS} remote invocation while a put on the async cache results in
+    * exactly one {@link ResponseMode#ASYNCHRONOUS} one.  The original transport is put back afterwards.
+    *
+    * @throws Exception declared because {@link Transport#invokeRemotely} declares it
+    */
+   public void testMixingSyncAndAsyncOnSameTransport() throws Exception {
+      List<CacheManager> managers = getCacheManagers();
+      Transport originalTransport = null;
+      RPCManagerImpl rpcManager = null;
+      try {
+         Configuration asyncCache = new Configuration();
+         asyncCache.setCacheMode(Configuration.CacheMode.REPL_ASYNC);
+         defineCacheOnAllManagers("asyncCache", asyncCache);
+         Cache asyncCache1 = managers.get(0).getCache("asyncCache");
+
+         // replace the transport with a mock object
+         Transport mockTransport = createMock(Transport.class);
+         Address mockAddressOne = createNiceMock(Address.class);
+         Address mockAddressTwo = createNiceMock(Address.class);
+         List<Address> addresses = new LinkedList<Address>();
+         addresses.add(mockAddressOne);
+         addresses.add(mockAddressTwo);
+         expect(mockTransport.getAddress()).andReturn(mockAddressOne).anyTimes();
+         expect(mockTransport.getMembers()).andReturn(addresses).anyTimes();
+         replay(mockAddressOne, mockAddressTwo);
+
+         // this is shared by all caches managed by the cache manager
+         originalTransport = TestingUtil.extractComponent(asyncCache1, Transport.class);
+         rpcManager = (RPCManagerImpl) TestingUtil.extractComponent(asyncCache1, RPCManager.class);
+         rpcManager.setTransport(mockTransport);
+
+         expect(mockTransport.invokeRemotely((List<Address>) anyObject(), (RPCCommand) anyObject(), eq(ResponseMode.SYNCHRONOUS),
+                                             anyLong(), anyBoolean(), (ResponseFilter) anyObject()))
+               .andReturn(Collections.emptyList()).once();
+
+         replay(mockTransport);
+         cache1.put("k", "v");
+         // check that the replication call was sync; this has to happen before reset() discards the expectation,
+         // otherwise a put that never reaches the transport would go unnoticed
+         verify(mockTransport);
+
+         // reset to test for async
+         reset(mockTransport);
+         expect(mockTransport.getAddress()).andReturn(mockAddressOne).anyTimes();
+         expect(mockTransport.getMembers()).andReturn(addresses).anyTimes();
+         expect(mockTransport.invokeRemotely((List<Address>) anyObject(), (RPCCommand) anyObject(), eq(ResponseMode.ASYNCHRONOUS),
+                                             anyLong(), anyBoolean(), (ResponseFilter) anyObject()))
+               .andReturn(Collections.emptyList()).once();
+
+         replay(mockTransport);
+         asyncCache1.put("k", "v");
+         // check that the replication call was async
+         verify(mockTransport);
+      } finally {
+         // replace original transport
+         if (rpcManager != null) rpcManager.setTransport(originalTransport);
+      }
    }
 }

Modified: core/branches/flat/src/test/java/org/horizon/util/internals/ReplicationListener.java
===================================================================
--- core/branches/flat/src/test/java/org/horizon/util/internals/ReplicationListener.java	2009-01-27 23:06:53 UTC (rev 7604)
+++ core/branches/flat/src/test/java/org/horizon/util/internals/ReplicationListener.java	2009-01-28 14:34:08 UTC (rev 7605)
@@ -128,9 +128,9 @@
                Class<? extends ReplicableCommand> replicableCommandClass = it.next();
                if (realOne.containsCommandType(replicableCommandClass)) {
                   it.remove();
-               } else if (realOne.getSingleModification() instanceof PrepareCommand) //explicit transaction
+               } else if (realOne.getSingleCommand() instanceof PrepareCommand) //explicit transaction
                {
-                  PrepareCommand prepareCommand = (PrepareCommand) realOne.getSingleModification();
+                  PrepareCommand prepareCommand = (PrepareCommand) realOne.getSingleCommand();
                   if (prepareCommand.containsModificationType(replicableCommandClass)) {
                      it.remove();
                   }




More information about the jbosscache-commits mailing list