[infinispan-commits] Infinispan SVN: r169 - in trunk/core/src: main/java/org/infinispan/commands/write and 10 other directories.

infinispan-commits at lists.jboss.org infinispan-commits at lists.jboss.org
Fri Apr 24 14:15:22 EDT 2009


Author: manik.surtani at jboss.com
Date: 2009-04-24 14:15:21 -0400 (Fri, 24 Apr 2009)
New Revision: 169

Added:
   trunk/core/src/main/java/org/infinispan/commands/write/InvalidateL1Command.java
   trunk/core/src/main/java/org/infinispan/factories/DistributionManagerFactory.java
   trunk/core/src/main/java/org/infinispan/interceptors/DistLockingInterceptor.java
   trunk/core/src/main/java/org/infinispan/interceptors/DistTxInterceptor.java
   trunk/core/src/test/java/org/infinispan/distribution/MagicKeyTest.java
Modified:
   trunk/core/src/main/java/org/infinispan/commands/CommandsFactory.java
   trunk/core/src/main/java/org/infinispan/commands/CommandsFactoryImpl.java
   trunk/core/src/main/java/org/infinispan/commands/RemoteCommandFactory.java
   trunk/core/src/main/java/org/infinispan/commands/write/ClearCommand.java
   trunk/core/src/main/java/org/infinispan/commands/write/InvalidateCommand.java
   trunk/core/src/main/java/org/infinispan/commands/write/PutKeyValueCommand.java
   trunk/core/src/main/java/org/infinispan/commands/write/PutMapCommand.java
   trunk/core/src/main/java/org/infinispan/commands/write/RemoveCommand.java
   trunk/core/src/main/java/org/infinispan/commands/write/ReplaceCommand.java
   trunk/core/src/main/java/org/infinispan/commands/write/WriteCommand.java
   trunk/core/src/main/java/org/infinispan/config/Configuration.java
   trunk/core/src/main/java/org/infinispan/context/InvocationContextImpl.java
   trunk/core/src/main/java/org/infinispan/distribution/DistributionManager.java
   trunk/core/src/main/java/org/infinispan/distribution/DistributionManagerImpl.java
   trunk/core/src/main/java/org/infinispan/factories/AbstractComponentRegistry.java
   trunk/core/src/main/java/org/infinispan/factories/EmptyConstructorNamedCacheFactory.java
   trunk/core/src/main/java/org/infinispan/factories/InterceptorChainFactory.java
   trunk/core/src/main/java/org/infinispan/interceptors/DistributionInterceptor.java
   trunk/core/src/main/java/org/infinispan/interceptors/LockingInterceptor.java
   trunk/core/src/main/java/org/infinispan/interceptors/TxInterceptor.java
   trunk/core/src/main/java/org/infinispan/loader/cluster/ClusterCacheLoader.java
   trunk/core/src/main/java/org/infinispan/remoting/responses/ClusteredGetResponseValidityFilter.java
   trunk/core/src/main/java/org/infinispan/remoting/transport/jgroups/JGroupsTransport.java
   trunk/core/src/test/java/org/infinispan/distribution/BaseDistFunctionalTest.java
   trunk/core/src/test/java/org/infinispan/distribution/DefaultConsistentHashTest.java
   trunk/core/src/test/java/org/infinispan/distribution/DistAsyncFuncTest.java
   trunk/core/src/test/java/org/infinispan/distribution/DistAsyncTxFuncTest.java
   trunk/core/src/test/java/org/infinispan/distribution/DistSyncFuncTest.java
   trunk/core/src/test/java/org/infinispan/distribution/DistSyncTxFuncTest.java
   trunk/core/src/test/java/org/infinispan/marshall/jboss/JBossMarshallerTest.java
Log:
[ISPN-30] (DIST) Initial implementation of DIST

Modified: trunk/core/src/main/java/org/infinispan/commands/CommandsFactory.java
===================================================================
--- trunk/core/src/main/java/org/infinispan/commands/CommandsFactory.java	2009-04-24 16:06:56 UTC (rev 168)
+++ trunk/core/src/main/java/org/infinispan/commands/CommandsFactory.java	2009-04-24 18:15:21 UTC (rev 169)
@@ -58,6 +58,8 @@
 
    InvalidateCommand buildInvalidateCommand(Object... keys);
 
+   InvalidateCommand buildInvalidateFromL1Command(Object... keys);
+
    ReplaceCommand buildReplaceCommand(Object key, Object oldValue, Object newValue, long lifespanMillis, long maxIdleTimeMillis);
 
    SizeCommand buildSizeCommand();

Modified: trunk/core/src/main/java/org/infinispan/commands/CommandsFactoryImpl.java
===================================================================
--- trunk/core/src/main/java/org/infinispan/commands/CommandsFactoryImpl.java	2009-04-24 16:06:56 UTC (rev 168)
+++ trunk/core/src/main/java/org/infinispan/commands/CommandsFactoryImpl.java	2009-04-24 18:15:21 UTC (rev 169)
@@ -34,11 +34,13 @@
 import org.infinispan.commands.write.ClearCommand;
 import org.infinispan.commands.write.EvictCommand;
 import org.infinispan.commands.write.InvalidateCommand;
+import org.infinispan.commands.write.InvalidateL1Command;
 import org.infinispan.commands.write.PutKeyValueCommand;
 import org.infinispan.commands.write.PutMapCommand;
 import org.infinispan.commands.write.RemoveCommand;
 import org.infinispan.commands.write.ReplaceCommand;
 import org.infinispan.container.DataContainer;
+import org.infinispan.distribution.DistributionManager;
 import org.infinispan.factories.annotations.Inject;
 import org.infinispan.interceptors.InterceptorChain;
 import org.infinispan.loader.CacheLoaderManager;
@@ -62,14 +64,18 @@
    // some stateless commands can be reused so that they aren't constructed again all the time.
    SizeCommand cachedSizeCommand;
    private InterceptorChain interceptorChain;
+   private DistributionManager distributionManager;
 
    @Inject
-   public void setupDependencies(DataContainer container, CacheNotifier notifier, Cache cache, InterceptorChain interceptorChain, CacheLoaderManager clManager) {
+   public void setupDependencies(DataContainer container, CacheNotifier notifier, Cache cache,
+                                 InterceptorChain interceptorChain, CacheLoaderManager clManager,
+                                 DistributionManager distributionManager) {
       this.dataContainer = container;
       this.notifier = notifier;
       this.cache = cache;
       this.interceptorChain = interceptorChain;
       this.cacheLoaderManager = clManager;
+      this.distributionManager = distributionManager;
    }
 
    public PutKeyValueCommand buildPutKeyValueCommand(Object key, Object value, long lifespanMillis, long maxIdleTimeMillis) {
@@ -84,6 +90,10 @@
       return new InvalidateCommand(notifier, keys);
    }
 
+   public InvalidateCommand buildInvalidateFromL1Command(Object... keys) {
+      return new InvalidateL1Command(distributionManager, notifier, keys);
+   }
+
    public ReplaceCommand buildReplaceCommand(Object key, Object oldValue, Object newValue, long lifespan, long maxIdleTimeMillis) {
       return new ReplaceCommand(key, oldValue, newValue, lifespan, maxIdleTimeMillis);
    }
@@ -171,6 +181,10 @@
             InvalidateCommand ic = (InvalidateCommand) c;
             ic.init(notifier);
             break;
+         case InvalidateL1Command.COMMAND_ID:
+            InvalidateL1Command ilc = (InvalidateL1Command) c;
+            ilc.init(distributionManager, notifier);
+            break;
          case PrepareCommand.COMMAND_ID:
             PrepareCommand pc = (PrepareCommand) c;
             if (pc.getModifications() != null)

Modified: trunk/core/src/main/java/org/infinispan/commands/RemoteCommandFactory.java
===================================================================
--- trunk/core/src/main/java/org/infinispan/commands/RemoteCommandFactory.java	2009-04-24 16:06:56 UTC (rev 168)
+++ trunk/core/src/main/java/org/infinispan/commands/RemoteCommandFactory.java	2009-04-24 18:15:21 UTC (rev 169)
@@ -11,6 +11,7 @@
 import org.infinispan.commands.tx.RollbackCommand;
 import org.infinispan.commands.write.ClearCommand;
 import org.infinispan.commands.write.InvalidateCommand;
+import org.infinispan.commands.write.InvalidateL1Command;
 import org.infinispan.commands.write.PutKeyValueCommand;
 import org.infinispan.commands.write.PutMapCommand;
 import org.infinispan.commands.write.RemoveCommand;
@@ -29,7 +30,7 @@
 @Scope(Scopes.GLOBAL)
 public class RemoteCommandFactory {
    RpcManager rpcManager;
-   
+
    @Inject
    public void init(RpcManager rpcManager) {
       this.rpcManager = rpcManager;
@@ -85,6 +86,9 @@
          case InvalidateCommand.COMMAND_ID:
             command = new InvalidateCommand();
             break;
+         case InvalidateL1Command.COMMAND_ID:
+            command = new InvalidateL1Command();
+            break;
          case StateTransferControlCommand.METHOD_ID:
             command = new StateTransferControlCommand();
             ((StateTransferControlCommand) command).init(rpcManager);

Modified: trunk/core/src/main/java/org/infinispan/commands/write/ClearCommand.java
===================================================================
--- trunk/core/src/main/java/org/infinispan/commands/write/ClearCommand.java	2009-04-24 16:06:56 UTC (rev 168)
+++ trunk/core/src/main/java/org/infinispan/commands/write/ClearCommand.java	2009-04-24 18:15:21 UTC (rev 169)
@@ -22,8 +22,8 @@
 package org.infinispan.commands.write;
 
 import org.infinispan.commands.Visitor;
+import org.infinispan.container.entries.CacheEntry;
 import org.infinispan.container.entries.MVCCEntry;
-import org.infinispan.container.entries.CacheEntry;
 import org.infinispan.context.InvocationContext;
 
 /**
@@ -69,4 +69,8 @@
    public boolean isSuccessful() {
       return true;
    }
+
+   public boolean isConditional() {
+      return false;
+   }
 }

Modified: trunk/core/src/main/java/org/infinispan/commands/write/InvalidateCommand.java
===================================================================
--- trunk/core/src/main/java/org/infinispan/commands/write/InvalidateCommand.java	2009-04-24 16:06:56 UTC (rev 168)
+++ trunk/core/src/main/java/org/infinispan/commands/write/InvalidateCommand.java	2009-04-24 18:15:21 UTC (rev 169)
@@ -56,20 +56,24 @@
     * @param ctx invocation context
     * @return null
     */
+   @Override
    public Object perform(InvocationContext ctx) throws Throwable {
-      if (trace) log.trace("Invalidating keys:" + Arrays.toString(keys));
-      for (Object key : keys) {
-         this.key = key;
-         super.perform(ctx);
-      }
+      if (trace) log.trace("Invalidating keys {0}", Arrays.toString(keys));
+      for (Object k : keys) invalidate(ctx, k);
       return null;
    }
 
+   protected void invalidate(InvocationContext ctx, Object keyToInvalidate) throws Throwable {
+      this.key = keyToInvalidate; // so that the superclass can see it
+      super.perform(ctx);
+   }
+
    @Override
    protected void notify(InvocationContext ctx, Object value, boolean isPre) {
       notifier.notifyCacheEntryInvalidated(key, isPre, ctx);
    }
 
+   @Override
    public byte getCommandId() {
       return COMMAND_ID;
    }
@@ -106,6 +110,7 @@
       }
    }
 
+   @Override
    public Object acceptVisitor(InvocationContext ctx, Visitor visitor) throws Throwable {
       return visitor.visitInvalidateCommand(ctx, this);
    }

Added: trunk/core/src/main/java/org/infinispan/commands/write/InvalidateL1Command.java
===================================================================
--- trunk/core/src/main/java/org/infinispan/commands/write/InvalidateL1Command.java	                        (rev 0)
+++ trunk/core/src/main/java/org/infinispan/commands/write/InvalidateL1Command.java	2009-04-24 18:15:21 UTC (rev 169)
@@ -0,0 +1,42 @@
+package org.infinispan.commands.write;
+
+import org.infinispan.context.InvocationContext;
+import org.infinispan.distribution.DistributionManager;
+import org.infinispan.notifications.cachelistener.CacheNotifier;
+
+/**
+ * Invalidates an entry in an L1 cache (used with DIST mode)
+ *
+ * @author Manik Surtani
+ * @since 4.0
+ */
+public class InvalidateL1Command extends InvalidateCommand {
+   public static final int COMMAND_ID = 48;
+   private DistributionManager dm;
+
+   public InvalidateL1Command() {
+   }
+
+   public InvalidateL1Command(DistributionManager dm, CacheNotifier notifier, Object... keys) {
+      super(notifier, keys);
+      this.dm = dm;
+   }
+
+   @Override
+   public byte getCommandId() {
+      return COMMAND_ID;
+   }
+
+   public void init(DistributionManager dm, CacheNotifier n) {
+      super.init(n);
+      this.dm = dm;
+   }
+
+   @Override
+   public Object perform(InvocationContext ctx) throws Throwable {
+      for (Object k : getKeys()) {
+         if (!dm.isLocal(k)) invalidate(ctx, k);
+      }
+      return null;
+   }
+}


Property changes on: trunk/core/src/main/java/org/infinispan/commands/write/InvalidateL1Command.java
___________________________________________________________________
Name: svn:keywords
   + Id Revision
Name: svn:eol-style
   + LF

Modified: trunk/core/src/main/java/org/infinispan/commands/write/PutKeyValueCommand.java
===================================================================
--- trunk/core/src/main/java/org/infinispan/commands/write/PutKeyValueCommand.java	2009-04-24 16:06:56 UTC (rev 168)
+++ trunk/core/src/main/java/org/infinispan/commands/write/PutKeyValueCommand.java	2009-04-24 18:15:21 UTC (rev 169)
@@ -172,4 +172,8 @@
    public boolean isSuccessful() {
       return successful;
    }
+
+   public boolean isConditional() {
+      return putIfAbsent;
+   }
 }
\ No newline at end of file

Modified: trunk/core/src/main/java/org/infinispan/commands/write/PutMapCommand.java
===================================================================
--- trunk/core/src/main/java/org/infinispan/commands/write/PutMapCommand.java	2009-04-24 16:06:56 UTC (rev 168)
+++ trunk/core/src/main/java/org/infinispan/commands/write/PutMapCommand.java	2009-04-24 18:15:21 UTC (rev 169)
@@ -131,6 +131,10 @@
       return true;
    }
 
+   public boolean isConditional() {
+      return false;
+   }
+
    public long getLifespanMillis() {
       return lifespanMillis;
    }

Modified: trunk/core/src/main/java/org/infinispan/commands/write/RemoveCommand.java
===================================================================
--- trunk/core/src/main/java/org/infinispan/commands/write/RemoveCommand.java	2009-04-24 16:06:56 UTC (rev 168)
+++ trunk/core/src/main/java/org/infinispan/commands/write/RemoveCommand.java	2009-04-24 18:15:21 UTC (rev 169)
@@ -42,10 +42,11 @@
    protected CacheNotifier notifier;
    boolean successful = true;
 
-   /** When not null, value indicates that the entry should only be removed if 
-    * the key is mapped to this value. By the time the RemoveCommand needs to 
-    * be marshalled, the condition must have been true locally already, so 
-    * there's no need to marshall the value. **/
+   /**
+    * When not null, value indicates that the entry should only be removed if the key is mapped to this value. By the
+    * time the RemoveCommand needs to be marshalled, the condition must have been true locally already, so there's no
+    * need to marshall the value.
+    */
    protected transient Object value;
 
    public RemoveCommand(Object key, Object value, CacheNotifier notifier) {
@@ -125,4 +126,8 @@
    public boolean isSuccessful() {
       return successful;
    }
+
+   public boolean isConditional() {
+      return value != null;
+   }
 }

Modified: trunk/core/src/main/java/org/infinispan/commands/write/ReplaceCommand.java
===================================================================
--- trunk/core/src/main/java/org/infinispan/commands/write/ReplaceCommand.java	2009-04-24 16:06:56 UTC (rev 168)
+++ trunk/core/src/main/java/org/infinispan/commands/write/ReplaceCommand.java	2009-04-24 18:15:21 UTC (rev 169)
@@ -40,7 +40,7 @@
    long maxIdleTimeMillis = -1;
    boolean successful = true;
 
-   public ReplaceCommand() {      
+   public ReplaceCommand() {
    }
 
    public ReplaceCommand(Object key, Object oldValue, Object newValue, long lifespanMillis, long maxIdleTimeMillis) {
@@ -136,6 +136,10 @@
       return successful;
    }
 
+   public boolean isConditional() {
+      return true;
+   }
+
    public long getLifespanMillis() {
       return lifespanMillis;
    }

Modified: trunk/core/src/main/java/org/infinispan/commands/write/WriteCommand.java
===================================================================
--- trunk/core/src/main/java/org/infinispan/commands/write/WriteCommand.java	2009-04-24 16:06:56 UTC (rev 168)
+++ trunk/core/src/main/java/org/infinispan/commands/write/WriteCommand.java	2009-04-24 18:15:21 UTC (rev 169)
@@ -17,4 +17,13 @@
     * @return true if the command completed successfully, false otherwise.
     */
    boolean isSuccessful();
+
+   /**
+    * Certain commands only work based on a certain condition or state of the cache.  For example, {@link
+    * org.infinispan.Cache#putIfAbsent(Object, Object)} only does anything if a condition is met, i.e., the entry in
+    * question is not already present.  This method tests whether the command in question is conditional or not.
+    *
+    * @return true if the command is conditional, false otherwise
+    */
+   boolean isConditional();
 }

Modified: trunk/core/src/main/java/org/infinispan/config/Configuration.java
===================================================================
--- trunk/core/src/main/java/org/infinispan/config/Configuration.java	2009-04-24 16:06:56 UTC (rev 168)
+++ trunk/core/src/main/java/org/infinispan/config/Configuration.java	2009-04-24 18:15:21 UTC (rev 169)
@@ -77,6 +77,7 @@
    }
 
    public void setUnsafeUnreliableReturnValues(boolean unsafeUnreliableReturnValues) {
+      testImmutability("unsafeUnreliableReturnValues");
       this.unsafeUnreliableReturnValues = unsafeUnreliableReturnValues;
    }
 

Modified: trunk/core/src/main/java/org/infinispan/context/InvocationContextImpl.java
===================================================================
--- trunk/core/src/main/java/org/infinispan/context/InvocationContextImpl.java	2009-04-24 16:06:56 UTC (rev 168)
+++ trunk/core/src/main/java/org/infinispan/context/InvocationContextImpl.java	2009-04-24 18:15:21 UTC (rev 169)
@@ -305,7 +305,6 @@
             ", transactionContext=" + transactionContext +
             ", flags=" + flags +
             ", contextFlags=" + contextFlags +
-//            ", invocationLocks=" + locks +
             ", lookedUpEntries size=" + (lookedUpEntries == null ? 0 : lookedUpEntries.size()) +
             '}';
    }

Modified: trunk/core/src/main/java/org/infinispan/distribution/DistributionManager.java
===================================================================
--- trunk/core/src/main/java/org/infinispan/distribution/DistributionManager.java	2009-04-24 16:06:56 UTC (rev 168)
+++ trunk/core/src/main/java/org/infinispan/distribution/DistributionManager.java	2009-04-24 18:15:21 UTC (rev 169)
@@ -1,5 +1,7 @@
 package org.infinispan.distribution;
 
+import org.infinispan.container.entries.CacheEntry;
+import org.infinispan.container.entries.InternalCacheEntry;
 import org.infinispan.factories.scopes.Scope;
 import org.infinispan.factories.scopes.Scopes;
 import org.infinispan.remoting.transport.Address;
@@ -40,5 +42,20 @@
     * @return a list of addresses where the key may reside
     */
    Map<Object, List<Address>> locateAll(Collection<Object> keys);
+
+   /**
+    * Transforms a cache entry so it is marked for L1 rather than the primary cache data structure.
+    *
+    * @param entry entry to transform
+    */
+   void transformForL1(CacheEntry entry);
+
+   /**
+    * Retrieves a cache entry from a remote source
+    *
+    * @param key key to look up
+    * @return an internal cache entry, or null if it cannot be located
+    */
+   InternalCacheEntry retrieveFromRemoteSource(Object key) throws Exception;
 }
 

Modified: trunk/core/src/main/java/org/infinispan/distribution/DistributionManagerImpl.java
===================================================================
--- trunk/core/src/main/java/org/infinispan/distribution/DistributionManagerImpl.java	2009-04-24 16:06:56 UTC (rev 168)
+++ trunk/core/src/main/java/org/infinispan/distribution/DistributionManagerImpl.java	2009-04-24 18:15:21 UTC (rev 169)
@@ -1,6 +1,10 @@
 package org.infinispan.distribution;
 
+import org.infinispan.commands.CommandsFactory;
+import org.infinispan.commands.remote.ClusteredGetCommand;
 import org.infinispan.config.Configuration;
+import org.infinispan.container.entries.CacheEntry;
+import org.infinispan.container.entries.InternalCacheEntry;
 import org.infinispan.factories.annotations.Inject;
 import org.infinispan.factories.annotations.Start;
 import org.infinispan.factories.annotations.Stop;
@@ -10,7 +14,12 @@
 import org.infinispan.notifications.cachemanagerlistener.CacheManagerNotifier;
 import org.infinispan.notifications.cachemanagerlistener.annotation.ViewChanged;
 import org.infinispan.notifications.cachemanagerlistener.event.ViewChangedEvent;
+import org.infinispan.remoting.ResponseFilter;
+import org.infinispan.remoting.ResponseMode;
 import org.infinispan.remoting.RpcManager;
+import org.infinispan.remoting.responses.ClusteredGetResponseValidityFilter;
+import org.infinispan.remoting.responses.Response;
+import org.infinispan.remoting.responses.SuccessfulResponse;
 import org.infinispan.remoting.transport.Address;
 import org.infinispan.util.Util;
 
@@ -19,7 +28,7 @@
 import java.util.Map;
 
 /**
- * // TODO: Manik: Document this
+ * The default distribution manager implementation
  *
  * @author Manik Surtani
  * @since 4.0
@@ -33,12 +42,14 @@
    CacheManagerNotifier notifier;
    int replCount;
    ViewChangeListener listener;
+   CommandsFactory cf;
 
    @Inject
-   public void init(Configuration configuration, RpcManager rpcManager, CacheManagerNotifier notifier) {
+   public void init(Configuration configuration, RpcManager rpcManager, CacheManagerNotifier notifier, CommandsFactory cf) {
       this.configuration = configuration;
       this.rpcManager = rpcManager;
       this.notifier = notifier;
+      this.cf = cf;
    }
 
    // needs to be AFTER the RpcManager
@@ -66,16 +77,36 @@
    }
 
    public List<Address> locate(Object key) {
-      List<Address> adds = consistentHash.locate(key, replCount);
-      if (trace) log.trace("Located {0} addresses for key {1}.  Repl count is {2}, addresses are {3}", adds.size(),
-                           key, replCount, adds);
-      return adds;
+      return consistentHash.locate(key, replCount);
    }
 
    public Map<Object, List<Address>> locateAll(Collection<Object> keys) {
       return consistentHash.locateAll(keys, replCount);
    }
 
+   public void transformForL1(CacheEntry entry) {
+      if (entry.getLifespan() < 0 || entry.getLifespan() > configuration.getL1Lifespan())
+         entry.setLifespan(configuration.getL1Lifespan());
+   }
+
+   public InternalCacheEntry retrieveFromRemoteSource(Object key) throws Exception {
+      ClusteredGetCommand get = cf.buildClusteredGetCommand(key);
+
+      ResponseFilter filter = new ClusteredGetResponseValidityFilter(locate(key));
+      List<Response> responses = rpcManager.invokeRemotely(locate(key), get, ResponseMode.SYNCHRONOUS,
+                                                           configuration.getSyncReplTimeout(), false, filter, false);
+
+      if (!responses.isEmpty()) {
+         for (Response r : responses) {
+            if (r instanceof SuccessfulResponse) {
+               return (InternalCacheEntry) ((SuccessfulResponse) r).getResponseValue();
+            }
+         }
+      }
+
+      return null;
+   }
+
    @Listener
    public class ViewChangeListener {
       @ViewChanged

Modified: trunk/core/src/main/java/org/infinispan/factories/AbstractComponentRegistry.java
===================================================================
--- trunk/core/src/main/java/org/infinispan/factories/AbstractComponentRegistry.java	2009-04-24 16:06:56 UTC (rev 168)
+++ trunk/core/src/main/java/org/infinispan/factories/AbstractComponentRegistry.java	2009-04-24 18:15:21 UTC (rev 169)
@@ -155,6 +155,7 @@
       s.add(TransportFactory.class);
       s.add(MarshallerFactory.class);
       s.add(ResponseGeneratorFactory.class);
+      s.add(DistributionManagerFactory.class);
       return s;
    }
 

Added: trunk/core/src/main/java/org/infinispan/factories/DistributionManagerFactory.java
===================================================================
--- trunk/core/src/main/java/org/infinispan/factories/DistributionManagerFactory.java	                        (rev 0)
+++ trunk/core/src/main/java/org/infinispan/factories/DistributionManagerFactory.java	2009-04-24 18:15:21 UTC (rev 169)
@@ -0,0 +1,16 @@
+package org.infinispan.factories;
+
+import org.infinispan.distribution.DistributionManager;
+import org.infinispan.distribution.DistributionManagerImpl;
+import org.infinispan.factories.annotations.DefaultFactoryFor;
+
+@DefaultFactoryFor(classes = DistributionManager.class)
+public class DistributionManagerFactory extends AbstractNamedCacheComponentFactory implements AutoInstantiableFactory {
+   @SuppressWarnings("unchecked")
+   public <T> T construct(Class<T> componentType) {
+      if (configuration.getCacheMode().isDistributed())
+         return (T) new DistributionManagerImpl();
+      else
+         return null;
+   }
+}


Property changes on: trunk/core/src/main/java/org/infinispan/factories/DistributionManagerFactory.java
___________________________________________________________________
Name: svn:keywords
   + Id Revision
Name: svn:eol-style
   + LF

Modified: trunk/core/src/main/java/org/infinispan/factories/EmptyConstructorNamedCacheFactory.java
===================================================================
--- trunk/core/src/main/java/org/infinispan/factories/EmptyConstructorNamedCacheFactory.java	2009-04-24 16:06:56 UTC (rev 168)
+++ trunk/core/src/main/java/org/infinispan/factories/EmptyConstructorNamedCacheFactory.java	2009-04-24 18:15:21 UTC (rev 169)
@@ -25,7 +25,6 @@
 import org.infinispan.batch.BatchContainer;
 import org.infinispan.commands.CommandsFactory;
 import org.infinispan.config.ConfigurationException;
-import org.infinispan.distribution.DistributionManager;
 import org.infinispan.eviction.EvictionManager;
 import org.infinispan.factories.annotations.DefaultFactoryFor;
 import org.infinispan.factories.context.ContextFactory;
@@ -46,7 +45,7 @@
 @DefaultFactoryFor(classes = {CacheNotifier.class, EntryFactory.class, CommandsFactory.class,
                               CacheLoaderManager.class, InvocationContextContainer.class,
                               TransactionTable.class, BatchContainer.class, ContextFactory.class,
-                              TransactionLog.class, EvictionManager.class, DistributionManager.class})
+                              TransactionLog.class, EvictionManager.class})
 public class EmptyConstructorNamedCacheFactory extends AbstractNamedCacheComponentFactory implements AutoInstantiableFactory {
    @Override
    public <T> T construct(Class<T> componentType) {

Modified: trunk/core/src/main/java/org/infinispan/factories/InterceptorChainFactory.java
===================================================================
--- trunk/core/src/main/java/org/infinispan/factories/InterceptorChainFactory.java	2009-04-24 16:06:56 UTC (rev 168)
+++ trunk/core/src/main/java/org/infinispan/factories/InterceptorChainFactory.java	2009-04-24 18:15:21 UTC (rev 169)
@@ -77,7 +77,10 @@
          interceptorChain.appendIntereceptor(createInterceptor(CacheMgmtInterceptor.class));
 
       // load the tx interceptor
-      interceptorChain.appendIntereceptor(createInterceptor(TxInterceptor.class));
+      if (configuration.getCacheMode().isDistributed())
+         interceptorChain.appendIntereceptor(createInterceptor(DistTxInterceptor.class));
+      else
+         interceptorChain.appendIntereceptor(createInterceptor(TxInterceptor.class));
 
       if (configuration.isUseLazyDeserialization())
          interceptorChain.appendIntereceptor(createInterceptor(MarshalledValueInterceptor.class));
@@ -110,8 +113,12 @@
             interceptorChain.appendIntereceptor(createInterceptor(CacheStoreInterceptor.class));
          }
       }
-      interceptorChain.appendIntereceptor(createInterceptor(LockingInterceptor.class));
 
+      if (configuration.getCacheMode().isDistributed())
+         interceptorChain.appendIntereceptor(createInterceptor(DistLockingInterceptor.class));
+      else
+         interceptorChain.appendIntereceptor(createInterceptor(LockingInterceptor.class));
+
       CommandInterceptor callInterceptor = createInterceptor(CallInterceptor.class);
       interceptorChain.appendIntereceptor(callInterceptor);
       if (log.isTraceEnabled()) log.trace("Finished building default interceptor chain.");

Added: trunk/core/src/main/java/org/infinispan/interceptors/DistLockingInterceptor.java
===================================================================
--- trunk/core/src/main/java/org/infinispan/interceptors/DistLockingInterceptor.java	                        (rev 0)
+++ trunk/core/src/main/java/org/infinispan/interceptors/DistLockingInterceptor.java	2009-04-24 18:15:21 UTC (rev 169)
@@ -0,0 +1,28 @@
+package org.infinispan.interceptors;
+
+import org.infinispan.container.entries.CacheEntry;
+import org.infinispan.context.InvocationContext;
+import org.infinispan.distribution.DistributionManager;
+import org.infinispan.factories.annotations.Inject;
+
+/**
+ * A subclass of the locking interceptor that is able to differentiate committing changes on a ReadCommittedEntry for
+ * storage in the main cache or in L1, used by DIST
+ *
+ * @author Manik Surtani
+ * @since 4.0
+ */
+public class DistLockingInterceptor extends LockingInterceptor {
+   DistributionManager dm;
+
+   @Inject
+   public void injectDistributionManager(DistributionManager dm) {
+      this.dm = dm;
+   }
+
+   @Override
+   protected void commitEntry(InvocationContext ctx, CacheEntry entry) {
+      if (!dm.isLocal(entry.getKey())) dm.transformForL1(entry);
+      entry.commit(dataContainer);
+   }
+}


Property changes on: trunk/core/src/main/java/org/infinispan/interceptors/DistLockingInterceptor.java
___________________________________________________________________
Name: svn:keywords
   + Id Revision
Name: svn:eol-style
   + LF

Added: trunk/core/src/main/java/org/infinispan/interceptors/DistTxInterceptor.java
===================================================================
--- trunk/core/src/main/java/org/infinispan/interceptors/DistTxInterceptor.java	                        (rev 0)
+++ trunk/core/src/main/java/org/infinispan/interceptors/DistTxInterceptor.java	2009-04-24 18:15:21 UTC (rev 169)
@@ -0,0 +1,84 @@
+package org.infinispan.interceptors;
+
+import org.infinispan.commands.AbstractVisitor;
+import org.infinispan.commands.VisitableCommand;
+import org.infinispan.commands.write.DataWriteCommand;
+import org.infinispan.commands.write.PutKeyValueCommand;
+import org.infinispan.commands.write.PutMapCommand;
+import org.infinispan.commands.write.RemoveCommand;
+import org.infinispan.commands.write.ReplaceCommand;
+import org.infinispan.context.InvocationContext;
+import org.infinispan.distribution.DistributionManager;
+import org.infinispan.factories.annotations.Inject;
+
+import java.util.HashMap;
+import java.util.Map;
+
+/**
+ * A special form of the TxInterceptor that is aware of distribution and consistent hashing, and as such only replays
+ * methods during a remote prepare that are targeted to this specific cache instance.
+ *
+ * @author Manik Surtani
+ * @since 4.0
+ */
+public class DistTxInterceptor extends TxInterceptor {
+   DistributionManager dm;
+   ReplayCommandVisitor replayCommandVisitor = new ReplayCommandVisitor();
+
+   @Inject
+   public void injectDistributionManager(DistributionManager dm) {
+      this.dm = dm;
+   }
+
+   /**
+    * Only replays modifications that are targeted to this cache instance (i.e. whose keys are local to it).
+    */
+   @Override
+   protected VisitableCommand getCommandToReplay(VisitableCommand command) {
+      try {
+         return (VisitableCommand) command.acceptVisitor(null, replayCommandVisitor);
+      } catch (RuntimeException re) {
+         throw re;
+      } catch (Throwable th) {
+         throw new RuntimeException(th);
+      }
+   }
+
+   class ReplayCommandVisitor extends AbstractVisitor {
+      @Override
+      public Object visitPutMapCommand(InvocationContext ignored, PutMapCommand command) {
+         Map newMap = new HashMap();
+         for (Map.Entry entry : command.getMap().entrySet()) {
+            if (dm.isLocal(entry.getKey())) newMap.put(entry.getKey(), entry.getValue());
+         }
+
+         if (newMap.isEmpty()) return null;
+         if (newMap.size() == command.getMap().size()) return command;
+         return commandsFactory.buildPutMapCommand(newMap, command.getLifespanMillis(), command.getMaxIdleTimeMillis());
+      }
+
+      @Override
+      public Object visitPutKeyValueCommand(InvocationContext ignored, PutKeyValueCommand command) {
+         return visitDataWriteCommand(command);
+      }
+
+      @Override
+      public Object visitRemoveCommand(InvocationContext ignored, RemoveCommand command) {
+         return visitDataWriteCommand(command);
+      }
+
+      @Override
+      public Object visitReplaceCommand(InvocationContext ignored, ReplaceCommand command) {
+         return visitDataWriteCommand(command);
+      }
+
+      private VisitableCommand visitDataWriteCommand(DataWriteCommand command) {
+         return dm.isLocal(command.getKey()) ? command : null;
+      }
+
+      @Override
+      public Object handleDefault(InvocationContext ignored, VisitableCommand command) {
+         return command;
+      }
+   }
+}


Property changes on: trunk/core/src/main/java/org/infinispan/interceptors/DistTxInterceptor.java
___________________________________________________________________
Name: svn:keywords
   + Id Revision
Name: svn:eol-style
   + LF

Modified: trunk/core/src/main/java/org/infinispan/interceptors/DistributionInterceptor.java
===================================================================
--- trunk/core/src/main/java/org/infinispan/interceptors/DistributionInterceptor.java	2009-04-24 16:06:56 UTC (rev 168)
+++ trunk/core/src/main/java/org/infinispan/interceptors/DistributionInterceptor.java	2009-04-24 18:15:21 UTC (rev 169)
@@ -3,7 +3,6 @@
 import org.infinispan.commands.CommandsFactory;
 import org.infinispan.commands.DataCommand;
 import org.infinispan.commands.read.GetKeyValueCommand;
-import org.infinispan.commands.remote.ClusteredGetCommand;
 import org.infinispan.commands.tx.CommitCommand;
 import org.infinispan.commands.tx.PrepareCommand;
 import org.infinispan.commands.tx.RollbackCommand;
@@ -20,8 +19,6 @@
 import org.infinispan.distribution.DistributionManager;
 import org.infinispan.factories.annotations.Inject;
 import org.infinispan.interceptors.base.BaseRpcInterceptor;
-import org.infinispan.remoting.ResponseMode;
-import org.infinispan.remoting.responses.Response;
 import org.infinispan.remoting.transport.Address;
 import org.infinispan.transaction.GlobalTransaction;
 import org.infinispan.util.Immutables;
@@ -34,7 +31,7 @@
 import java.util.concurrent.ConcurrentHashMap;
 
 /**
- * // TODO: Document this
+ * The interceptor that handles distribution of entries across a cluster, as well as transparent lookup of entries held on remote nodes.
  *
  * @author manik
  * @since 4.0
@@ -45,13 +42,14 @@
    // TODO move this to the transaction context.  Will scale better there.
    private final Map<GlobalTransaction, List<Address>> txRecipients = new ConcurrentHashMap<GlobalTransaction, List<Address>>();
    static final RecipientGenerator CLEAR_COMMAND_GENERATOR = new RecipientGenerator() {
+      private final Object[] EMPTY_ARRAY = {};
 
       public List<Address> generateRecipients() {
          return null;
       }
 
       public Object[] getKeys() {
-         return null;
+         return EMPTY_ARRAY;
       }
    };
 
@@ -70,31 +68,36 @@
    @Override
    public Object visitGetKeyValueCommand(InvocationContext ctx, GetKeyValueCommand command) throws Throwable {
       Object returnValue = invokeNextInterceptor(ctx, command);
-      if (returnValue == null) {
+      // need to check in the context as well since a null retval is not necessarily an indication of the entry not being
+      // available.  It could just have been removed in the same tx beforehand.
+      if (returnValue == null && ctx.lookupEntry(command.getKey()) == null)
+         returnValue = remoteGetAndStoreInL1(ctx, command.getKey());
+      return returnValue;
+   }
+
+   private Object remoteGetAndStoreInL1(InvocationContext ctx, Object key) throws Throwable {
+      if (ctx.isOriginLocal() && !dm.isLocal(key)) {
+         if (trace) log.trace("Doing a remote get for key {0}", key);
          // attempt a remote lookup
-         // TODO update ClusteredGetCommand (maybe a new command?) to ensure we get back ICEs.
-         ClusteredGetCommand get = cf.buildClusteredGetCommand(command.getKey());
-         // TODO use a RspFilter to filter responses         
-         List<Response> responses = rpcManager.invokeRemotely(dm.locate(command.getKey()), get, ResponseMode.SYNCHRONOUS,
-                                                              configuration.getSyncReplTimeout(), false, false);
+         InternalCacheEntry ice = dm.retrieveFromRemoteSource(key);
 
-         // the first response is all that matters
-         if (responses.isEmpty()) return returnValue;
-
-         for (Object response : responses) {
-            if (!(response instanceof Throwable)) {
-               InternalCacheEntry ice = (InternalCacheEntry) response;
-               if (configuration.isL1CacheEnabled()) {
-                  long lifespan = ice.getLifespan() < 0 ? configuration.getL1Lifespan() : Math.min(ice.getLifespan(), configuration.getL1Lifespan());
-                  PutKeyValueCommand put = cf.buildPutKeyValueCommand(ice.getKey(), ice.getValue(), lifespan, -1);
-                  invokeNextInterceptor(ctx, put);
-               }
-               return ice.getValue();
+         if (ice != null) {
+            if (configuration.isL1CacheEnabled()) {
+               if (trace) log.trace("Caching remotely retrieved entry for key {0} in L1", key);
+               long lifespan = ice.getLifespan() < 0 ? configuration.getL1Lifespan() : Math.min(ice.getLifespan(), configuration.getL1Lifespan());
+               PutKeyValueCommand put = cf.buildPutKeyValueCommand(ice.getKey(), ice.getValue(), lifespan, -1);
+               invokeNextInterceptor(ctx, put);
+            } else {
+               if (trace) log.trace("Not caching remotely retrieved entry for key {0} in L1", key);
             }
+            return ice.getValue();
          }
-         return null;
+
+      } else {
+         if (trace)
+            log.trace("Not doing a remote get for key {0} since entry is mapped to current node, or is in L1", key);
       }
-      return returnValue;
+      return null;
    }
 
    // ---- WRITE commands
@@ -113,6 +116,7 @@
 
    @Override
    public Object visitRemoveCommand(InvocationContext ctx, RemoveCommand command) throws Throwable {
+
       return handleWriteCommand(ctx, command,
                                 new SingleKeyRecipientGenerator(command.getKey()));
    }
@@ -163,7 +167,7 @@
          txRecipients.put(command.getGlobalTransaction(), recipients);
 
          // this method will return immediately if we're the only member (because exclude_self=true)
-         replicateCall(ctx, command, sync);
+         replicateCall(ctx, recipients, command, sync, false);
       }
 
       return retVal;
@@ -202,12 +206,21 @@
    }
 
 
+   private void remoteGetBeforeWrite(InvocationContext ctx, Object... keys) throws Throwable {
+      // only do this if we are sync (OR if we don't care about return values!)
+//      if (!configuration.isUnsafeUnreliableReturnValues()) {
+      for (Object k : keys) remoteGetAndStoreInL1(ctx, k);
+//      }
+   }
+
    /**
     * If we are within one transaction we won't do any replication as replication would only be performed at commit
     * time. If the operation didn't originate locally we won't do any replication either.
     */
    private Object handleWriteCommand(InvocationContext ctx, WriteCommand command, RecipientGenerator recipientGenerator) throws Throwable {
       boolean local = isLocalModeForced(ctx);
+      // see if we need to load values from remote srcs first
+      remoteGetBeforeWrite(ctx, recipientGenerator.getKeys());
       if (local && ctx.getTransaction() == null) return invokeNextInterceptor(ctx, command);
       // FIRST pass this call up the chain.  Only if it succeeds (no exceptions) locally do we attempt to replicate.
 
@@ -215,16 +228,11 @@
 
       if (command.isSuccessful()) {
          if (ctx.getTransaction() == null && ctx.isOriginLocal()) {
-            if (trace) {
-               log.trace("invoking method " + command.getClass().getSimpleName() + ", members=" + rpcManager.getTransport().getMembers() + ", mode=" +
-                     configuration.getCacheMode() + ", exclude_self=" + true + ", timeout=" +
-                     configuration.getSyncReplTimeout());
-            }
-
             List<Address> rec = recipientGenerator.generateRecipients();
+            if (trace) log.trace("Invoking command {0} on hosts {1}", command, rec);
             // if L1 caching is used make sure we broadcast an invalidate message
             if (configuration.isL1CacheEnabled() && rec != null) {
-               InvalidateCommand ic = cf.buildInvalidateCommand(recipientGenerator.getKeys());
+               InvalidateCommand ic = cf.buildInvalidateFromL1Command(recipientGenerator.getKeys());
                replicateCall(ctx, ic, isSynchronous(ctx), false);
             }
             replicateCall(ctx, rec, command, isSynchronous(ctx), false);

Modified: trunk/core/src/main/java/org/infinispan/interceptors/LockingInterceptor.java
===================================================================
--- trunk/core/src/main/java/org/infinispan/interceptors/LockingInterceptor.java	2009-04-24 16:06:56 UTC (rev 168)
+++ trunk/core/src/main/java/org/infinispan/interceptors/LockingInterceptor.java	2009-04-24 18:15:21 UTC (rev 169)
@@ -222,6 +222,7 @@
             if (trace) log.trace("Nothing to do since there are no modifications in scope.");
          }
       } else {
+         if (trace) log.trace("Transactional.  Not cleaning up locks till the transaction ends.");
          if (useReadCommitted) {
             Map<Object, CacheEntry> lookedUpEntries = ctx.getLookedUpEntries();
             if (!lookedUpEntries.isEmpty()) {
@@ -258,8 +259,9 @@
             Object key = e.getKey();
             boolean needToUnlock = lockManager.possiblyLocked(entry);
             // could be null with read-committed
-            if (entry != null && entry.isChanged()) entry.commit(dataContainer);
-            else {
+            if (entry != null && entry.isChanged()) {
+               commitEntry(ctx, entry);
+            } else {
                if (trace) log.trace("Entry for key {0} is null, not calling commitUpdate", key);
             }
 
@@ -291,6 +293,10 @@
       ctx.setContainsLocks(false);
    }
 
+   protected void commitEntry(InvocationContext ctx, CacheEntry entry) {
+      entry.commit(dataContainer);
+   }
+
    @SuppressWarnings("unchecked")
    private void transactionalCleanup(boolean commit, InvocationContext ctx) {
       if (ctx.getTransactionContext() != null) {

Modified: trunk/core/src/main/java/org/infinispan/interceptors/TxInterceptor.java
===================================================================
--- trunk/core/src/main/java/org/infinispan/interceptors/TxInterceptor.java	2009-04-24 16:06:56 UTC (rev 168)
+++ trunk/core/src/main/java/org/infinispan/interceptors/TxInterceptor.java	2009-04-24 18:15:21 UTC (rev 169)
@@ -34,8 +34,8 @@
 import org.infinispan.factories.ComponentRegistry;
 import org.infinispan.factories.annotations.Inject;
 import org.infinispan.factories.context.ContextFactory;
+import org.infinispan.invocation.Flag;
 import org.infinispan.invocation.InvocationContextContainer;
-import org.infinispan.invocation.Flag;
 import org.infinispan.jmx.annotations.ManagedAttribute;
 import org.infinispan.jmx.annotations.ManagedOperation;
 import org.infinispan.lock.LockManager;
@@ -405,14 +405,29 @@
    }
 
    /**
+    * Designed to be overridden.  Returns a VisitableCommand fit for replaying locally, based on the modification passed
+    * in.  If a null value is returned, this means that the command should not be replayed.
+    *
+    * @param modification modification in a prepare call
+    * @return a VisitableCommand representing this modification, fit for replaying, or null if the command should not be
+    *         replayed.
+    */
+   protected VisitableCommand getCommandToReplay(VisitableCommand modification) {
+      return modification;
+   }
+
+   /**
     * Replays modifications
     */
-   protected void replayModifications(InvocationContext ctx, Transaction ltx, PrepareCommand command) throws Throwable {
+   private void replayModifications(InvocationContext ctx, Transaction ltx, PrepareCommand command) throws Throwable {
       try {
          // replay modifications
          for (VisitableCommand modification : command.getModifications()) {
-            invokeNextInterceptor(ctx, modification);
-            assertTxIsStillValid(ltx);
+            VisitableCommand toReplay = getCommandToReplay(modification);
+            if (toReplay != null) {
+               invokeNextInterceptor(ctx, toReplay);
+               assertTxIsStillValid(ltx);
+            }
          }
       }
       catch (Throwable th) {

Modified: trunk/core/src/main/java/org/infinispan/loader/cluster/ClusterCacheLoader.java
===================================================================
--- trunk/core/src/main/java/org/infinispan/loader/cluster/ClusterCacheLoader.java	2009-04-24 16:06:56 UTC (rev 168)
+++ trunk/core/src/main/java/org/infinispan/loader/cluster/ClusterCacheLoader.java	2009-04-24 18:15:21 UTC (rev 169)
@@ -18,8 +18,10 @@
 import org.infinispan.remoting.responses.ClusteredGetResponseValidityFilter;
 import org.infinispan.remoting.responses.Response;
 import org.infinispan.remoting.responses.SuccessfulResponse;
+import org.infinispan.remoting.transport.Address;
 
 import java.util.Collections;
+import java.util.HashSet;
 import java.util.List;
 import java.util.Set;
 
@@ -77,7 +79,9 @@
    }
 
    private List<Response> doRemoteCall(ClusteredGetCommand clusteredGetCommand) throws CacheLoaderException {
-      ResponseFilter filter = new ClusteredGetResponseValidityFilter(rpcManager.getTransport().getMembers(), rpcManager.getLocalAddress());
+      Set<Address> validMembers = new HashSet<Address>(rpcManager.getTransport().getMembers());
+      validMembers.remove(rpcManager.getLocalAddress());
+      ResponseFilter filter = new ClusteredGetResponseValidityFilter(validMembers);
       try {
          return rpcManager.invokeRemotely(null, clusteredGetCommand, ResponseMode.WAIT_FOR_VALID_RESPONSE, config.getRemoteCallTimeout(), false, filter, false);
       } catch (Exception e) {

Modified: trunk/core/src/main/java/org/infinispan/remoting/responses/ClusteredGetResponseValidityFilter.java
===================================================================
--- trunk/core/src/main/java/org/infinispan/remoting/responses/ClusteredGetResponseValidityFilter.java	2009-04-24 16:06:56 UTC (rev 168)
+++ trunk/core/src/main/java/org/infinispan/remoting/responses/ClusteredGetResponseValidityFilter.java	2009-04-24 18:15:21 UTC (rev 169)
@@ -3,8 +3,8 @@
 import org.infinispan.remoting.ResponseFilter;
 import org.infinispan.remoting.transport.Address;
 
-import java.util.ArrayList;
-import java.util.List;
+import java.util.Collection;
+import java.util.HashSet;
 
 /**
  * A filter that tests the validity of {@link org.infinispan.commands.remote.ClusteredGetCommand}s.
@@ -16,12 +16,10 @@
 
    private int numValidResponses = 0;
 
-   private List<Address> pendingResponders;
+   private Collection<Address> pendingResponders;
 
-   public ClusteredGetResponseValidityFilter(List<Address> expected, Address localAddress) {
-      this.pendingResponders = new ArrayList<Address>(expected);
-      // We'll never get a response from ourself
-      this.pendingResponders.remove(localAddress);
+   public ClusteredGetResponseValidityFilter(Collection<Address> pendingResponders) {
+      this.pendingResponders = new HashSet<Address>(pendingResponders);
    }
 
    public boolean isAcceptable(Response response, Address address) {

Modified: trunk/core/src/main/java/org/infinispan/remoting/transport/jgroups/JGroupsTransport.java
===================================================================
--- trunk/core/src/main/java/org/infinispan/remoting/transport/jgroups/JGroupsTransport.java	2009-04-24 16:06:56 UTC (rev 168)
+++ trunk/core/src/main/java/org/infinispan/remoting/transport/jgroups/JGroupsTransport.java	2009-04-24 18:15:21 UTC (rev 169)
@@ -304,14 +304,17 @@
          if (rsps == null) return Collections.emptyList();
          List<Response> retval = new ArrayList<Response>(rsps.size());
 
+         boolean noValidResponses = true;
          for (Rsp rsp : rsps.values()) {
             if (rsp.wasSuspected() || !rsp.wasReceived()) {
                if (rsp.wasSuspected()) {
                   throw new SuspectException("Suspected member: " + rsp.getSender());
                } else {
-                  throw new TimeoutException("Replication timeout for " + rsp.getSender());
+                  // if we have a response filter then we may not have waited for some nodes!
+                  if (responseFilter == null) throw new TimeoutException("Replication timeout for " + rsp.getSender());
                }
             } else {
+               noValidResponses = false;
                if (rsp.getValue() != null) {
                   Response value = (Response) rsp.getValue();
                   if (value instanceof ExceptionResponse) {
@@ -326,6 +329,8 @@
                }
             }
          }
+
+         if (noValidResponses) throw new TimeoutException("Timed out waiting for valid responses!");
          return retval;
       } finally {
          // release the "processing" lock so that other threads are aware of the network call having completed

Modified: trunk/core/src/test/java/org/infinispan/distribution/BaseDistFunctionalTest.java
===================================================================
--- trunk/core/src/test/java/org/infinispan/distribution/BaseDistFunctionalTest.java	2009-04-24 16:06:56 UTC (rev 168)
+++ trunk/core/src/test/java/org/infinispan/distribution/BaseDistFunctionalTest.java	2009-04-24 18:15:21 UTC (rev 169)
@@ -1,6 +1,8 @@
 package org.infinispan.distribution;
 
 import org.infinispan.Cache;
+import org.infinispan.commands.VisitableCommand;
+import org.infinispan.commands.write.PutKeyValueCommand;
 import org.infinispan.config.Configuration;
 import static org.infinispan.config.Configuration.CacheMode.DIST_ASYNC;
 import static org.infinispan.config.Configuration.CacheMode.DIST_SYNC;
@@ -16,7 +18,6 @@
 
 import javax.transaction.TransactionManager;
 import java.io.Serializable;
-import java.util.ArrayList;
 import java.util.List;
 
 @Test(groups = "functional", testName = "distribution.BaseDistFunctionalTest")
@@ -28,12 +29,13 @@
 
    protected void createCacheManagers() throws Throwable {
       Configuration c = getDefaultClusteredConfig(sync ? DIST_SYNC : DIST_ASYNC);
+      if (!sync) c.setUnsafeUnreliableReturnValues(true);
       if (tx) c.setTransactionManagerLookupClass(DummyTransactionManagerLookup.class.getName());
       caches = createClusteredCaches(4, "dist", c);
       c1 = caches.get(0);
-      c2 = caches.get(0);
-      c3 = caches.get(0);
-      c4 = caches.get(0);
+      c2 = caches.get(1);
+      c3 = caches.get(2);
+      c4 = caches.get(3);
    }
 
    // ----------------- HELPERS ----------------
@@ -42,30 +44,41 @@
       for (Cache<Object, String> c : caches) assert c.isEmpty();
 
       c1.put("k1", "value");
-      asyncWait();
-      for (Cache<Object, String> c : caches) assert c.get("k1").equals("value");
+      asyncWait("k1", PutKeyValueCommand.class, getNonOwners("k1"));
+      for (Cache<Object, String> c : caches)
+         assert "value".equals(c.get("k1")) : "Failed on cache " + c.getCacheManager().getAddress();
       assertOwnershipAndNonOwnership("k1");
    }
 
    protected Cache<Object, String> getFirstNonOwner(String key) {
-      return getNonOwners(key).get(0);
+      return getNonOwners(key)[0];
    }
 
+   protected Cache<Object, String> getSecondNonOwner(String key) {
+      return getNonOwners(key)[1];
+   }
+
    protected void assertOnAllCachesAndOwnership(Object key, String value) {
       for (Cache<Object, String> c : caches) {
-         if (value == null)
-            assert c.get(key) == null;
-         else
-            assert value.equals(c.get(key));
+         Object realVal = c.get(key);
+         if (value == null) {
+            assert realVal == null : "Expecting [" + key + "] to equal [" + value + "] on cache ["
+                  + c.getCacheManager().getAddress() + "] but was [" + realVal + "]";
+         } else {
+            assert value.equals(realVal) : "Expecting [" + key + "] to equal [" + value + "] on cache ["
+                  + c.getCacheManager().getAddress() + "] but was [" + realVal + "]";
+         }
       }
-      assertOwnershipAndNonOwnership("k1");
+      if (value != null) assertOwnershipAndNonOwnership(key);
    }
 
    protected void assertOwnershipAndNonOwnership(Object key) {
       for (Cache<Object, String> c : caches) {
          DataContainer dc = c.getAdvancedCache().getDataContainer();
          if (isOwner(c, key)) {
-            assert dc.get(key) instanceof ImmortalCacheEntry : "Fail on cache " + c.getCacheManager().getAddress() + ": dc.get(" + key + ") returned " + dc.get(key);
+            InternalCacheEntry ice = dc.get(key);
+            assert ice != null : "Fail on cache " + c.getCacheManager().getAddress() + ": dc.get(" + key + ") returned null!";
+            assert ice instanceof ImmortalCacheEntry : "Fail on cache " + c.getCacheManager().getAddress() + ": dc.get(" + key + ") returned " + dc.get(key);
          } else {
             if (dc.containsKey(key)) {
                assert dc.get(key) instanceof MortalCacheEntry : "Fail on cache " + c.getCacheManager().getAddress() + ": dc.get(" + key + ") returned " + dc.get(key);
@@ -104,18 +117,26 @@
       return false;
    }
 
-   protected List<Cache<Object, String>> getOwners(Object key) {
-      List<Cache<Object, String>> owners = new ArrayList<Cache<Object, String>>();
+   protected static boolean isFirstOwner(Cache<?, ?> c, Object key) {
+      DistributionManager dm = c.getAdvancedCache().getComponentRegistry().getComponent(DistributionManager.class);
+      List<Address> ownerAddresses = dm.locate(key);
+      return c.getCacheManager().getAddress().equals(ownerAddresses.get(0));
+   }
+
+   protected Cache<Object, String>[] getOwners(Object key) {
+      Cache<Object, String>[] owners = new Cache[2];
+      int i = 0;
       for (Cache<Object, String> c : caches) {
-         if (isOwner(c, key)) owners.add(c);
+         if (isOwner(c, key)) owners[i++] = c;
       }
       return owners;
    }
 
-   protected List<Cache<Object, String>> getNonOwners(Object key) {
-      List<Cache<Object, String>> nonOwners = new ArrayList<Cache<Object, String>>();
+   protected Cache<Object, String>[] getNonOwners(Object key) {
+      Cache<Object, String>[] nonOwners = new Cache[2];
+      int i = 0;
       for (Cache<Object, String> c : caches) {
-         if (!isOwner(c, key)) nonOwners.add(c);
+         if (!isOwner(c, key)) nonOwners[i++] = c;
       }
       return nonOwners;
    }
@@ -125,7 +146,19 @@
       return dm.locate(key);
    }
 
-   protected void asyncWait() {
+   protected DistributionManager getDistributionManager(Cache<?, ?> c) {
+      return c.getAdvancedCache().getComponentRegistry().getComponent(DistributionManager.class);
+   }
+
+   /**
+    * Blocks and waits for a replication event on async caches
+    *
+    * @param key     key that causes the replication.  Used to determine which caches to listen on.  If null, all caches
+    *                are checked
+    * @param command command to listen for
+    * @param caches  on which this key should be invalidated
+    */
+   protected void asyncWait(Object key, Class<? extends VisitableCommand> command, Cache<?, ?>... caches) {
       // no op.
    }
 
@@ -144,7 +177,7 @@
       public MagicKey(Cache<?, ?> toMapTo) {
          address = toMapTo.getCacheManager().getAddress().toString();
          // generate a hashcode that will always map it to the specified cache.
-         for (int i = 0; i < DefaultConsistentHash.HASH_SPACE; i += 100) {
+         for (int i = 1; i < DefaultConsistentHash.HASH_SPACE; i += 100) {
             // create a dummy object with this hashcode
             final int hc = i;
             Object dummy = new Object() {
@@ -154,7 +187,7 @@
                }
             };
 
-            if (BaseDistFunctionalTest.isOwner(toMapTo, dummy)) {
+            if (BaseDistFunctionalTest.isFirstOwner(toMapTo, dummy)) {
                // we have found a hashcode that works!
                hashcode = hc;
                break;
@@ -168,6 +201,19 @@
       }
 
       @Override
+      public boolean equals(Object o) {
+         if (this == o) return true;
+         if (o == null || getClass() != o.getClass()) return false;
+
+         MagicKey magicKey = (MagicKey) o;
+
+         if (hashcode != magicKey.hashcode) return false;
+         if (address != null ? !address.equals(magicKey.address) : magicKey.address != null) return false;
+
+         return true;
+      }
+
+      @Override
       public String toString() {
          return "MagicKey{" +
                "hashcode=" + hashcode +

Modified: trunk/core/src/test/java/org/infinispan/distribution/DefaultConsistentHashTest.java
===================================================================
--- trunk/core/src/test/java/org/infinispan/distribution/DefaultConsistentHashTest.java	2009-04-24 16:06:56 UTC (rev 168)
+++ trunk/core/src/test/java/org/infinispan/distribution/DefaultConsistentHashTest.java	2009-04-24 18:15:21 UTC (rev 169)
@@ -11,7 +11,7 @@
 import java.util.List;
 import java.util.Map;
 
-@Test(groups = "unit", testName = "distribution.DefaultConsistentHashTest", enabled = false)
+@Test(groups = "unit", testName = "distribution.DefaultConsistentHashTest")
 public class DefaultConsistentHashTest {
 
    List<Address> servers;

Modified: trunk/core/src/test/java/org/infinispan/distribution/DistAsyncFuncTest.java
===================================================================
--- trunk/core/src/test/java/org/infinispan/distribution/DistAsyncFuncTest.java	2009-04-24 16:06:56 UTC (rev 168)
+++ trunk/core/src/test/java/org/infinispan/distribution/DistAsyncFuncTest.java	2009-04-24 18:15:21 UTC (rev 169)
@@ -1,24 +1,57 @@
 package org.infinispan.distribution;
 
+import org.infinispan.Cache;
+import org.infinispan.commands.VisitableCommand;
+import org.infinispan.commands.write.InvalidateL1Command;
+import org.infinispan.test.ReplListener;
 import org.testng.annotations.Test;
 
-@Test(groups = "functional", testName = "distribution.DistAsyncFuncTest", enabled = false)
+import java.util.HashMap;
+import java.util.Map;
+
+@Test(groups = "functional", testName = "distribution.DistAsyncFuncTest")
 public class DistAsyncFuncTest extends DistSyncFuncTest {
 
+   ReplListener r1, r2, r3, r4;
+   ReplListener[] r;
+   Map<Cache<?, ?>, ReplListener> listenerLookup;
+
    public DistAsyncFuncTest() {
       sync = false;
       tx = false;
    }
 
    @Override
-   protected void asyncWait() {
-      // we need to wait for an async event to happen on *each* cache?
-      // TODO figure this out properly!
-      try {
-         Thread.sleep(2000);
-      } catch (InterruptedException e) {
-         e.printStackTrace();  // TODO: Customise this generated block
+   protected void createCacheManagers() throws Throwable {
+      super.createCacheManagers();
+      r1 = new ReplListener(c1, true, true);
+      r2 = new ReplListener(c2, true, true);
+      r3 = new ReplListener(c3, true, true);
+      r4 = new ReplListener(c4, true, true);
+      r = new ReplListener[]{r1, r2, r3, r4};
+      listenerLookup = new HashMap<Cache<?, ?>, ReplListener>();
+      for (ReplListener rl : r) listenerLookup.put(rl.getCache(), rl);
+   }
+
+
+   @Override
+   protected void asyncWait(Object key, Class<? extends VisitableCommand> command, Cache<?, ?>... cachesOnWhichKeyShouldInval) {
+      if (key == null) {
+         // test all caches.
+         for (ReplListener rl : r) rl.expect(command);
+         for (ReplListener rl : r) rl.waitForRpc();
+      } else {
+         for (Cache<?, ?> c : getOwners(key)) {
+            listenerLookup.get(c).expect(command);
+            listenerLookup.get(c).waitForRpc();
+         }
+
+         if (cachesOnWhichKeyShouldInval != null) {
+            for (Cache<?, ?> c : cachesOnWhichKeyShouldInval) {
+               listenerLookup.get(c).expect(InvalidateL1Command.class);
+               listenerLookup.get(c).waitForRpc();
+            }
+         }
       }
    }
-
 }
\ No newline at end of file

Modified: trunk/core/src/test/java/org/infinispan/distribution/DistAsyncTxFuncTest.java
===================================================================
--- trunk/core/src/test/java/org/infinispan/distribution/DistAsyncTxFuncTest.java	2009-04-24 16:06:56 UTC (rev 168)
+++ trunk/core/src/test/java/org/infinispan/distribution/DistAsyncTxFuncTest.java	2009-04-24 18:15:21 UTC (rev 169)
@@ -1,10 +1,24 @@
 package org.infinispan.distribution;
 
+import org.infinispan.Cache;
+import org.infinispan.commands.VisitableCommand;
+import org.infinispan.commands.write.InvalidateCommand;
+import org.infinispan.test.ReplListener;
 import org.testng.annotations.Test;
 
-@Test(groups = "functional", testName = "distribution.DistAsyncTxFuncTest", enabled = false)
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Set;
+
+@Test(groups = "functional", testName = "distribution.DistAsyncTxFuncTest")
 public class DistAsyncTxFuncTest extends DistSyncTxFuncTest {
 
+   ReplListener r1, r2, r3, r4;
+   ReplListener[] r;
+   Map<Cache<?, ?>, ReplListener> listenerLookup;
+
    public DistAsyncTxFuncTest() {
       sync = false;
       tx = true;
@@ -12,13 +26,51 @@
    }
 
    @Override
-   protected void asyncWait() {
-      // we need to wait for an async event to happen on *each* cache?
-      // TODO figure this out properly!
-      try {
-         Thread.sleep(2000);
-      } catch (InterruptedException e) {
-         e.printStackTrace();  // TODO: Customise this generated block
+   protected void createCacheManagers() throws Throwable {
+      super.createCacheManagers();
+      r1 = new ReplListener(c1, true, true);
+      r2 = new ReplListener(c2, true, true);
+      r3 = new ReplListener(c3, true, true);
+      r4 = new ReplListener(c4, true, true);
+      r = new ReplListener[]{r1, r2, r3, r4};
+      listenerLookup = new HashMap<Cache<?, ?>, ReplListener>();
+      for (ReplListener rl : r) listenerLookup.put(rl.getCache(), rl);
+   }
+
+   @Override
+   protected void asyncWait(Object key, Class<? extends VisitableCommand> command, Cache<?, ?>... cachesOnWhichKeyShouldInval) {
+      if (key == null) {
+         // test all caches.
+         for (ReplListener rl : r) rl.expect(command);
+         for (ReplListener rl : r) rl.waitForRpc();
+      } else {
+         for (Cache<?, ?> c : getOwners(key)) {
+            listenerLookup.get(c).expect(command);
+            listenerLookup.get(c).waitForRpc();
+         }
+
+         if (cachesOnWhichKeyShouldInval != null) {
+            for (Cache<?, ?> c : cachesOnWhichKeyShouldInval) {
+               listenerLookup.get(c).expect(InvalidateCommand.class);
+               listenerLookup.get(c).waitForRpc();
+            }
+         }
       }
    }
+
+   @Override
+   protected void asyncTxWait(Object... keys) {
+      // Wait for a tx completion event
+      if (keys != null) {
+         Set<Cache<?, ?>> cachesInTx = new HashSet<Cache<?, ?>>();
+         for (Object k : keys) {
+            cachesInTx.addAll(Arrays.asList(getOwners(k)));
+         }
+
+         for (Cache<?, ?> c : cachesInTx) {
+            listenerLookup.get(c).expectAnyWithTx();
+            listenerLookup.get(c).waitForRpc();
+         }
+      }
+   }
 }
\ No newline at end of file

Modified: trunk/core/src/test/java/org/infinispan/distribution/DistSyncFuncTest.java
===================================================================
--- trunk/core/src/test/java/org/infinispan/distribution/DistSyncFuncTest.java	2009-04-24 16:06:56 UTC (rev 168)
+++ trunk/core/src/test/java/org/infinispan/distribution/DistSyncFuncTest.java	2009-04-24 18:15:21 UTC (rev 169)
@@ -1,9 +1,13 @@
 package org.infinispan.distribution;
 
 import org.infinispan.Cache;
+import org.infinispan.commands.write.ClearCommand;
+import org.infinispan.commands.write.PutKeyValueCommand;
+import org.infinispan.commands.write.RemoveCommand;
+import org.infinispan.commands.write.ReplaceCommand;
 import org.testng.annotations.Test;
 
-@Test(groups = "functional", testName = "distribution.DistSyncFuncTest", enabled = false)
+@Test(groups = "functional", testName = "distribution.DistSyncFuncTest")
 public class DistSyncFuncTest extends BaseDistFunctionalTest {
 
    public DistSyncFuncTest() {
@@ -14,9 +18,9 @@
    public void testBasicDistribution() {
       for (Cache<Object, String> c : caches) assert c.isEmpty();
 
-      c1.put("k1", "value");
+      getOwners("k1")[0].put("k1", "value");
 
-      asyncWait();
+      asyncWait("k1", PutKeyValueCommand.class, getNonOwners("k1"));
 
       for (Cache<Object, String> c : caches) {
          if (isOwner(c, "k1")) {
@@ -42,26 +46,30 @@
 
    public void testPutFromNonOwner() {
       initAndTest();
-      Object retval = getFirstNonOwner("k1").put("k1", "value2");
-      asyncWait();
-      if (sync) assert "value".equals(retval);
+      Cache<Object, String> nonOwner = getFirstNonOwner("k1");
+      System.out.println("Non-owner address is " + nonOwner.getCacheManager().getAddress());
+
+      Object retval = nonOwner.put("k1", "value2");
+      asyncWait("k1", PutKeyValueCommand.class, getSecondNonOwner("k1"));
+
+      assert "value".equals(retval);
       assertOnAllCachesAndOwnership("k1", "value2");
    }
 
    public void testPutIfAbsentFromNonOwner() {
       initAndTest();
       Object retval = getFirstNonOwner("k1").putIfAbsent("k1", "value2");
-      asyncWait();
-      if (sync) assert "value".equals(retval);
 
+      assert "value".equals(retval);
+
       assertOnAllCachesAndOwnership("k1", "value");
 
       c1.clear();
-      asyncWait();
+      asyncWait(null, ClearCommand.class);
 
       retval = getFirstNonOwner("k1").putIfAbsent("k1", "value2");
-      asyncWait();
-      if (sync) assert null == retval;
+      asyncWait("k1", PutKeyValueCommand.class, getSecondNonOwner("k1"));
+      assert null == retval;
 
       assertOnAllCachesAndOwnership("k1", "value2");
    }
@@ -69,7 +77,7 @@
    public void testRemoveFromNonOwner() {
       initAndTest();
       Object retval = getFirstNonOwner("k1").remove("k1");
-      asyncWait();
+      asyncWait("k1", RemoveCommand.class, getSecondNonOwner("k1"));
       if (sync) assert "value".equals(retval);
 
       assertOnAllCachesAndOwnership("k1", null);
@@ -78,14 +86,13 @@
    public void testConditionalRemoveFromNonOwner() {
       initAndTest();
       boolean retval = getFirstNonOwner("k1").remove("k1", "value2");
-      asyncWait();
-      if (sync) assert !retval : "Should not have removed entry";
+      assert !retval : "Should not have removed entry";
 
       assertOnAllCachesAndOwnership("k1", "value");
 
       retval = getFirstNonOwner("k1").remove("k1", "value");
-      asyncWait();
-      if (sync) assert retval : "Should have removed entry";
+      asyncWait("k1", RemoveCommand.class, getSecondNonOwner("k1"));
+      assert retval : "Should have removed entry";
 
       assertOnAllCachesAndOwnership("k1", null);
    }
@@ -93,36 +100,34 @@
    public void testReplaceFromNonOwner() {
       initAndTest();
       Object retval = getFirstNonOwner("k1").replace("k1", "value2");
-      asyncWait();
-      if (sync) assert "value".equals(retval);
+      assert "value".equals(retval);
 
+      asyncWait("k1", ReplaceCommand.class, getSecondNonOwner("k1"));
+
       assertOnAllCachesAndOwnership("k1", "value2");
 
       c1.clear();
-      asyncWait();
+      asyncWait(null, ClearCommand.class);
 
       retval = getFirstNonOwner("k1").replace("k1", "value2");
-      asyncWait();
-      if (sync) assert retval == null;
+      assert retval == null;
 
       assertOnAllCachesAndOwnership("k1", null);
    }
 
    public void testConditionalReplaceFromNonOwner() {
       initAndTest();
-      boolean retval = getFirstNonOwner("k1").replace("k1", "valueX", "value2");
-      asyncWait();
-      if (sync) assert !retval : "Should not have replaced";
+      Cache<Object, String> nonOwner = getFirstNonOwner("k1");
+      boolean retval = nonOwner.replace("k1", "valueX", "value2");
+      assert !retval : "Should not have replaced";
 
       assertOnAllCachesAndOwnership("k1", "value");
 
-      c1.clear();
-      asyncWait();
+      assert !nonOwner.getAdvancedCache().getComponentRegistry().getComponent(DistributionManager.class).isLocal("k1");
+      retval = nonOwner.replace("k1", "value", "value2");
+      asyncWait("k1", ReplaceCommand.class, getSecondNonOwner("k1"));
+      assert retval : "Should have replaced";
 
-      retval = getFirstNonOwner("k1").replace("k1", "value", "value2");
-      asyncWait();
-      if (sync) assert retval : "Should have replaced";
-
       assertOnAllCachesAndOwnership("k1", "value2");
    }
 
@@ -130,8 +135,8 @@
       for (Cache<Object, String> c : caches) assert c.isEmpty();
 
       for (int i = 0; i < 10; i++) {
-         c1.put("k" + i, "value" + i);
-         asyncWait();
+         getOwners("k" + i)[0].put("k" + i, "value" + i);
+         asyncWait("k" + i, PutKeyValueCommand.class, getNonOwners("k" + i));
       }
 
       // this will fill up L1 as well
@@ -140,7 +145,7 @@
       for (Cache<Object, String> c : caches) assert !c.isEmpty();
 
       c1.clear();
-      asyncWait();
+      asyncWait(null, ClearCommand.class);
 
       for (Cache<Object, String> c : caches) assert c.isEmpty();
    }

Modified: trunk/core/src/test/java/org/infinispan/distribution/DistSyncTxFuncTest.java
===================================================================
--- trunk/core/src/test/java/org/infinispan/distribution/DistSyncTxFuncTest.java	2009-04-24 16:06:56 UTC (rev 168)
+++ trunk/core/src/test/java/org/infinispan/distribution/DistSyncTxFuncTest.java	2009-04-24 18:15:21 UTC (rev 169)
@@ -1,10 +1,11 @@
 package org.infinispan.distribution;
 
+import org.infinispan.commands.write.PutKeyValueCommand;
 import org.testng.annotations.Test;
 
 import javax.transaction.TransactionManager;
 
-@Test(groups = "functional", testName = "distribution.DistSyncTxFuncTest", enabled = false)
+@Test(groups = "functional", testName = "distribution.DistSyncTxFuncTest")
 public class DistSyncTxFuncTest extends BaseDistFunctionalTest {
    public DistSyncTxFuncTest() {
       sync = true;
@@ -12,10 +13,17 @@
       cleanup = CleanupPhase.AFTER_METHOD; // ensure any stale TXs are wiped
    }
 
+   protected void asyncTxWait(Object... keys) {
+      // no op.  Meant to be overridden
+   }
+
    private void init(MagicKey k1, MagicKey k2) {
       // neither key maps on to c4
       c2.put(k1, "value1");
+      asyncWait(k1, PutKeyValueCommand.class);
+
       c2.put(k2, "value2");
+      asyncWait(k2, PutKeyValueCommand.class);
 
       assertIsInContainerImmortal(c1, k1);
       assertIsInContainerImmortal(c2, k1);
@@ -42,7 +50,7 @@
       c4.put(k2, "new_value2");
       tm4.commit();
 
-      asyncWait();
+      asyncTxWait(k1, k2);
 
       assertIsInContainerImmortal(c1, k1);
       assertIsInContainerImmortal(c2, k1);
@@ -54,9 +62,13 @@
       assertIsNotInL1(c1, k2);
       assertIsNotInL1(c3, k1);
 
-      assertOnAllCachesAndOwnership(k1, "new_value1");
-      assertOnAllCachesAndOwnership(k2, "new_value2");
+      checkOwnership(k1, k2, "new_value1", "new_value2");
+   }
 
+   private void checkOwnership(MagicKey k1, MagicKey k2, String v1, String v2) {
+      assertOnAllCachesAndOwnership(k1, v1);
+      assertOnAllCachesAndOwnership(k2, v2);
+
       assertIsInL1(c4, k1);
       assertIsInL1(c4, k2);
       assertIsInL1(c1, k2);
@@ -77,8 +89,34 @@
       c4.put(k2, "new_value2");
       tm4.rollback();
 
-      asyncWait();
+      assertIsInContainerImmortal(c1, k1);
+      assertIsInContainerImmortal(c2, k1);
+      assertIsInContainerImmortal(c2, k2);
+      assertIsInContainerImmortal(c3, k2);
 
+      assertIsNotInL1(c4, k1);
+      assertIsNotInL1(c4, k2);
+      assertIsNotInL1(c1, k2);
+      assertIsNotInL1(c3, k1);
+
+      checkOwnership(k1, k2, "value1", "value2");
+   }
+
+   public void testPutFromNonOwner() throws Exception {
+      // we need 2 keys that reside on different caches...
+      MagicKey k1 = new MagicKey(c1); // maps on to c1 and c2
+      MagicKey k2 = new MagicKey(c2); // maps on to c2 and c3
+
+      init(k1, k2);
+
+      TransactionManager tm4 = getTransactionManager(c4);
+      tm4.begin();
+      Object ret = c4.put(k1, "new_value");
+      assert "value1".equals(ret);
+      ret = c4.put(k2, "new_value");
+      assert "value2".equals(ret);
+      tm4.rollback();
+
       assertIsInContainerImmortal(c1, k1);
       assertIsInContainerImmortal(c2, k1);
       assertIsInContainerImmortal(c2, k2);
@@ -89,16 +127,181 @@
       assertIsNotInL1(c1, k2);
       assertIsNotInL1(c3, k1);
 
-      assertOnAllCachesAndOwnership(k1, "value1");
-      assertOnAllCachesAndOwnership(k2, "value2");
+      checkOwnership(k1, k2, "value1", "value2");
+   }
 
-      assertIsInL1(c4, k1);
-      assertIsInL1(c4, k2);
-      assertIsInL1(c1, k2);
-      assertIsInL1(c3, k1);
+   public void testPutIfAbsentFromNonOwner() throws Exception {
+      // we need 2 keys that reside on different caches...
+      MagicKey k1 = new MagicKey(c1); // maps on to c1 and c2
+      MagicKey k2 = new MagicKey(c2); // maps on to c2 and c3
+
+      init(k1, k2);
+
+      TransactionManager tm4 = getTransactionManager(c4);
+      tm4.begin();
+      Object ret = c4.putIfAbsent(k1, "new_value");
+      assert "value1".equals(ret) : "Was expecting value1 but was " + ret;
+      ret = c4.putIfAbsent(k2, "new_value");
+      assert "value2".equals(ret) : "Was expecting value2 but was " + ret;
+
+      assert c4.get(k1).equals("value1");
+      assert c4.get(k2).equals("value2");
+
+      tm4.rollback();
+
+      assert c2.get(k1).equals("value1");
+      assert c2.get(k2).equals("value2");
+
+      assertIsInContainerImmortal(c1, k1);
+      assertIsInContainerImmortal(c2, k1);
+      assertIsInContainerImmortal(c2, k2);
+      assertIsInContainerImmortal(c3, k2);
+
+      assertIsNotInL1(c4, k1);
+      assertIsNotInL1(c4, k2);
+      assertIsNotInL1(c1, k2);
+      assertIsNotInL1(c3, k1);
+
+      checkOwnership(k1, k2, "value1", "value2");
    }
 
-   public void testReturnValuesInTx() throws Exception {
-      // TODO
+   public void testRemoveFromNonOwner() throws Exception {
+      // we need 2 keys that reside on different caches...
+      MagicKey k1 = new MagicKey(c1); // maps on to c1 and c2
+      MagicKey k2 = new MagicKey(c2); // maps on to c2 and c3
+
+      init(k1, k2);
+
+      TransactionManager tm4 = getTransactionManager(c4);
+      tm4.begin();
+      Object ret = c4.remove(k1);
+      assert "value1".equals(ret);
+      ret = c4.remove(k2);
+      assert "value2".equals(ret);
+
+      assert !c4.containsKey(k1);
+      assert !c4.containsKey(k2);
+      tm4.rollback();
+
+      assertIsInContainerImmortal(c1, k1);
+      assertIsInContainerImmortal(c2, k1);
+      assertIsInContainerImmortal(c2, k2);
+      assertIsInContainerImmortal(c3, k2);
+
+      assertIsNotInL1(c4, k1);
+      assertIsNotInL1(c4, k2);
+      assertIsNotInL1(c1, k2);
+      assertIsNotInL1(c3, k1);
+
+      checkOwnership(k1, k2, "value1", "value2");
    }
+
+   public void testConditionalRemoveFromNonOwner() throws Exception {
+      // we need 2 keys that reside on different caches...
+      MagicKey k1 = new MagicKey(c1); // maps on to c1 and c2
+      MagicKey k2 = new MagicKey(c2); // maps on to c2 and c3
+
+      init(k1, k2);
+
+      TransactionManager tm4 = getTransactionManager(c4);
+      tm4.begin();
+      boolean ret = c4.remove(k1, "valueX");
+      assert !ret;
+      ret = c4.remove(k2, "valueX");
+      assert !ret;
+
+      assert c4.containsKey(k1);
+      assert c4.containsKey(k2);
+
+      ret = c4.remove(k1, "value1");
+      assert ret;
+      ret = c4.remove(k2, "value2");
+      assert ret;
+
+      assert !c4.containsKey(k1);
+      assert !c4.containsKey(k2);
+      tm4.rollback();
+
+      assertIsInContainerImmortal(c1, k1);
+      assertIsInContainerImmortal(c2, k1);
+      assertIsInContainerImmortal(c2, k2);
+      assertIsInContainerImmortal(c3, k2);
+
+      assertIsNotInL1(c4, k1);
+      assertIsNotInL1(c4, k2);
+      assertIsNotInL1(c1, k2);
+      assertIsNotInL1(c3, k1);
+
+      checkOwnership(k1, k2, "value1", "value2");
+   }
+
+   public void testReplaceFromNonOwner() throws Exception {
+      // we need 2 keys that reside on different caches...
+      MagicKey k1 = new MagicKey(c1); // maps on to c1 and c2
+      MagicKey k2 = new MagicKey(c2); // maps on to c2 and c3
+
+      init(k1, k2);
+
+      TransactionManager tm4 = getTransactionManager(c4);
+      tm4.begin();
+      Object ret = c4.replace(k1, "new_value");
+      assert "value1".equals(ret);
+      ret = c4.replace(k2, "new_value");
+      assert "value2".equals(ret);
+
+      assert "new_value".equals(c4.get(k1));
+      assert "new_value".equals(c4.get(k2));
+      tm4.rollback();
+
+      assertIsInContainerImmortal(c1, k1);
+      assertIsInContainerImmortal(c2, k1);
+      assertIsInContainerImmortal(c2, k2);
+      assertIsInContainerImmortal(c3, k2);
+
+      assertIsNotInL1(c4, k1);
+      assertIsNotInL1(c4, k2);
+      assertIsNotInL1(c1, k2);
+      assertIsNotInL1(c3, k1);
+
+      checkOwnership(k1, k2, "value1", "value2");
+   }
+
+   public void testConditionalReplaceFromNonOwner() throws Exception {
+      // we need 2 keys that reside on different caches...
+      MagicKey k1 = new MagicKey(c1); // maps on to c1 and c2
+      MagicKey k2 = new MagicKey(c2); // maps on to c2 and c3
+
+      init(k1, k2);
+
+      TransactionManager tm4 = getTransactionManager(c4);
+      tm4.begin();
+      boolean ret = c4.replace(k1, "valueX", "new_value");
+      assert !ret;
+      ret = c4.replace(k2, "valueX", "new_value");
+      assert !ret;
+
+      assert "value1".equals(c4.get(k1));
+      assert "value2".equals(c4.get(k2));
+
+      ret = c4.replace(k1, "value1", "new_value");
+      assert ret;
+      ret = c4.replace(k2, "value2", "new_value");
+      assert ret;
+
+      assert "new_value".equals(c4.get(k1));
+      assert "new_value".equals(c4.get(k2));
+      tm4.rollback();
+
+      assertIsInContainerImmortal(c1, k1);
+      assertIsInContainerImmortal(c2, k1);
+      assertIsInContainerImmortal(c2, k2);
+      assertIsInContainerImmortal(c3, k2);
+
+      assertIsNotInL1(c4, k1);
+      assertIsNotInL1(c4, k2);
+      assertIsNotInL1(c1, k2);
+      assertIsNotInL1(c3, k1);
+
+      checkOwnership(k1, k2, "value1", "value2");
+   }
 }

Added: trunk/core/src/test/java/org/infinispan/distribution/MagicKeyTest.java
===================================================================
--- trunk/core/src/test/java/org/infinispan/distribution/MagicKeyTest.java	                        (rev 0)
+++ trunk/core/src/test/java/org/infinispan/distribution/MagicKeyTest.java	2009-04-24 18:15:21 UTC (rev 169)
@@ -0,0 +1,33 @@
+package org.infinispan.distribution;
+
+import org.testng.annotations.Test;
+
+
+@Test(groups = "unit", testName = "distribution.MagicKeyTest")
+public class MagicKeyTest extends BaseDistFunctionalTest {
+   public void testMagicKeys() {
+      BaseDistFunctionalTest.MagicKey k1 = new BaseDistFunctionalTest.MagicKey(c1);
+      assert getDistributionManager(c1).isLocal(k1);
+      assert getDistributionManager(c2).isLocal(k1);
+      assert !getDistributionManager(c3).isLocal(k1);
+      assert !getDistributionManager(c4).isLocal(k1);
+
+      BaseDistFunctionalTest.MagicKey k2 = new BaseDistFunctionalTest.MagicKey(c2);
+      assert !getDistributionManager(c1).isLocal(k2);
+      assert getDistributionManager(c2).isLocal(k2);
+      assert getDistributionManager(c3).isLocal(k2);
+      assert !getDistributionManager(c4).isLocal(k2);
+
+      BaseDistFunctionalTest.MagicKey k3 = new BaseDistFunctionalTest.MagicKey(c3);
+      assert !getDistributionManager(c1).isLocal(k3);
+      assert !getDistributionManager(c2).isLocal(k3);
+      assert getDistributionManager(c3).isLocal(k3);
+      assert getDistributionManager(c4).isLocal(k3);
+
+      BaseDistFunctionalTest.MagicKey k4 = new BaseDistFunctionalTest.MagicKey(c4);
+      assert getDistributionManager(c1).isLocal(k4);
+      assert !getDistributionManager(c2).isLocal(k4);
+      assert !getDistributionManager(c3).isLocal(k4);
+      assert getDistributionManager(c4).isLocal(k4);
+   }
+}


Property changes on: trunk/core/src/test/java/org/infinispan/distribution/MagicKeyTest.java
___________________________________________________________________
Name: svn:keywords
   + Id Revision
Name: svn:eol-style
   + LF

Modified: trunk/core/src/test/java/org/infinispan/marshall/jboss/JBossMarshallerTest.java
===================================================================
--- trunk/core/src/test/java/org/infinispan/marshall/jboss/JBossMarshallerTest.java	2009-04-24 16:06:56 UTC (rev 168)
+++ trunk/core/src/test/java/org/infinispan/marshall/jboss/JBossMarshallerTest.java	2009-04-24 18:15:21 UTC (rev 169)
@@ -202,7 +202,7 @@
 
       // EvictCommand does not have an empty constructor, so doesn't look to be one that is marshallable.
 
-      InvalidateCommand c7 = new InvalidateCommand(null, "key1", "key2");
+      InvalidateCommand c7 = new InvalidateCommand(null, null, "key1", "key2");
       bytes = marshaller.objectToByteBuffer(c7);
       InvalidateCommand rc7 = (InvalidateCommand) marshaller.objectFromByteBuffer(bytes);
       assert rc7.getCommandId() == c7.getCommandId() : "Writen[" + c7.getCommandId() + "] and read[" + rc7.getCommandId() + "] objects should be the same";




More information about the infinispan-commits mailing list