[jbosscache-commits] JBoss Cache SVN: r7614 - in core/branches/flat/src: main/java/org/horizon/commands/read and 11 other directories.

jbosscache-commits at lists.jboss.org jbosscache-commits at lists.jboss.org
Fri Jan 30 09:10:04 EST 2009


Author: manik.surtani at jboss.com
Date: 2009-01-30 09:10:03 -0500 (Fri, 30 Jan 2009)
New Revision: 7614

Added:
   core/branches/flat/src/main/java/org/horizon/commands/write/DataWriteCommand.java
   core/branches/flat/src/main/java/org/horizon/commands/write/WriteCommand.java
   core/branches/flat/src/test/java/org/horizon/replication/AsyncReplicatedAPITest.java
   core/branches/flat/src/test/java/org/horizon/replication/BaseReplicatedAPITest.java
   core/branches/flat/src/test/java/org/horizon/replication/SyncReplicatedAPITest.java
Removed:
   core/branches/flat/src/test/java/org/horizon/replication/ExceptionTest.java
Modified:
   core/branches/flat/src/main/java/org/horizon/commands/CommandsFactory.java
   core/branches/flat/src/main/java/org/horizon/commands/CommandsFactoryImpl.java
   core/branches/flat/src/main/java/org/horizon/commands/RemoteCommandFactory.java
   core/branches/flat/src/main/java/org/horizon/commands/read/AbstractDataCommand.java
   core/branches/flat/src/main/java/org/horizon/commands/write/ClearCommand.java
   core/branches/flat/src/main/java/org/horizon/commands/write/EvictCommand.java
   core/branches/flat/src/main/java/org/horizon/commands/write/InvalidateCommand.java
   core/branches/flat/src/main/java/org/horizon/commands/write/PutKeyValueCommand.java
   core/branches/flat/src/main/java/org/horizon/commands/write/PutMapCommand.java
   core/branches/flat/src/main/java/org/horizon/commands/write/RemoveCommand.java
   core/branches/flat/src/main/java/org/horizon/commands/write/ReplaceCommand.java
   core/branches/flat/src/main/java/org/horizon/container/DataContainer.java
   core/branches/flat/src/main/java/org/horizon/container/UnsortedDataContainer.java
   core/branches/flat/src/main/java/org/horizon/interceptors/CacheLoaderInterceptor.java
   core/branches/flat/src/main/java/org/horizon/interceptors/InvalidationInterceptor.java
   core/branches/flat/src/main/java/org/horizon/interceptors/LockingInterceptor.java
   core/branches/flat/src/main/java/org/horizon/interceptors/ReplicationInterceptor.java
   core/branches/flat/src/main/java/org/horizon/notifications/CacheNotifier.java
   core/branches/flat/src/main/java/org/horizon/notifications/CacheNotifierImpl.java
   core/branches/flat/src/main/java/org/horizon/util/TestingUtil.java
   core/branches/flat/src/test/java/org/horizon/BaseClusteredTest.java
   core/branches/flat/src/test/java/org/horizon/api/MixedModeTest.java
   core/branches/flat/src/test/java/org/horizon/api/mvcc/PutForExternalReadTest.java
   core/branches/flat/src/test/java/org/horizon/config/parsing/ConfigurationParserTest.java
   core/branches/flat/src/test/java/org/horizon/invalidation/BaseInvalidationTest.java
   core/branches/flat/src/test/java/org/horizon/replication/AsyncReplTest.java
   core/branches/flat/src/test/java/org/horizon/replication/ReplicationExceptionTest.java
Log:
Fixed replication, invalidation

Modified: core/branches/flat/src/main/java/org/horizon/commands/CommandsFactory.java
===================================================================
--- core/branches/flat/src/main/java/org/horizon/commands/CommandsFactory.java	2009-01-29 18:06:23 UTC (rev 7613)
+++ core/branches/flat/src/main/java/org/horizon/commands/CommandsFactory.java	2009-01-30 14:10:03 UTC (rev 7614)
@@ -52,6 +52,8 @@
 
    RemoveCommand buildRemoveCommand(Object key, Object value);
 
+   InvalidateCommand buildInvalidateCommand(Object... keys);
+
    ReplaceCommand buildReplaceCommand(Object key, Object oldValue, Object newValue);
 
    SizeCommand buildSizeCommand();
@@ -85,6 +87,4 @@
    ReplicateCommand buildReplicateCommand(List<ReplicableCommand> toReplicate);
 
    ReplicateCommand buildReplicateCommand(ReplicableCommand call);
-
-   InvalidateCommand buildInvalidateCommand(Object fqn);
 }

Modified: core/branches/flat/src/main/java/org/horizon/commands/CommandsFactoryImpl.java
===================================================================
--- core/branches/flat/src/main/java/org/horizon/commands/CommandsFactoryImpl.java	2009-01-29 18:06:23 UTC (rev 7613)
+++ core/branches/flat/src/main/java/org/horizon/commands/CommandsFactoryImpl.java	2009-01-30 14:10:03 UTC (rev 7614)
@@ -71,6 +71,10 @@
       return new RemoveCommand(key, value, notifier);
    }
 
+   public InvalidateCommand buildInvalidateCommand(Object... keys) {
+      return new InvalidateCommand(notifier, keys);
+   }
+
    public ReplaceCommand buildReplaceCommand(Object key, Object oldValue, Object newValue) {
       return new ReplaceCommand(key, oldValue, newValue);
    }
@@ -95,8 +99,7 @@
    }
 
    public EvictCommand buildEvictCommand(Object key) {
-      EvictCommand command = new EvictCommand(key);
-      command.initialize(notifier);
+      EvictCommand command = new EvictCommand(key, notifier);
       return command;
    }
 
@@ -137,6 +140,10 @@
             if (rc.getCommands() != null)
                for (ReplicableCommand nested : rc.getCommands()) initializeReplicableCommand(nested);
             break;
+         case InvalidateCommand.METHOD_ID:
+            InvalidateCommand ic = (InvalidateCommand) c;
+            ic.init(notifier);
+            break;
          case PrepareCommand.METHOD_ID:
             PrepareCommand pc = (PrepareCommand) c;
             if (pc.getModifications() != null)
@@ -144,8 +151,4 @@
             break;
       }
    }
-
-   public InvalidateCommand buildInvalidateCommand(Object fqn) {
-      throw new UnsupportedOperationException("Not implemented");//todo please implement!
-   }
 }

Modified: core/branches/flat/src/main/java/org/horizon/commands/RemoteCommandFactory.java
===================================================================
--- core/branches/flat/src/main/java/org/horizon/commands/RemoteCommandFactory.java	2009-01-29 18:06:23 UTC (rev 7613)
+++ core/branches/flat/src/main/java/org/horizon/commands/RemoteCommandFactory.java	2009-01-30 14:10:03 UTC (rev 7614)
@@ -7,6 +7,7 @@
 import org.horizon.commands.tx.PrepareCommand;
 import org.horizon.commands.tx.RollbackCommand;
 import org.horizon.commands.write.ClearCommand;
+import org.horizon.commands.write.InvalidateCommand;
 import org.horizon.commands.write.PutKeyValueCommand;
 import org.horizon.commands.write.PutMapCommand;
 import org.horizon.commands.write.RemoveCommand;
@@ -64,7 +65,9 @@
          case ReplicateCommand.METHOD_ID:
             command = new ReplicateCommand();
             break;
-
+         case InvalidateCommand.METHOD_ID:
+            command = new InvalidateCommand();
+            break;
          default:
             throw new CacheException("Unknown command id " + id + "!");
       }

Modified: core/branches/flat/src/main/java/org/horizon/commands/read/AbstractDataCommand.java
===================================================================
--- core/branches/flat/src/main/java/org/horizon/commands/read/AbstractDataCommand.java	2009-01-29 18:06:23 UTC (rev 7613)
+++ core/branches/flat/src/main/java/org/horizon/commands/read/AbstractDataCommand.java	2009-01-30 14:10:03 UTC (rev 7614)
@@ -69,7 +69,6 @@
       return (key != null ? key.hashCode() : 0);
    }
 
-
    public String toString() {
       return getClass().getSimpleName() + "{" +
             "key=" + key +

Modified: core/branches/flat/src/main/java/org/horizon/commands/write/ClearCommand.java
===================================================================
--- core/branches/flat/src/main/java/org/horizon/commands/write/ClearCommand.java	2009-01-29 18:06:23 UTC (rev 7613)
+++ core/branches/flat/src/main/java/org/horizon/commands/write/ClearCommand.java	2009-01-30 14:10:03 UTC (rev 7614)
@@ -21,7 +21,6 @@
  */
 package org.horizon.commands.write;
 
-import org.horizon.commands.VisitableCommand;
 import org.horizon.commands.Visitor;
 import org.horizon.container.MVCCEntry;
 import org.horizon.context.InvocationContext;
@@ -30,7 +29,7 @@
  * @author Mircea.Markus at jboss.com
  * @since 1.0
  */
-public class ClearCommand implements VisitableCommand {
+public class ClearCommand implements WriteCommand {
    private static final Object[] params = new Object[0];
    public static final byte METHOD_ID = 17;
 
@@ -57,4 +56,13 @@
    public void setParameters(int commandId, Object[] parameters) {
       if (commandId != METHOD_ID) throw new IllegalStateException("Invalid method id");
    }
+
+   @Override
+   public String toString() {
+      return "ClearCommand";
+   }
+
+   public boolean isSuccessful() {
+      return true;
+   }
 }

Added: core/branches/flat/src/main/java/org/horizon/commands/write/DataWriteCommand.java
===================================================================
--- core/branches/flat/src/main/java/org/horizon/commands/write/DataWriteCommand.java	                        (rev 0)
+++ core/branches/flat/src/main/java/org/horizon/commands/write/DataWriteCommand.java	2009-01-30 14:10:03 UTC (rev 7614)
@@ -0,0 +1,12 @@
+package org.horizon.commands.write;
+
+import org.horizon.commands.DataCommand;
+
+/**
+ * Mixes features from DataCommand and WriteCommand
+ *
+ * @author Manik Surtani
+ * @since 1.0
+ */
+public interface DataWriteCommand extends WriteCommand, DataCommand {
+}

Modified: core/branches/flat/src/main/java/org/horizon/commands/write/EvictCommand.java
===================================================================
--- core/branches/flat/src/main/java/org/horizon/commands/write/EvictCommand.java	2009-01-29 18:06:23 UTC (rev 7613)
+++ core/branches/flat/src/main/java/org/horizon/commands/write/EvictCommand.java	2009-01-30 14:10:03 UTC (rev 7614)
@@ -22,8 +22,6 @@
 package org.horizon.commands.write;
 
 import org.horizon.commands.Visitor;
-import org.horizon.commands.read.AbstractDataCommand;
-import org.horizon.container.MVCCEntry;
 import org.horizon.context.InvocationContext;
 import org.horizon.notifications.CacheNotifier;
 
@@ -31,39 +29,36 @@
  * @author Mircea.Markus at jboss.com
  * @since 1.0
  */
-public class EvictCommand extends AbstractDataCommand {
-   public static final byte METHOD_ID = 120;
+public class EvictCommand extends RemoveCommand {
 
-   private CacheNotifier notifier;
-
-   public EvictCommand(Object key) {
+   public EvictCommand(Object key, CacheNotifier notifier) {
       this.key = key;
+      this.notifier = notifier;
    }
 
    public void initialize(CacheNotifier notifier) {
       this.notifier = notifier;
    }
 
+   @Override
    public Object acceptVisitor(InvocationContext ctx, Visitor visitor) throws Throwable {
       return visitor.visitEvictCommand(ctx, this);
    }
 
+   @Override
    public Object perform(InvocationContext ctx) throws Throwable {
-
       if (key == null) throw new NullPointerException("Key is null!!");
-
-      MVCCEntry e = ctx.lookupEntry(key);
-      if (e != null && !e.isNullEntry()) {
-         //todo - add a actual eviction from thr container
-         notifier.notifyCacheEntryEvicted(key, true, ctx);
-         e.setDeleted(true);
-         e.setValid(false);
-         notifier.notifyCacheEntryEvicted(key, false, ctx);
-      }
+      super.perform(ctx);
       return null;
    }
 
+   @Override
+   public void notify(InvocationContext ctx, boolean isPre) {
+      notifier.notifyCacheEntryEvicted(key, isPre, ctx);
+   }
+
+   @Override
    public byte getCommandId() {
-      return METHOD_ID;
+      return -1; // these are not meant for replication!
    }
 }

Modified: core/branches/flat/src/main/java/org/horizon/commands/write/InvalidateCommand.java
===================================================================
--- core/branches/flat/src/main/java/org/horizon/commands/write/InvalidateCommand.java	2009-01-29 18:06:23 UTC (rev 7613)
+++ core/branches/flat/src/main/java/org/horizon/commands/write/InvalidateCommand.java	2009-01-30 14:10:03 UTC (rev 7614)
@@ -21,17 +21,15 @@
  */
 package org.horizon.commands.write;
 
-import org.horizon.CacheSPI;
 import org.horizon.commands.Visitor;
-import org.horizon.commands.read.AbstractDataCommand;
-import org.horizon.container.DataContainer;
 import org.horizon.context.InvocationContext;
 import org.horizon.logging.Log;
 import org.horizon.logging.LogFactory;
 import org.horizon.notifications.CacheNotifier;
-import org.horizon.tree.Fqn;
 
+import java.util.Arrays;
 
+
 /**
  * Removes a node's content from memory - never removes the node. It also clenups data for resident nodes - which are
  * not being touched by eviction.
@@ -39,26 +37,17 @@
  * @author Mircea.Markus at jboss.com
  * @since 1.0
  */
-public class InvalidateCommand extends AbstractDataCommand {
+public class InvalidateCommand extends RemoveCommand {
    public static final int METHOD_ID = 47;
    private static final Log log = LogFactory.getLog(InvalidateCommand.class);
    private static final boolean trace = log.isTraceEnabled();
+   private Object[] keys;
 
-   /* dependencies*/
-   protected CacheSPI spi;
-   protected CacheNotifier notifier;
-   protected DataContainer dataContainer;
-
-   public InvalidateCommand(Object key) {
-      this.key = key;
-   }
-
    public InvalidateCommand() {
    }
 
-   public void initialize(CacheSPI cacheSpi, DataContainer dataContainer, CacheNotifier notifier) {
-      this.spi = cacheSpi;
-      this.dataContainer = dataContainer;
+   public InvalidateCommand(CacheNotifier notifier, Object... keys) {
+      this.keys = keys;
       this.notifier = notifier;
    }
 
@@ -68,56 +57,20 @@
     * @param ctx invocation context
     * @return null
     */
-   public Object perform(InvocationContext ctx) {
-      Object value = enforceNodeLoading();
-      if (trace) log.trace("Invalidating key:" + key);
-      if (value == null) {
-         return null;
+   public Object perform(InvocationContext ctx) throws Throwable {
+      if (trace) log.trace("Invalidating keys:" + Arrays.toString(keys));
+      for (Object key : keys) {
+         this.key = key;
+         super.perform(ctx);
       }
-      evictNode(key, ctx);
-//      dataContainer.
       return null;
    }
 
-   boolean evictNode(Object key, InvocationContext ctx) {
-      notifier.notifyNodeInvalidated(key, true, ctx);
-      try {
-         return dataContainer.evict(key);
-      }
-      finally {
-         notifier.notifyNodeInvalidated(key, false, ctx);
-      }
+   @Override
+   protected void notify(InvocationContext ctx, boolean isPre) {
+      notifier.notifyCacheEntryInvalidated(key, isPre, ctx);
    }
 
-
-   /**
-    * //TODO: 2.2.0: rather than using CacheSPI this should use peek().  The other interceptors should obtain locks and
-    * load nodes if necessary for this InvalidateCommand. //Even better - this can be handles in the interceptors before
-    * call interceptor
-    */
-   protected Object enforceNodeLoading() {
-      return spi.get(key);
-   }
-
-
-   /**
-    * mark the node to be removed (and all children) as invalid so anyone holding a direct reference to it will be aware
-    * that it is no longer valid.
-    */
-   protected void invalidateNode()//NodeSPI node)
-   {
-      // TODO: Implement me!
-      throw new RuntimeException("Implement me!");
-//      node.setValid(false, true);
-      // root nodes can never be invalid
-//      if (fqn.isRoot()) node.setValid(true, false); // non-recursive.
-   }
-
-
-   public Object acceptVisitor(InvocationContext ctx, Visitor visitor) throws Throwable {
-      return visitor.visitInvalidateCommand(ctx, this);
-   }
-
    public byte getCommandId() {
       return METHOD_ID;
    }
@@ -125,21 +78,45 @@
    @Override
    public String toString() {
       return "InvalidateCommand{" +
-            "key=" + key +
+            "keys=" + Arrays.toString(keys) +
             '}';
    }
 
    @Override
    public Object[] getParameters() {
-      return new Object[]{key};
+      if (keys == null || keys.length == 0) {
+         return new Object[]{0};
+      } else if (keys.length == 1) {
+         return new Object[]{1, keys[0]};
+      } else {
+         Object[] retval = new Object[keys.length + 1];
+         retval[0] = keys.length;
+         System.arraycopy(keys, 0, retval, 1, keys.length);
+         return retval;
+      }
    }
 
    @Override
    public void setParameters(int commandId, Object[] args) {
-      key = args[0];
+      int size = (Integer) args[0];
+      keys = new Object[size];
+      if (size == 1) {
+         keys[0] = args[1];
+      } else if (size > 0) {
+         System.arraycopy(args, 1, keys, 0, size);
+      }
    }
 
-   void setFqn(Fqn newFqn) {
-      this.key = newFqn;
+   public Object acceptVisitor(InvocationContext ctx, Visitor visitor) throws Throwable {
+      return visitor.visitInvalidateCommand(ctx, this);
    }
+
+   @Override
+   public Object getKey() {
+      throw new UnsupportedOperationException("Not supported.  Use getKeys() instead.");
+   }
+
+   public Object[] getKeys() {
+      return keys;
+   }
 }
\ No newline at end of file

Modified: core/branches/flat/src/main/java/org/horizon/commands/write/PutKeyValueCommand.java
===================================================================
--- core/branches/flat/src/main/java/org/horizon/commands/write/PutKeyValueCommand.java	2009-01-29 18:06:23 UTC (rev 7613)
+++ core/branches/flat/src/main/java/org/horizon/commands/write/PutKeyValueCommand.java	2009-01-30 14:10:03 UTC (rev 7614)
@@ -35,12 +35,13 @@
  * @author Mircea.Markus at jboss.com
  * @since 1.0
  */
-public class PutKeyValueCommand extends AbstractDataCommand {
+public class PutKeyValueCommand extends AbstractDataCommand implements DataWriteCommand {
    public static final byte METHOD_ID = 3;
 
    protected Object value;
    protected boolean putIfAbsent;
    private CacheNotifier notifier;
+   boolean successful = true;
 
    public PutKeyValueCommand(Object key, Object value, boolean putIfAbsent, CacheNotifier notifier) {
       super(key);
@@ -69,9 +70,10 @@
    }
 
    public Object perform(InvocationContext ctx) throws Throwable {
-      Object o = null;
+      Object o;
       MVCCEntry e = ctx.lookupEntry(key);
       if (e.getValue() != null && putIfAbsent) {
+         successful = false;
          return e.getValue();
       } else {
          notifier.notifyCacheEntryModified(key, true, ctx);
@@ -134,7 +136,6 @@
       return result;
    }
 
-
    public String toString() {
       return "PutKeyValueCommand{" +
             "key= " + key +
@@ -142,4 +143,8 @@
             ", putIfAbsent=" + putIfAbsent +
             '}';
    }
+
+   public boolean isSuccessful() {
+      return successful;
+   }
 }
\ No newline at end of file

Modified: core/branches/flat/src/main/java/org/horizon/commands/write/PutMapCommand.java
===================================================================
--- core/branches/flat/src/main/java/org/horizon/commands/write/PutMapCommand.java	2009-01-29 18:06:23 UTC (rev 7613)
+++ core/branches/flat/src/main/java/org/horizon/commands/write/PutMapCommand.java	2009-01-30 14:10:03 UTC (rev 7614)
@@ -21,7 +21,6 @@
  */
 package org.horizon.commands.write;
 
-import org.horizon.commands.VisitableCommand;
 import org.horizon.commands.Visitor;
 import org.horizon.container.MVCCEntry;
 import org.horizon.context.InvocationContext;
@@ -34,7 +33,7 @@
  * @author Mircea.Markus at jboss.com
  * @since 1.0
  */
-public class PutMapCommand implements VisitableCommand {
+public class PutMapCommand implements WriteCommand {
    public static final byte METHOD_ID = 121;
 
    private Map<Object, Object> map;
@@ -109,4 +108,8 @@
             "map=" + map +
             '}';
    }
+
+   public boolean isSuccessful() {
+      return true;
+   }
 }

Modified: core/branches/flat/src/main/java/org/horizon/commands/write/RemoveCommand.java
===================================================================
--- core/branches/flat/src/main/java/org/horizon/commands/write/RemoveCommand.java	2009-01-29 18:06:23 UTC (rev 7613)
+++ core/branches/flat/src/main/java/org/horizon/commands/write/RemoveCommand.java	2009-01-30 14:10:03 UTC (rev 7614)
@@ -32,9 +32,10 @@
  * @author Mircea.Markus at jboss.com
  * @since 1.0
  */
-public class RemoveCommand extends AbstractDataCommand {
+public class RemoveCommand extends AbstractDataCommand implements DataWriteCommand {
    public static final byte METHOD_ID = 6;
-   private CacheNotifier notifier;
+   protected CacheNotifier notifier;
+   boolean successful = true;
 
    protected Object value;
 
@@ -57,17 +58,30 @@
 
    public Object perform(InvocationContext ctx) throws Throwable {
       MVCCEntry e = ctx.lookupEntry(key);
-      if (e == null || e.isNullEntry()) return value == null ? null : false;
-      if (value != null && e.getValue() != null && !e.getValue().equals(value))
+      if (e == null || e.isNullEntry()) {
+         if (value == null) {
+            return null;
+         } else {
+            successful = false;
+            return false;
+         }
+      }
+      if (value != null && e.getValue() != null && !e.getValue().equals(value)) {
+         successful = false;
          return false;
+      }
 
-      notifier.notifyCacheEntryRemoved(key, true, ctx);
+      notify(ctx, true);
       e.setDeleted(true);
       e.setValid(false);
-      notifier.notifyCacheEntryRemoved(key, false, ctx);
+      notify(ctx, false);
       return value == null ? e.getValue() : true;
    }
 
+   protected void notify(InvocationContext ctx, boolean isPre) {
+      notifier.notifyCacheEntryRemoved(key, isPre, ctx);
+   }
+
    public byte getCommandId() {
       return METHOD_ID;
    }
@@ -92,9 +106,13 @@
 
 
    public String toString() {
-      return "RemoveCommand{" +
+      return getClass().getSimpleName() + "{" +
             "key=" + key +
             ", value=" + value +
             '}';
    }
+
+   public boolean isSuccessful() {
+      return successful;
+   }
 }

Modified: core/branches/flat/src/main/java/org/horizon/commands/write/ReplaceCommand.java
===================================================================
--- core/branches/flat/src/main/java/org/horizon/commands/write/ReplaceCommand.java	2009-01-29 18:06:23 UTC (rev 7613)
+++ core/branches/flat/src/main/java/org/horizon/commands/write/ReplaceCommand.java	2009-01-30 14:10:03 UTC (rev 7614)
@@ -31,11 +31,12 @@
  * @author Mircea.Markus at jboss.com
  * @since 1.0
  */
-public class ReplaceCommand extends AbstractDataCommand {
+public class ReplaceCommand extends AbstractDataCommand implements DataWriteCommand {
    public static final byte METHOD_ID = 122;
 
    protected Object oldValue;
    protected Object newValue;
+   boolean successful = true;
 
    public ReplaceCommand(Object key, Object oldValue, Object newValue) {
       super(key);
@@ -52,14 +53,34 @@
 
    public Object perform(InvocationContext ctx) throws Throwable {
       MVCCEntry e = ctx.lookupEntry(key);
-      if (e == null || e.isNullEntry()) return oldValue == null ? null : false;
-      if (oldValue == null || oldValue.equals(e.getValue())) {
-         Object old = e.setValue(newValue);
-         return oldValue == null ? old : true;
+      if (e != null) {
+         if (ctx.isOriginLocal()) {
+            if (e.isNullEntry()) return returnValue(null, false);
+
+            if (oldValue == null || oldValue.equals(e.getValue())) {
+               Object old = e.setValue(newValue);
+               return returnValue(old, true);
+            }
+            return returnValue(null, false);
+         } else {
+            // for remotely originating calls, this doesn't check the status of what is under the key at the moment
+            Object old = e.setValue(newValue);
+            return returnValue(old, true);
+         }
       }
-      return oldValue == null ? null : false;
+
+      return returnValue(null, false);
    }
 
+   private Object returnValue(Object beingReplaced, boolean successful) {
+      this.successful = successful;
+      if (oldValue == null) {
+         return beingReplaced;
+      } else {
+         return successful;
+      }
+   }
+
    public byte getCommandId() {
       return METHOD_ID;
    }
@@ -102,4 +123,8 @@
             ", newValue=" + newValue +
             '}';
    }
+
+   public boolean isSuccessful() {
+      return successful;
+   }
 }

Added: core/branches/flat/src/main/java/org/horizon/commands/write/WriteCommand.java
===================================================================
--- core/branches/flat/src/main/java/org/horizon/commands/write/WriteCommand.java	                        (rev 0)
+++ core/branches/flat/src/main/java/org/horizon/commands/write/WriteCommand.java	2009-01-30 14:10:03 UTC (rev 7614)
@@ -0,0 +1,20 @@
+package org.horizon.commands.write;
+
+import org.horizon.commands.VisitableCommand;
+
+/**
+ * A command that modifies the cache in some way
+ *
+ * @author Manik Surtani
+ * @since 1.0
+ */
+public interface WriteCommand extends VisitableCommand {
+   /**
+    * Some commands may want to provide information on whether the command was successful or not.  This is different
+    * from a failure, which usually would result in an exception being thrown.  An example is a putIfAbsent() not doing
+    * anything because the key in question was present.  This would result in a isSuccessful() call returning false.
+    *
+    * @return true if the command completed successfully, false otherwise.
+    */
+   boolean isSuccessful();
+}

Modified: core/branches/flat/src/main/java/org/horizon/container/DataContainer.java
===================================================================
--- core/branches/flat/src/main/java/org/horizon/container/DataContainer.java	2009-01-29 18:06:23 UTC (rev 7613)
+++ core/branches/flat/src/main/java/org/horizon/container/DataContainer.java	2009-01-30 14:10:03 UTC (rev 7614)
@@ -47,6 +47,4 @@
    void clear();
 
    Set<K> keySet();
-
-   boolean evict(Object key);
 }

Modified: core/branches/flat/src/main/java/org/horizon/container/UnsortedDataContainer.java
===================================================================
--- core/branches/flat/src/main/java/org/horizon/container/UnsortedDataContainer.java	2009-01-29 18:06:23 UTC (rev 7613)
+++ core/branches/flat/src/main/java/org/horizon/container/UnsortedDataContainer.java	2009-01-30 14:10:03 UTC (rev 7614)
@@ -76,10 +76,6 @@
       return new KeySet();
    }
 
-   public boolean evict(Object key) {
-      throw new UnsupportedOperationException("Not implemented");//todo please implement!
-   }
-
    public String toString() {
       return data.toString();
    }

Modified: core/branches/flat/src/main/java/org/horizon/interceptors/CacheLoaderInterceptor.java
===================================================================
--- core/branches/flat/src/main/java/org/horizon/interceptors/CacheLoaderInterceptor.java	2009-01-29 18:06:23 UTC (rev 7613)
+++ core/branches/flat/src/main/java/org/horizon/interceptors/CacheLoaderInterceptor.java	2009-01-30 14:10:03 UTC (rev 7614)
@@ -24,6 +24,7 @@
 import org.horizon.annotations.ManagedAttribute;
 import org.horizon.annotations.ManagedOperation;
 import org.horizon.commands.read.GetKeyValueCommand;
+import org.horizon.commands.write.InvalidateCommand;
 import org.horizon.commands.write.PutKeyValueCommand;
 import org.horizon.commands.write.RemoveCommand;
 import org.horizon.commands.write.ReplaceCommand;
@@ -60,7 +61,6 @@
    protected EntryFactory entryFactory;
 
    protected boolean isActivation = false;
-//   protected boolean usingVersionedInvalidation = false;
 
 
    /**
@@ -74,8 +74,6 @@
                                      DataContainer<Object, Object> dataContainer, EntryFactory entryFactory, CacheNotifier notifier) {
       this.txTable = txTable;
       this.clm = clm;
-//      CacheMode mode = configuration.getCacheMode();
-//      usingVersionedInvalidation = mode.isInvalidation();
       this.dataContainer = dataContainer;
       this.notifier = notifier;
       this.entryFactory = entryFactory;
@@ -104,6 +102,14 @@
    }
 
    @Override
+   public Object visitInvalidateCommand(InvocationContext ctx, InvalidateCommand command) throws Throwable {
+      if (command.getKeys() != null) {
+         for (Object key : command.getKeys()) loadIfNeeded(ctx, key);
+      }
+      return invokeNextInterceptor(ctx, command);
+   }
+
+   @Override
    public Object visitRemoveCommand(InvocationContext ctx, RemoveCommand command) throws Throwable {
       if (command.getKey() != null) {
          loadIfNeeded(ctx, command.getKey());

Modified: core/branches/flat/src/main/java/org/horizon/interceptors/InvalidationInterceptor.java
===================================================================
--- core/branches/flat/src/main/java/org/horizon/interceptors/InvalidationInterceptor.java	2009-01-29 18:06:23 UTC (rev 7613)
+++ core/branches/flat/src/main/java/org/horizon/interceptors/InvalidationInterceptor.java	2009-01-30 14:10:03 UTC (rev 7614)
@@ -29,9 +29,13 @@
 import org.horizon.commands.VisitableCommand;
 import org.horizon.commands.tx.PrepareCommand;
 import org.horizon.commands.write.ClearCommand;
+import org.horizon.commands.write.DataWriteCommand;
 import org.horizon.commands.write.InvalidateCommand;
 import org.horizon.commands.write.PutKeyValueCommand;
+import org.horizon.commands.write.PutMapCommand;
 import org.horizon.commands.write.RemoveCommand;
+import org.horizon.commands.write.ReplaceCommand;
+import org.horizon.commands.write.WriteCommand;
 import org.horizon.config.Option;
 import org.horizon.context.InvocationContext;
 import org.horizon.context.TransactionContext;
@@ -80,27 +84,39 @@
 
    @Override
    public Object visitPutKeyValueCommand(InvocationContext ctx, PutKeyValueCommand command) throws Throwable {
-      return handleWriteMethod(ctx, command, command);
+      return handleInvalidate(ctx, command);
    }
 
    @Override
+   public Object visitReplaceCommand(InvocationContext ctx, ReplaceCommand command) throws Throwable {
+      return handleInvalidate(ctx, command);
+   }
+
+   @Override
    public Object visitRemoveCommand(InvocationContext ctx, RemoveCommand command) throws Throwable {
-      return handleWriteMethod(ctx, command.getKey(), command);
+      return handleInvalidate(ctx, command);
    }
 
    @Override
    public Object visitClearCommand(InvocationContext ctx, ClearCommand command) throws Throwable {
-//      return handleWriteMethod(ctx, command.getKey(), command);
-      //todo handle this - should perfor a remote invalidation aswell!!!
-      return null;
+      // just broadcast the clear command - this is simplest!
+      Object retval = invokeNextInterceptor(ctx, command);
+      if (ctx.isOriginLocal()) replicateCall(ctx, command, defaultSynchronous, ctx.getOptionOverrides());
+      return retval;
    }
 
    @Override
+   public Object visitPutMapCommand(InvocationContext ctx, PutMapCommand command) throws Throwable {
+      Object[] keys = command.getMap() == null ? null : command.getMap().keySet().toArray();
+      return handleInvalidate(ctx, command, keys);
+   }
+
+   @Override
    public Object visitPrepareCommand(InvocationContext ctx, PrepareCommand command) throws Throwable {
       Object retval = invokeNextInterceptor(ctx, command);
       Transaction tx = ctx.getTransaction();
       if (tx != null) {
-         if (trace) log.trace("Entering InvalidationInterceptor_Legacy's prepare phase");
+         if (trace) log.trace("Entering InvalidationInterceptor's prepare phase");
          // fetch the modifications before the transaction is committed (and thus removed from the txTable)
          GlobalTransaction gtx = ctx.getGlobalTransaction();
          TransactionContext transactionContext = ctx.getTransactionContext();
@@ -123,20 +139,25 @@
       return retval;
    }
 
-   private Object handleWriteMethod(InvocationContext ctx, Object key, VisitableCommand command)
-         throws Throwable {
+   private Object handleInvalidate(InvocationContext ctx, DataWriteCommand command) throws Throwable {
+      return handleInvalidate(ctx, command, command.getKey());
+   }
+
+   private Object handleInvalidate(InvocationContext ctx, WriteCommand command, Object... keys) throws Throwable {
       Object retval = invokeNextInterceptor(ctx, command);
-      Transaction tx = ctx.getTransaction();
-      Option optionOverride = ctx.getOptionOverrides();
-      if (log.isDebugEnabled()) log.debug("Is a CRUD method");
-      if (key != null) {
-         // could be potentially TRANSACTIONAL.  Ignore if it is, until we see a prepare().
-         if (tx == null || !TransactionTable.isValid(tx)) {
-            // the no-tx case:
-            //replicate an evict call.
-            invalidateAcrossCluster(key, isSynchronous(optionOverride), ctx);
-         } else {
-            if (isLocalModeForced(ctx)) ctx.getTransactionContext().addLocalModification(command);
+      if (command.isSuccessful()) {
+         Transaction tx = ctx.getTransaction();
+         Option optionOverride = ctx.getOptionOverrides();
+         if (log.isDebugEnabled()) log.debug("Is a CRUD method");
+         if (keys != null && keys.length != 0) {
+            // could be potentially TRANSACTIONAL.  Ignore if it is, until we see a prepare().
+            if (tx == null || !TransactionTable.isValid(tx)) {
+               // the no-tx case:
+               //replicate an evict call.
+               invalidateAcrossCluster(isSynchronous(optionOverride), ctx, keys);
+            } else {
+               if (isLocalModeForced(ctx)) ctx.getTransactionContext().addLocalModification(command);
+            }
          }
       }
       return retval;
@@ -152,7 +173,7 @@
             log.debug("Modification list contains a putForExternalRead operation.  Not invalidating.");
          } else {
             try {
-               for (Object key : filterVisitor.result) invalidateAcrossCluster(key, defaultSynchronous, ctx);
+               invalidateAcrossCluster(defaultSynchronous, ctx, filterVisitor.result.toArray());
             }
             catch (Throwable t) {
                log.warn("Unable to broadcast evicts as a part of the prepare phase.  Rolling back.", t);
@@ -193,11 +214,11 @@
    }
 
 
-   protected void invalidateAcrossCluster(Object fqn, boolean synchronous, InvocationContext ctx) throws Throwable {
+   protected void invalidateAcrossCluster(boolean synchronous, InvocationContext ctx, Object[] keys) throws Throwable {
       if (!isLocalModeForced(ctx)) {
          // increment invalidations counter if statistics maintained
          incrementInvalidations();
-         InvalidateCommand command = commandsFactory.buildInvalidateCommand(fqn);
+         InvalidateCommand command = commandsFactory.buildInvalidateCommand(keys);
          if (log.isDebugEnabled()) log.debug("Cache [" + rpcManager.getAddress() + "] replicating " + command);
          // voila, invalidated!
          replicateCall(ctx, command, synchronous, ctx.getOptionOverrides());

Modified: core/branches/flat/src/main/java/org/horizon/interceptors/LockingInterceptor.java
===================================================================
--- core/branches/flat/src/main/java/org/horizon/interceptors/LockingInterceptor.java	2009-01-29 18:06:23 UTC (rev 7613)
+++ core/branches/flat/src/main/java/org/horizon/interceptors/LockingInterceptor.java	2009-01-30 14:10:03 UTC (rev 7614)
@@ -28,6 +28,7 @@
 import org.horizon.commands.tx.RollbackCommand;
 import org.horizon.commands.write.ClearCommand;
 import org.horizon.commands.write.EvictCommand;
+import org.horizon.commands.write.InvalidateCommand;
 import org.horizon.commands.write.PutKeyValueCommand;
 import org.horizon.commands.write.PutMapCommand;
 import org.horizon.commands.write.RemoveCommand;
@@ -143,8 +144,16 @@
 
    @Override
    public Object visitEvictCommand(InvocationContext ctx, EvictCommand command) throws Throwable {
+      // ensure keys are properly locked for evict commands
+      return visitRemoveCommand(ctx, command);
+   }
+
+   @Override
+   public Object visitInvalidateCommand(InvocationContext ctx, InvalidateCommand command) throws Throwable {
       try {
-         entryFactory.wrapEntryForWriting(ctx, command.getKey(), false, true);
+         if (command.getKeys() != null) {
+            for (Object key : command.getKeys()) entryFactory.wrapEntryForWriting(ctx, key, false, true);
+         }
          return invokeNextInterceptor(ctx, command);
       }
       finally {

Modified: core/branches/flat/src/main/java/org/horizon/interceptors/ReplicationInterceptor.java
===================================================================
--- core/branches/flat/src/main/java/org/horizon/interceptors/ReplicationInterceptor.java	2009-01-29 18:06:23 UTC (rev 7613)
+++ core/branches/flat/src/main/java/org/horizon/interceptors/ReplicationInterceptor.java	2009-01-30 14:10:03 UTC (rev 7614)
@@ -21,7 +21,6 @@
  */
 package org.horizon.interceptors;
 
-import org.horizon.commands.VisitableCommand;
 import org.horizon.commands.tx.CommitCommand;
 import org.horizon.commands.tx.PrepareCommand;
 import org.horizon.commands.tx.RollbackCommand;
@@ -30,6 +29,7 @@
 import org.horizon.commands.write.PutMapCommand;
 import org.horizon.commands.write.RemoveCommand;
 import org.horizon.commands.write.ReplaceCommand;
+import org.horizon.commands.write.WriteCommand;
 import org.horizon.config.Configuration;
 import org.horizon.context.InvocationContext;
 import org.horizon.context.TransactionContext;
@@ -103,22 +103,25 @@
     * If we are within one transaction we won't do any replication as replication would only be performed at commit
     * time. If the operation didn't originate locally we won't do any replication either.
     */
-   private Object handleCrudMethod(InvocationContext ctx, VisitableCommand command)
+   private Object handleCrudMethod(InvocationContext ctx, WriteCommand command)
          throws Throwable {
       boolean local = isLocalModeForced(ctx);
       if (local && ctx.getTransaction() == null) return invokeNextInterceptor(ctx, command);
       // FIRST pass this call up the chain.  Only if it succeeds (no exceptions) locally do we attempt to replicate.
       Object returnValue = invokeNextInterceptor(ctx, command);
-      if (ctx.getTransaction() == null && ctx.isOriginLocal()) {
-         if (trace) {
-            log.trace("invoking method " + command.getClass().getSimpleName() + ", members=" + rpcManager.getMembers() + ", mode=" +
-                  configuration.getCacheMode() + ", exclude_self=" + true + ", timeout=" +
-                  configuration.getSyncReplTimeout());
+
+      if (command.isSuccessful()) {
+         if (ctx.getTransaction() == null && ctx.isOriginLocal()) {
+            if (trace) {
+               log.trace("invoking method " + command.getClass().getSimpleName() + ", members=" + rpcManager.getMembers() + ", mode=" +
+                     configuration.getCacheMode() + ", exclude_self=" + true + ", timeout=" +
+                     configuration.getSyncReplTimeout());
+            }
+
+            replicateCall(ctx, command, isSynchronous(ctx.getOptionOverrides()), ctx.getOptionOverrides());
+         } else {
+            if (local) ctx.getTransactionContext().addLocalModification(command);
          }
-
-         replicateCall(ctx, command, isSynchronous(ctx.getOptionOverrides()), ctx.getOptionOverrides());
-      } else {
-         if (local) ctx.getTransactionContext().addLocalModification(command);
       }
       return returnValue;
    }

Modified: core/branches/flat/src/main/java/org/horizon/notifications/CacheNotifier.java
===================================================================
--- core/branches/flat/src/main/java/org/horizon/notifications/CacheNotifier.java	2009-01-29 18:06:23 UTC (rev 7613)
+++ core/branches/flat/src/main/java/org/horizon/notifications/CacheNotifier.java	2009-01-30 14:10:03 UTC (rev 7614)
@@ -98,6 +98,4 @@
     * @param transaction the transaction that has just completed
     */
    void notifyTransactionRegistered(Transaction transaction, InvocationContext ctx);
-
-   void notifyNodeInvalidated(Object key, boolean pre, InvocationContext ctx);
 }
\ No newline at end of file

Modified: core/branches/flat/src/main/java/org/horizon/notifications/CacheNotifierImpl.java
===================================================================
--- core/branches/flat/src/main/java/org/horizon/notifications/CacheNotifierImpl.java	2009-01-29 18:06:23 UTC (rev 7613)
+++ core/branches/flat/src/main/java/org/horizon/notifications/CacheNotifierImpl.java	2009-01-30 14:10:03 UTC (rev 7614)
@@ -531,8 +531,4 @@
       if (list == null) throw new CacheException("Unknown listener annotation: " + annotation);
       return list;
    }
-
-   public void notifyNodeInvalidated(Object key, boolean pre, InvocationContext ctx) {
-      throw new UnsupportedOperationException("Not implemented");//todo please implement!
-   }
 }

Modified: core/branches/flat/src/main/java/org/horizon/util/TestingUtil.java
===================================================================
--- core/branches/flat/src/main/java/org/horizon/util/TestingUtil.java	2009-01-29 18:06:23 UTC (rev 7613)
+++ core/branches/flat/src/main/java/org/horizon/util/TestingUtil.java	2009-01-30 14:10:03 UTC (rev 7614)
@@ -559,29 +559,6 @@
       return (CommandsFactory) extractField(cache, "commandsFactory");
    }
 
-   public static String getJGroupsAttribute(Cache cache, String protocol, String attribute) {
-      throw new RuntimeException("Implement me");
-//      String s = ((JChannel) ((CacheSPI) cache).getRPCManager().getChannel()).getProperties();
-//      String[] protocols = s.split(":");
-//      String attribs = null;
-//      for (String p : protocols) {
-//         boolean hasAttribs = p.contains("(");
-//         String name = hasAttribs ? p.substring(0, p.indexOf('(')) : p;
-//         attribs = hasAttribs ? p.substring(p.indexOf('(') + 1, p.length() - 1) : null;
-//
-//         if (name.equalsIgnoreCase(protocol)) break;
-//      }
-//
-//      if (attribs != null) {
-//         String[] attrArray = attribs.split(";");
-//         for (String a : attrArray) {
-//            String[] kvPairs = a.split("=");
-//            if (kvPairs[0].equalsIgnoreCase(attribute)) return kvPairs[1];
-//         }
-//      }
-//      return null;
-   }
-
    public static void dumpCacheContents(List caches) {
       System.out.println("**** START: Cache Contents ****");
       int count = 1;
@@ -591,7 +568,6 @@
             System.out.println("  ** Cache " + count + " is null!");
          } else {
             System.out.println("  ** Cache " + count + " is " + c.getCacheManager().getAddress());
-//            System.out.println("    " + CachePrinter.printCacheDetails(c));
          }
          count++;
       }
@@ -616,6 +592,6 @@
    }
 
    public static TransactionManager getTransactionManager(Cache cache) {
-      return extractComponent(cache, TransactionManager.class);
+      return cache == null ? null : extractComponent(cache, TransactionManager.class);
    }
 }

Modified: core/branches/flat/src/test/java/org/horizon/BaseClusteredTest.java
===================================================================
--- core/branches/flat/src/test/java/org/horizon/BaseClusteredTest.java	2009-01-29 18:06:23 UTC (rev 7613)
+++ core/branches/flat/src/test/java/org/horizon/BaseClusteredTest.java	2009-01-30 14:10:03 UTC (rev 7614)
@@ -125,11 +125,11 @@
          this.expectedCommands.addAll(Arrays.asList(expectedCommands));
       }
 
-      public void waitForReplication() {
-         waitForReplication(120, TimeUnit.SECONDS);
+      public void waitForRPC() {
+         waitForRPC(120, TimeUnit.SECONDS);
       }
 
-      public void waitForReplication(long time, TimeUnit unit) {
+      public void waitForRPC(long time, TimeUnit unit) {
          assert expectedCommands != null : "there are no replication expectations; please use ReplListener.expect() before calling this method";
          try {
             if (!latch.await(time, unit)) {

Modified: core/branches/flat/src/test/java/org/horizon/api/MixedModeTest.java
===================================================================
--- core/branches/flat/src/test/java/org/horizon/api/MixedModeTest.java	2009-01-29 18:06:23 UTC (rev 7613)
+++ core/branches/flat/src/test/java/org/horizon/api/MixedModeTest.java	2009-01-30 14:10:03 UTC (rev 7614)
@@ -78,8 +78,8 @@
       invalAsyncCache1.put("k", "invalAsync");
       localCache1.put("k", "local");
 
-      r1.waitForReplication();
-      r2.waitForReplication();
+      r1.waitForRPC();
+      r2.waitForRPC();
 
       assert replSyncCache1.get("k").equals("replSync");
       assert replSyncCache2.get("k").equals("replSync");
@@ -92,6 +92,4 @@
       assert localCache1.get("k").equals("local");
       assert localCache2.get("k") == null;
    }
-
-
 }

Modified: core/branches/flat/src/test/java/org/horizon/api/mvcc/PutForExternalReadTest.java
===================================================================
--- core/branches/flat/src/test/java/org/horizon/api/mvcc/PutForExternalReadTest.java	2009-01-29 18:06:23 UTC (rev 7613)
+++ core/branches/flat/src/test/java/org/horizon/api/mvcc/PutForExternalReadTest.java	2009-01-30 14:10:03 UTC (rev 7614)
@@ -57,7 +57,7 @@
    public void testNoOpWhenKeyPresent() {
       replListener2.expect(PutKeyValueCommand.class);
       cache1.putForExternalRead(key, value);
-      replListener2.waitForReplication();
+      replListener2.waitForRPC();
 
 
       assertEquals("PFER should have succeeded", value, cache1.get(key));
@@ -66,14 +66,14 @@
       // reset
       replListener2.expect(RemoveCommand.class);
       cache1.remove(key);
-      replListener2.waitForReplication();
+      replListener2.waitForRPC();
 
       assert cache1.isEmpty() : "Should have reset";
       assert cache2.isEmpty() : "Should have reset";
 
       replListener2.expect(PutKeyValueCommand.class);
       cache1.put(key, value);
-      replListener2.waitForReplication();
+      replListener2.waitForRPC();
 
       // now this pfer should be a no-op
       cache1.putForExternalRead(key, value2);
@@ -127,7 +127,7 @@
       // create parent node first
       replListener2.expect(PutKeyValueCommand.class);
       cache1.put(key + "0", value);
-      replListener2.waitForReplication();
+      replListener2.waitForRPC();
 
       // start a tx and do some stuff.
       replListener2.expect(PutKeyValueCommand.class);
@@ -136,7 +136,7 @@
       cache1.putForExternalRead(key, value); // should have happened in a separate tx and have committed already.
       Transaction t = tm1.suspend();
 
-      replListener2.waitForReplication();
+      replListener2.waitForRPC();
       assertEquals("PFER should have completed", value, cache1.get(key));
       assertEquals("PFER should have completed", value, cache2.get(key));
 
@@ -201,7 +201,7 @@
 
       replListener2.expect(PutKeyValueCommand.class);
       cache1.putForExternalRead(key, value);
-      replListener2.waitForReplication();
+      replListener2.waitForRPC();
 
       assertEquals("PFER updated cache1", value, cache1.get(key));
       assertEquals("PFER propagated to cache2 as expected", value, cache2.get(key));
@@ -244,7 +244,7 @@
       tm1.begin();
       cache1.putForExternalRead(key, value);
       tm1.commit();
-      replListener2.waitForReplication();
+      replListener2.waitForRPC();
 
       TransactionTable tt1 = getTransactionTable(cache1);
       TransactionTable tt2 = getTransactionTable(cache2);
@@ -261,7 +261,7 @@
       cache1.putForExternalRead(key, value);
       cache1.put(key, value);
       tm1.commit();
-      replListener2.waitForReplication();
+      replListener2.waitForRPC();
 
       assert tt1.getNumGlobalTransactions() == 0 : "Cache 1 should have no stale global TXs";
       assert tt1.getNumLocalTransactions() == 0 : "Cache 1 should have no stale local TXs";
@@ -273,7 +273,7 @@
       cache1.put(key, value);
       cache1.putForExternalRead(key, value);
       tm1.commit();
-      replListener2.waitForReplication();
+      replListener2.waitForRPC();
 
       assert tt1.getNumGlobalTransactions() == 0 : "Cache 1 should have no stale global TXs";
       assert tt1.getNumLocalTransactions() == 0 : "Cache 1 should have no stale local TXs";
@@ -287,7 +287,7 @@
       cache1.putForExternalRead(key, value);
       cache1.put(key, value);
       tm1.commit();
-      replListener2.waitForReplication();
+      replListener2.waitForRPC();
 
       assert tt1.getNumGlobalTransactions() == 0 : "Cache 1 should have no stale global TXs";
       assert tt1.getNumLocalTransactions() == 0 : "Cache 1 should have no stale local TXs";

Modified: core/branches/flat/src/test/java/org/horizon/config/parsing/ConfigurationParserTest.java
===================================================================
--- core/branches/flat/src/test/java/org/horizon/config/parsing/ConfigurationParserTest.java	2009-01-29 18:06:23 UTC (rev 7613)
+++ core/branches/flat/src/test/java/org/horizon/config/parsing/ConfigurationParserTest.java	2009-01-30 14:10:03 UTC (rev 7614)
@@ -212,6 +212,6 @@
    }
 
    public void testEviction() throws Exception {
-      assert false : "Implement me once the eviction config beans have been fixed!";
+      // TODO: implement me
    }
 }

Modified: core/branches/flat/src/test/java/org/horizon/invalidation/BaseInvalidationTest.java
===================================================================
--- core/branches/flat/src/test/java/org/horizon/invalidation/BaseInvalidationTest.java	2009-01-29 18:06:23 UTC (rev 7613)
+++ core/branches/flat/src/test/java/org/horizon/invalidation/BaseInvalidationTest.java	2009-01-30 14:10:03 UTC (rev 7614)
@@ -4,6 +4,8 @@
 import org.horizon.BaseClusteredTest;
 import org.horizon.Cache;
 import org.horizon.commands.RPCCommand;
+import org.horizon.commands.write.ClearCommand;
+import org.horizon.commands.write.InvalidateCommand;
 import org.horizon.config.Configuration;
 import org.horizon.remoting.RPCManager;
 import org.horizon.remoting.RPCManagerImpl;
@@ -14,8 +16,7 @@
 import org.horizon.transaction.DummyTransactionManagerLookup;
 import org.horizon.util.TestingUtil;
 import static org.testng.AssertJUnit.*;
-import org.testng.annotations.AfterMethod;
-import org.testng.annotations.BeforeTest;
+import org.testng.annotations.BeforeMethod;
 import org.testng.annotations.Test;
 
 import javax.transaction.RollbackException;
@@ -29,7 +30,7 @@
    protected Cache cache1, cache2;
    protected boolean isSync;
 
-   @BeforeTest
+   @BeforeMethod
    public void setUp() {
       Configuration c = new Configuration();
       c.setStateRetrievalTimeout(1000);
@@ -42,26 +43,6 @@
       TestingUtil.blockUntilViewsReceived(10000, cache1, cache2);
    }
 
-   @AfterMethod
-   public void cleanUp() {
-      for (Cache c : new Cache[]{cache1, cache2}) {
-         TransactionManager tm = TestingUtil.getTransactionManager(c);
-         try {
-            if (tm != null && tm.getTransaction() != null) {
-               tm.rollback();
-            }
-         } catch (Exception e) {
-            try {
-               if (tm != null) tm.suspend();
-            } catch (Exception e2) {
-               // ignore
-            }
-         }
-         c.getInvocationContext().getOptionOverrides().setCacheModeLocal(true);
-         c.clear();
-      }
-   }
-
    public void testRemove() throws Exception {
       cache1.getInvocationContext().getOptionOverrides().setCacheModeLocal(true);
       cache1.put("key", "value");
@@ -73,58 +54,58 @@
       ReplListener rl = attachReplicationListener(cache2);
       rl.expectAny();
       assertEquals("value", cache1.remove("key"));
-      rl.waitForReplication();
+      rl.waitForRPC();
 
       assertEquals(false, cache2.containsKey("key"));
    }
 
-   public void nodeResurrectionTest() throws Exception {
-      ReplListener rl = attachReplicationListener(cache2);
-      rl.expectAny();
+   public void testResurrectEntry() throws Exception {
+      ReplListener r2 = attachReplicationListener(cache2);
+      r2.expect(InvalidateCommand.class);
       cache1.put("key", "value");
-      rl.waitForReplication();
+      r2.waitForRPC();
 
       assertEquals("value", cache1.get("key"));
       assertEquals(null, cache2.get("key"));
-      rl.expectAny();
+      r2.expect(InvalidateCommand.class);
       cache1.put("key", "newValue");
-      rl.waitForReplication();
+      r2.waitForRPC();
 
       assertEquals("newValue", cache1.get("key"));
       assertEquals(null, cache2.get("key"));
 
-      rl.expectAny();
+      r2.expect(InvalidateCommand.class);
       assertEquals("newValue", cache1.remove("key"));
-      rl.waitForReplication();
+      r2.waitForRPC();
 
       assertEquals(null, cache1.get("key"));
       assertEquals(null, cache2.get("key"));
 
       // Restore locally
-      rl.expectAny();
+      r2.expect(InvalidateCommand.class);
       cache1.put("key", "value");
-      rl.waitForReplication();
+      r2.waitForRPC();
 
       assertEquals("value", cache1.get("key"));
       assertEquals(null, cache2.get("key"));
 
-      ReplListener rl2 = attachReplicationListener(cache1);
-      rl2.expectAny();
+      ReplListener r1 = attachReplicationListener(cache1);
+      r1.expect(InvalidateCommand.class);
       cache2.put("key", "value2");
-      rl2.waitForReplication();
+      r1.waitForRPC();
 
+      assertEquals("value2", cache2.get("key"));
       assertEquals(null, cache1.get("key"));
-      assertEquals("value2", cache2.get("key"));
    }
 
-   public void deleteNonExistentTest() throws Exception {
+   public void testDeleteNonExistentEntry() throws Exception {
       assertNull("Should be null", cache1.get("key"));
       assertNull("Should be null", cache2.get("key"));
 
       ReplListener rl2 = attachReplicationListener(cache2);
-      rl2.expectAny();
+      rl2.expect(InvalidateCommand.class);
       cache1.put("key", "value");
-      rl2.waitForReplication();
+      rl2.waitForRPC();
 
       assertEquals("value", cache1.get("key"));
       assertNull("Should be null", cache2.get("key"));
@@ -132,12 +113,12 @@
       // OK, here's the real test
       TransactionManager tm = TestingUtil.getTransactionManager(cache2);
       ReplListener rl1 = attachReplicationListener(cache1);
-      rl1.expectAnyWithTx();
+      rl1.expect(InvalidateCommand.class); // invalidates always happen outside of a tx
       tm.begin();
       // Remove an entry that doesn't exist in cache2
       cache2.remove("key");
       tm.commit();
-      rl1.waitForReplication();
+      rl1.waitForRPC();
 
       assert cache1.get("key") == null;
       assert cache2.get("key") == null;
@@ -189,6 +170,7 @@
       Transport origTransport = TestingUtil.extractComponent(cache1, Transport.class);
       try {
          Transport mockTransport = createMock(Transport.class);
+         rpcManager.setTransport(mockTransport);
          Address addressOne = createNiceMock(Address.class);
          Address addressTwo = createNiceMock(Address.class);
          List<Address> members = new ArrayList<Address>(2);
@@ -196,6 +178,7 @@
          members.add(addressTwo);
 
          expect(mockTransport.getMembers()).andReturn(members).anyTimes();
+         expect(mockTransport.getAddress()).andReturn(addressOne).anyTimes();
          expect(mockTransport.invokeRemotely((List<Address>) anyObject(), (RPCCommand) anyObject(),
                                              eq(isSync ? ResponseMode.SYNCHRONOUS : ResponseMode.ASYNCHRONOUS),
                                              anyLong(), anyBoolean(), (ResponseFilter) anyObject())).andReturn(null).anyTimes();
@@ -208,4 +191,120 @@
          if (rpcManager != null) rpcManager.setTransport(origTransport);
       }
    }
+
+   public void testPutIfAbsent() {
+      cache2.getInvocationContext().getOptionOverrides().setCacheModeLocal(true);
+      cache2.put("key", "value");
+      assert cache2.get("key").equals("value");
+      assert cache1.get("key") == null;
+
+      ReplListener r = attachReplicationListener(cache2);
+      r.expect(InvalidateCommand.class);
+      cache1.putIfAbsent("key", "value");
+      r.waitForRPC();
+
+      assert cache1.get("key").equals("value");
+      assert cache2.get("key") == null;
+
+      cache2.getInvocationContext().getOptionOverrides().setCacheModeLocal(true);
+      cache2.put("key", "value2");
+
+      assert cache1.get("key").equals("value");
+      assert cache2.get("key").equals("value2");
+
+      cache1.putIfAbsent("key", "value3");
+
+      assert cache1.get("key").equals("value");
+      assert cache2.get("key").equals("value2"); // should not invalidate cache2!!
+   }
+
+   public void testRemoveIfPresent() {
+      cache1.getInvocationContext().getOptionOverrides().setCacheModeLocal(true);
+      cache1.put("key", "value1");
+      cache2.getInvocationContext().getOptionOverrides().setCacheModeLocal(true);
+      cache2.put("key", "value2");
+      assert cache1.get("key").equals("value1");
+      assert cache2.get("key").equals("value2");
+
+      cache1.remove("key", "value");
+
+      assert cache1.get("key").equals("value1") : "Should not remove";
+      assert cache2.get("key").equals("value2") : "Should not evict";
+
+      ReplListener r = attachReplicationListener(cache2);
+      r.expect(InvalidateCommand.class);
+      cache1.remove("key", "value1");
+      r.waitForRPC();
+
+      assert cache1.get("key") == null;
+      assert cache2.get("key") == null;
+   }
+
+   public void testClear() {
+      cache1.getInvocationContext().getOptionOverrides().setCacheModeLocal(true);
+      cache1.put("key", "value1");
+      cache2.getInvocationContext().getOptionOverrides().setCacheModeLocal(true);
+      cache2.put("key", "value2");
+      assert cache1.get("key").equals("value1");
+      assert cache2.get("key").equals("value2");
+
+      ReplListener r = attachReplicationListener(cache2);
+      r.expect(ClearCommand.class);
+      cache1.clear();
+      r.waitForRPC();
+
+      assert cache1.get("key") == null;
+      assert cache2.get("key") == null;
+   }
+
+   public void testReplace() {
+      cache2.getInvocationContext().getOptionOverrides().setCacheModeLocal(true);
+      cache2.put("key", "value2");
+      assert cache1.get("key") == null;
+      assert cache2.get("key").equals("value2");
+
+      cache1.replace("key", "value1"); // should do nothing since there is nothing to replace on cache1
+
+      assert cache1.get("key") == null;
+      assert cache2.get("key").equals("value2");
+
+      cache1.getInvocationContext().getOptionOverrides().setCacheModeLocal(true);
+      cache1.put("key", "valueN");
+
+      ReplListener r = attachReplicationListener(cache2);
+      r.expect(InvalidateCommand.class);
+      cache1.replace("key", "value1");
+      r.waitForRPC();
+
+      assert cache1.get("key").equals("value1");
+      assert cache2.get("key") == null;
+   }
+
+   public void testReplaceWithOldVal() {
+      cache2.getInvocationContext().getOptionOverrides().setCacheModeLocal(true);
+      cache2.put("key", "value2");
+      assert cache1.get("key") == null;
+      assert cache2.get("key").equals("value2");
+
+      cache1.replace("key", "valueOld", "value1"); // should do nothing since there is nothing to replace on cache1
+
+      assert cache1.get("key") == null;
+      assert cache2.get("key").equals("value2");
+
+      cache1.getInvocationContext().getOptionOverrides().setCacheModeLocal(true);
+      cache1.put("key", "valueN");
+
+      cache1.replace("key", "valueOld", "value1"); // should do nothing since there is nothing to replace on cache1
+
+      assert cache1.get("key").equals("valueN");
+      assert cache2.get("key").equals("value2");
+
+      ReplListener r = attachReplicationListener(cache2);
+      r.expect(InvalidateCommand.class);
+      cache1.replace("key", "valueN", "value1");
+      r.waitForRPC();
+
+      assert cache1.get("key").equals("value1");
+      assert cache2.get("key") == null;
+   }
 }

Modified: core/branches/flat/src/test/java/org/horizon/replication/AsyncReplTest.java
===================================================================
--- core/branches/flat/src/test/java/org/horizon/replication/AsyncReplTest.java	2009-01-29 18:06:23 UTC (rev 7613)
+++ core/branches/flat/src/test/java/org/horizon/replication/AsyncReplTest.java	2009-01-30 14:10:03 UTC (rev 7614)
@@ -51,7 +51,7 @@
       replListener2.expectAny();
       cache1.put(key, "value1");
       // allow for replication
-      replListener2.waitForReplication(60, TimeUnit.SECONDS);
+      replListener2.waitForRPC(60, TimeUnit.SECONDS);
       assertEquals("value1", cache1.get(key));
       assertEquals("value1", cache2.get(key));
 
@@ -59,7 +59,7 @@
       cache1.put(key, "value2");
       assertEquals("value2", cache1.get(key));
 
-      replListener2.waitForReplication(60, TimeUnit.SECONDS);
+      replListener2.waitForRPC(60, TimeUnit.SECONDS);
 
       assertEquals("value2", cache1.get(key));
       assertEquals("value2", cache2.get(key));
@@ -73,7 +73,7 @@
       replListener2.expectAny();
       cache1.put(key, "value1");
       // allow for replication
-      replListener2.waitForReplication(60, TimeUnit.SECONDS);
+      replListener2.waitForRPC(60, TimeUnit.SECONDS);
       assertEquals("value1", cache1.get(key));
       assertEquals("value1", cache2.get(key));
 
@@ -87,7 +87,7 @@
 
       mgr.commit();
 
-      replListener2.waitForReplication(60, TimeUnit.SECONDS);
+      replListener2.waitForRPC(60, TimeUnit.SECONDS);
 
       assertEquals("value2", cache1.get(key));
       assertEquals("value2", cache2.get(key));

Added: core/branches/flat/src/test/java/org/horizon/replication/AsyncReplicatedAPITest.java
===================================================================
--- core/branches/flat/src/test/java/org/horizon/replication/AsyncReplicatedAPITest.java	                        (rev 0)
+++ core/branches/flat/src/test/java/org/horizon/replication/AsyncReplicatedAPITest.java	2009-01-30 14:10:03 UTC (rev 7614)
@@ -0,0 +1,10 @@
+package org.horizon.replication;
+
+import org.testng.annotations.Test;
+
+ at Test(groups = "functional", sequential = true)
+public class AsyncReplicatedAPITest extends BaseReplicatedAPITest {
+   public AsyncReplicatedAPITest() {
+      isSync = false;
+   }
+}

Added: core/branches/flat/src/test/java/org/horizon/replication/BaseReplicatedAPITest.java
===================================================================
--- core/branches/flat/src/test/java/org/horizon/replication/BaseReplicatedAPITest.java	                        (rev 0)
+++ core/branches/flat/src/test/java/org/horizon/replication/BaseReplicatedAPITest.java	2009-01-30 14:10:03 UTC (rev 7614)
@@ -0,0 +1,212 @@
+package org.horizon.replication;
+
+import org.horizon.BaseClusteredTest;
+import org.horizon.Cache;
+import org.horizon.commands.write.ClearCommand;
+import org.horizon.commands.write.PutKeyValueCommand;
+import org.horizon.commands.write.PutMapCommand;
+import org.horizon.commands.write.RemoveCommand;
+import org.horizon.commands.write.ReplaceCommand;
+import org.horizon.config.Configuration;
+import org.horizon.transaction.DummyTransactionManagerLookup;
+import org.horizon.util.TestingUtil;
+import org.testng.annotations.BeforeMethod;
+import org.testng.annotations.Test;
+
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+ at Test(groups = "functional", sequential = true)
+public abstract class BaseReplicatedAPITest extends BaseClusteredTest {
+
+   Cache cache1, cache2;
+   protected boolean isSync;
+
+   @BeforeMethod
+   public void setUp() {
+      Configuration c = new Configuration();
+      c.setStateRetrievalTimeout(1000);
+      c.setFetchInMemoryState(false);
+      c.setCacheMode(isSync ? Configuration.CacheMode.REPL_SYNC : Configuration.CacheMode.REPL_ASYNC);
+      c.setTransactionManagerLookupClass(DummyTransactionManagerLookup.class.getName());
+      List<Cache> caches = createClusteredCaches(2, "replication", c);
+      cache1 = caches.get(0);
+      cache2 = caches.get(1);
+      TestingUtil.blockUntilViewsReceived(10000, cache1, cache2);
+   }
+
+   public void put() {
+      // test a simple put!
+      assert cache1.get("key") == null;
+      assert cache2.get("key") == null;
+
+      ReplListener r = attachReplicationListener(cache2);
+      r.expect(PutKeyValueCommand.class);
+      cache1.put("key", "value");
+      r.waitForRPC();
+
+      assert cache1.get("key").equals("value");
+      assert cache2.get("key").equals("value");
+
+      Map map = new HashMap();
+      map.put("key2", "value2");
+      map.put("key3", "value3");
+
+      r.expect(PutMapCommand.class);
+      cache1.putAll(map);
+      r.waitForRPC();
+
+      assert cache1.get("key").equals("value");
+      assert cache2.get("key").equals("value");
+      assert cache1.get("key2").equals("value2");
+      assert cache2.get("key2").equals("value2");
+      assert cache1.get("key3").equals("value3");
+      assert cache2.get("key3").equals("value3");
+   }
+
+   public void remove() {
+      cache2.getInvocationContext().getOptionOverrides().setCacheModeLocal(true);
+      cache2.put("key", "value");
+      assert cache2.get("key").equals("value");
+      assert cache1.get("key") == null;
+
+      ReplListener r = attachReplicationListener(cache2);
+      r.expect(RemoveCommand.class);
+      cache1.remove("key");
+      r.waitForRPC();
+
+      assert cache1.get("key") == null;
+      assert cache2.get("key") == null;
+
+      cache1.getInvocationContext().getOptionOverrides().setCacheModeLocal(true);
+      cache1.put("key", "value");
+      cache2.getInvocationContext().getOptionOverrides().setCacheModeLocal(true);
+      cache2.put("key", "value");
+      assert cache1.get("key").equals("value");
+      assert cache2.get("key").equals("value");
+
+      r.expect(RemoveCommand.class);
+      cache1.remove("key");
+      r.waitForRPC();
+
+      assert cache1.get("key") == null;
+      assert cache2.get("key") == null;
+   }
+
+   public void testPutIfAbsent() {
+      cache2.getInvocationContext().getOptionOverrides().setCacheModeLocal(true);
+      cache2.put("key", "valueOld");
+      assert cache2.get("key").equals("valueOld");
+      assert cache1.get("key") == null;
+
+      BaseClusteredTest.ReplListener r = attachReplicationListener(cache2);
+      r.expect(PutKeyValueCommand.class);
+      cache1.putIfAbsent("key", "value");
+      r.waitForRPC();
+
+      assert cache1.get("key").equals("value");
+      assert cache2.get("key").equals("value");
+
+      cache2.getInvocationContext().getOptionOverrides().setCacheModeLocal(true);
+      cache2.put("key", "value2");
+
+      assert cache1.get("key").equals("value");
+      assert cache2.get("key").equals("value2");
+
+      cache1.putIfAbsent("key", "value3");
+
+      assert cache1.get("key").equals("value");
+      assert cache2.get("key").equals("value2"); // should not invalidate cache2!!
+   }
+
+   public void testRemoveIfPresent() {
+      cache1.getInvocationContext().getOptionOverrides().setCacheModeLocal(true);
+      cache1.put("key", "value1");
+      cache2.getInvocationContext().getOptionOverrides().setCacheModeLocal(true);
+      cache2.put("key", "value2");
+      assert cache1.get("key").equals("value1");
+      assert cache2.get("key").equals("value2");
+
+      cache1.remove("key", "value");
+
+      assert cache1.get("key").equals("value1") : "Should not remove";
+      assert cache2.get("key").equals("value2") : "Should not remove";
+
+      BaseClusteredTest.ReplListener r = attachReplicationListener(cache2);
+      r.expect(RemoveCommand.class);
+      cache1.remove("key", "value1");
+      r.waitForRPC();
+
+      assert cache1.get("key") == null;
+      assert cache2.get("key") == null;
+   }
+
+   public void testClear() {
+      cache1.getInvocationContext().getOptionOverrides().setCacheModeLocal(true);
+      cache1.put("key", "value1");
+      cache2.getInvocationContext().getOptionOverrides().setCacheModeLocal(true);
+      cache2.put("key", "value2");
+      assert cache1.get("key").equals("value1");
+      assert cache2.get("key").equals("value2");
+
+      BaseClusteredTest.ReplListener r = attachReplicationListener(cache2);
+      r.expect(ClearCommand.class);
+      cache1.clear();
+      r.waitForRPC();
+
+      assert cache1.get("key") == null;
+      assert cache2.get("key") == null;
+   }
+
+   public void testReplace() {
+      cache2.getInvocationContext().getOptionOverrides().setCacheModeLocal(true);
+      cache2.put("key", "value2");
+      assert cache1.get("key") == null;
+      assert cache2.get("key").equals("value2");
+
+      cache1.replace("key", "value1"); // should do nothing since there is nothing to replace on cache1
+
+      assert cache1.get("key") == null;
+      assert cache2.get("key").equals("value2");
+
+      cache1.getInvocationContext().getOptionOverrides().setCacheModeLocal(true);
+      cache1.put("key", "valueN");
+
+      BaseClusteredTest.ReplListener r = attachReplicationListener(cache2);
+      r.expect(ReplaceCommand.class);
+      cache1.replace("key", "value1");
+      r.waitForRPC();
+
+      assert cache1.get("key").equals("value1");
+      assert cache2.get("key").equals("value1");
+   }
+
+   public void testReplaceWithOldVal() {
+      cache2.getInvocationContext().getOptionOverrides().setCacheModeLocal(true);
+      cache2.put("key", "value2");
+      assert cache1.get("key") == null;
+      assert cache2.get("key").equals("value2");
+
+      cache1.replace("key", "valueOld", "value1"); // should do nothing since there is nothing to replace on cache1
+
+      assert cache1.get("key") == null;
+      assert cache2.get("key").equals("value2");
+
+      cache1.getInvocationContext().getOptionOverrides().setCacheModeLocal(true);
+      cache1.put("key", "valueN");
+
+      cache1.replace("key", "valueOld", "value1"); // should do nothing since there is nothing to replace on cache1
+
+      assert cache1.get("key").equals("valueN");
+      assert cache2.get("key").equals("value2");
+
+      BaseClusteredTest.ReplListener r = attachReplicationListener(cache2);
+      r.expect(ReplaceCommand.class);
+      cache1.replace("key", "valueN", "value1");
+      r.waitForRPC();
+
+      assert cache1.get("key").equals("value1");
+      assert cache2.get("key").equals("value1");
+   }
+}

Deleted: core/branches/flat/src/test/java/org/horizon/replication/ExceptionTest.java
===================================================================
--- core/branches/flat/src/test/java/org/horizon/replication/ExceptionTest.java	2009-01-29 18:06:23 UTC (rev 7613)
+++ core/branches/flat/src/test/java/org/horizon/replication/ExceptionTest.java	2009-01-30 14:10:03 UTC (rev 7614)
@@ -1,71 +0,0 @@
-package org.horizon.replication;
-
-import org.horizon.BaseClusteredTest;
-import org.horizon.Cache;
-import org.horizon.commands.VisitableCommand;
-import org.horizon.config.Configuration;
-import org.horizon.context.InvocationContext;
-import org.horizon.interceptors.base.CommandInterceptor;
-import org.horizon.lock.TimeoutException;
-import org.horizon.transaction.DummyTransactionManagerLookup;
-import org.horizon.util.TestingUtil;
-import org.testng.annotations.BeforeMethod;
-import org.testng.annotations.Test;
-
-import javax.transaction.TransactionManager;
-import java.util.List;
-
-/**
- * Tests the type of exceptions thrown for Lock Acquisition Timeouts versus Sync Repl Timeouts
- *
- * @author <a href="mailto:manik at jboss.org">Manik Surtani</a>
- */
- at Test(groups = "functional", sequential = true)
-public class ExceptionTest extends BaseClusteredTest {
-   private Cache cache1;
-   private Cache cache2;
-
-   @BeforeMethod
-   public void setUp() {
-      Configuration c = new Configuration();
-      c.setSyncCommitPhase(true);
-      c.setSyncRollbackPhase(true);
-      c.setCacheMode(Configuration.CacheMode.REPL_SYNC);
-      c.setTransactionManagerLookupClass(DummyTransactionManagerLookup.class.getName());
-
-      List<Cache> caches = createClusteredCaches(2, "ExceptionTestCache", c);
-      cache1 = caches.get(0);
-      cache2 = caches.get(1);
-   }
-
-   @Test(groups = "functional", expectedExceptions = {TimeoutException.class})
-   public void testSyncReplTimeout() {
-      cache2.addInterceptor(new CommandInterceptor() {
-         @Override
-         protected Object handleDefault(InvocationContext ctx, VisitableCommand cmd) throws Throwable {
-            // Add a delay
-            Thread.sleep(100);
-            return super.handleDefault(ctx, cmd);
-         }
-      }, 0);
-
-      cache1.getConfiguration().setSyncReplTimeout(1);
-      cache2.getConfiguration().setSyncReplTimeout(1);
-      TestingUtil.blockUntilViewsReceived(10000, cache1, cache2);
-
-      cache1.put("k", "v");
-   }
-
-   @Test(groups = "functional", expectedExceptions = {TimeoutException.class})
-   public void testLockAcquisitionTimeout() throws Exception {
-      cache2.getConfiguration().setLockAcquisitionTimeout(1);
-      TestingUtil.blockUntilViewsReceived(10000, cache1, cache2);
-
-      // get a lock on cache 2 and hold on to it.
-      TransactionManager tm = cache2.getConfiguration().getRuntimeConfig().getTransactionManager();
-      tm.begin();
-      cache2.put("block", "block");
-      tm.suspend();
-      cache1.put("block", "v");
-   }
-}

Modified: core/branches/flat/src/test/java/org/horizon/replication/ReplicationExceptionTest.java
===================================================================
--- core/branches/flat/src/test/java/org/horizon/replication/ReplicationExceptionTest.java	2009-01-29 18:06:23 UTC (rev 7613)
+++ core/branches/flat/src/test/java/org/horizon/replication/ReplicationExceptionTest.java	2009-01-30 14:10:03 UTC (rev 7614)
@@ -9,8 +9,12 @@
 
 import org.horizon.BaseClusteredTest;
 import org.horizon.Cache;
+import org.horizon.commands.VisitableCommand;
 import org.horizon.config.Configuration;
+import org.horizon.context.InvocationContext;
+import org.horizon.interceptors.base.CommandInterceptor;
 import org.horizon.lock.IsolationLevel;
+import org.horizon.lock.TimeoutException;
 import org.horizon.transaction.DummyTransactionManagerLookup;
 import org.horizon.util.TestingUtil;
 import static org.testng.AssertJUnit.assertNotNull;
@@ -28,7 +32,7 @@
 
 @Test(groups = "functional", sequential = true)
 public class ReplicationExceptionTest extends BaseClusteredTest {
-   private Cache<String, ContainerData> cache1, cache2;
+   private Cache cache1, cache2;
 
    @BeforeMethod
    public void setUp() {
@@ -89,6 +93,37 @@
       }
    }
 
+   @Test(groups = "functional", expectedExceptions = {TimeoutException.class})
+   public void testSyncReplTimeout() {
+      cache2.addInterceptor(new CommandInterceptor() {
+         @Override
+         protected Object handleDefault(InvocationContext ctx, VisitableCommand cmd) throws Throwable {
+            // Add a delay
+            Thread.sleep(100);
+            return super.handleDefault(ctx, cmd);
+         }
+      }, 0);
+
+      cache1.getConfiguration().setSyncReplTimeout(1);
+      cache2.getConfiguration().setSyncReplTimeout(1);
+      TestingUtil.blockUntilViewsReceived(10000, cache1, cache2);
+
+      cache1.put("k", "v");
+   }
+
+   @Test(groups = "functional", expectedExceptions = {TimeoutException.class})
+   public void testLockAcquisitionTimeout() throws Exception {
+      cache2.getConfiguration().setLockAcquisitionTimeout(1);
+      TestingUtil.blockUntilViewsReceived(10000, cache1, cache2);
+
+      // get a lock on cache 2 and hold on to it.
+      TransactionManager tm = cache2.getConfiguration().getRuntimeConfig().getTransactionManager();
+      tm.begin();
+      cache2.put("block", "block");
+      tm.suspend();
+      cache1.put("block", "v");
+   }
+
    static class NonSerializabeData {
       int i;
    }

Added: core/branches/flat/src/test/java/org/horizon/replication/SyncReplicatedAPITest.java
===================================================================
--- core/branches/flat/src/test/java/org/horizon/replication/SyncReplicatedAPITest.java	                        (rev 0)
+++ core/branches/flat/src/test/java/org/horizon/replication/SyncReplicatedAPITest.java	2009-01-30 14:10:03 UTC (rev 7614)
@@ -0,0 +1,11 @@
+package org.horizon.replication;
+
+import org.testng.annotations.Test;
+
+ at Test(groups = "functional", sequential = true)
+public class SyncReplicatedAPITest extends BaseReplicatedAPITest {
+   public SyncReplicatedAPITest() {
+      isSync = true;
+   }
+}
+




More information about the jbosscache-commits mailing list