[infinispan-commits] Infinispan SVN: r709 - in trunk/core/src: main/java/org/infinispan/distribution and 5 other directories.

infinispan-commits at lists.jboss.org infinispan-commits at lists.jboss.org
Thu Aug 20 18:27:14 EDT 2009


Author: manik.surtani at jboss.com
Date: 2009-08-20 18:27:13 -0400 (Thu, 20 Aug 2009)
New Revision: 709

Added:
   trunk/core/src/main/java/org/infinispan/distribution/ConsistentHashHelper.java
   trunk/core/src/test/java/org/infinispan/distribution/DistributionManagerUnitTest.java
   trunk/core/src/test/java/org/infinispan/distribution/rehash/
   trunk/core/src/test/java/org/infinispan/distribution/rehash/ConcurrentJoinAndCoordDeathTest.java
   trunk/core/src/test/java/org/infinispan/distribution/rehash/ConcurrentJoinAndOtherSenderDeathTest.java
   trunk/core/src/test/java/org/infinispan/distribution/rehash/ConcurrentJoinTest.java
   trunk/core/src/test/java/org/infinispan/distribution/rehash/ConcurrentLeaveAndJoinNonOverlappingTest.java
   trunk/core/src/test/java/org/infinispan/distribution/rehash/ConcurrentLeaveAndJoinOverlappingTest.java
   trunk/core/src/test/java/org/infinispan/distribution/rehash/ConcurrentNonOverlappingLeaveTest.java
   trunk/core/src/test/java/org/infinispan/distribution/rehash/ConcurrentOverlappingLeaveTest.java
   trunk/core/src/test/java/org/infinispan/distribution/rehash/JoinAndCoordDeathTest.java
   trunk/core/src/test/java/org/infinispan/distribution/rehash/JoinAndOtherSenderDeathTest.java
   trunk/core/src/test/java/org/infinispan/distribution/rehash/RehashLeaveTestBase.java
   trunk/core/src/test/java/org/infinispan/distribution/rehash/RehashTestBase.java
   trunk/core/src/test/java/org/infinispan/distribution/rehash/SingleJoinTest.java
   trunk/core/src/test/java/org/infinispan/distribution/rehash/SingleLeaveTest.java
   trunk/core/src/test/java/org/infinispan/distribution/rehash/XAResourceAdapter.java
Removed:
   trunk/core/src/main/java/org/infinispan/distribution/RecvLeaveTask.java
   trunk/core/src/main/java/org/infinispan/distribution/SendLeaveTask.java
   trunk/core/src/test/java/org/infinispan/distribution/RehashJoinTest.java
   trunk/core/src/test/java/org/infinispan/util/concurrent/
Modified:
   trunk/core/src/main/java/org/infinispan/commands/control/RehashControlCommand.java
   trunk/core/src/main/java/org/infinispan/distribution/ConsistentHash.java
   trunk/core/src/main/java/org/infinispan/distribution/DefaultConsistentHash.java
   trunk/core/src/main/java/org/infinispan/distribution/DistributionManager.java
   trunk/core/src/main/java/org/infinispan/distribution/DistributionManagerImpl.java
   trunk/core/src/main/java/org/infinispan/distribution/JoinTask.java
   trunk/core/src/main/java/org/infinispan/distribution/LeaveTask.java
   trunk/core/src/main/java/org/infinispan/distribution/RehashTask.java
   trunk/core/src/main/java/org/infinispan/distribution/TransactionLogger.java
   trunk/core/src/main/java/org/infinispan/distribution/TransactionLoggerImpl.java
   trunk/core/src/main/java/org/infinispan/distribution/UnionConsistentHash.java
   trunk/core/src/main/java/org/infinispan/transaction/tm/DummyTransaction.java
   trunk/core/src/main/java/org/infinispan/util/Util.java
   trunk/core/src/test/java/org/infinispan/distribution/BaseDistFunctionalTest.java
   trunk/core/src/test/java/org/infinispan/distribution/DefaultConsistentHashTest.java
Log:
[ISPN-65] (Rehashing) feature complete.  Needs more tests now.

Modified: trunk/core/src/main/java/org/infinispan/commands/control/RehashControlCommand.java
===================================================================
--- trunk/core/src/main/java/org/infinispan/commands/control/RehashControlCommand.java	2009-08-20 14:11:12 UTC (rev 708)
+++ trunk/core/src/main/java/org/infinispan/commands/control/RehashControlCommand.java	2009-08-20 22:27:13 UTC (rev 709)
@@ -105,7 +105,7 @@
       if (cacheStore != null) {
          for (InternalCacheEntry ice : cacheStore.loadAll()) {
             Object k = ice.getKey();
-            if (shouldAddToMap(k, oldCH, numCopies, self) && !state.containsKey(k))
+            if (!state.containsKey(k) && shouldAddToMap(k, oldCH, numCopies, self))
                state.put(k, ice.toInternalCacheValue());
          }
       }
@@ -124,7 +124,8 @@
    }
 
    public Object pushState() {
-      throw new RuntimeException("implement me");
+      distributionManager.applyReceivedState(state);
+      return null;
    }
 
    public byte getCommandId() {

Modified: trunk/core/src/main/java/org/infinispan/distribution/ConsistentHash.java
===================================================================
--- trunk/core/src/main/java/org/infinispan/distribution/ConsistentHash.java	2009-08-20 14:11:12 UTC (rev 708)
+++ trunk/core/src/main/java/org/infinispan/distribution/ConsistentHash.java	2009-08-20 22:27:13 UTC (rev 709)
@@ -21,14 +21,14 @@
     *
     * @param caches caches in cluster.
     */
-   void setCaches(Collection<Address> caches);
+   void setCaches(List<Address> caches);
 
    /**
     * Should return a collection of cache addresses in the cluster.
     *
     * @return collection of cache addresses
     */
-   Collection<Address> getCaches();
+   List<Address> getCaches();
 
    /**
     * Locates a key, given a replication count (number of copies).
@@ -36,7 +36,7 @@
     * @param key       key to locate
     * @param replCount replication count (number of copies)
     * @return a list of addresses where the key resides, where this list is a subset of the addresses set in {@link
-    *         #setCaches(java.util.Collection)}.  Should never be null, and should contain replCount elements or the max
+    *         #setCaches(java.util.List)}.  Should never be null, and should contain replCount elements or the max
     *         number of caches available, whichever is smaller.
     */
    List<Address> locate(Object key, int replCount);
@@ -53,12 +53,23 @@
    Map<Object, List<Address>> locateAll(Collection<Object> keys, int replCount);
 
    /**
-    * Tests whether a group of addresses are in the same subspace of the hash space.  Addresses are in the same subspace
-    * if an arbitrary key mapped to one address could also be mapped to the other.
+    * Calculates the logical distance between two addresses.  This distance is based on where the addresses lie in the
+    * hash space.
     *
     * @param a1 address to test
     * @param a2 address to test
-    * @return true of the two addresses are in the same subspace, false otherwise.
+    * @return the distance between the 2 nodes.  Always a non-negative number, where the distance between a1 and itself is
+    *         0. The distance between a1 and the next adjacent node is 1 and the distance between a1 and the previous
+    *         adjacent node is caches.size() - 1.
     */
-   boolean isInSameSubspace(Address a1, Address a2);
+   int getDistance(Address a1, Address a2);
+
+   /**
+    * Tests whether two addresses are logically next to each other in the hash space.
+    *
+    * @param a1 address to test
+    * @param a2 address to test
+    * @return true if adjacent, false if not
+    */
+   boolean isAdjacent(Address a1, Address a2);
 }

Added: trunk/core/src/main/java/org/infinispan/distribution/ConsistentHashHelper.java
===================================================================
--- trunk/core/src/main/java/org/infinispan/distribution/ConsistentHashHelper.java	                        (rev 0)
+++ trunk/core/src/main/java/org/infinispan/distribution/ConsistentHashHelper.java	2009-08-20 22:27:13 UTC (rev 709)
@@ -0,0 +1,63 @@
+package org.infinispan.distribution;
+
+import org.infinispan.config.Configuration;
+import org.infinispan.remoting.transport.Address;
+import org.infinispan.util.Util;
+
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.LinkedList;
+import java.util.List;
+
+/**
+ * Static helper methods for creating {@link ConsistentHash} instances and for deriving new instances with an address removed.
+ *
+ * @author Manik Surtani
+ * @since 4.0
+ */
+public class ConsistentHashHelper {
+
+   /**
+    * Returns a new consistent hash of the same type with the given address removed.
+    *
+    * @param ch       consistent hash to start with
+    * @param toRemove address to remove
+    * @param c        configuration
+    * @return a new consistent hash instance of the same type
+    */
+   public static ConsistentHash removeAddress(ConsistentHash ch, Address toRemove, Configuration c) throws IllegalAccessException, InstantiationException, ClassNotFoundException {
+      if (ch instanceof UnionConsistentHash)
+         return removeAddressFromUnionConsistentHash((UnionConsistentHash) ch, toRemove, c);
+      else {
+         ConsistentHash newCH = (ConsistentHash) Util.getInstance(c.getConsistentHashClass());
+         List<Address> caches = ch.getCaches();
+         caches.remove(toRemove);
+         newCH.setCaches(caches);
+         return newCH;
+      }
+   }
+
+   public static UnionConsistentHash removeAddressFromUnionConsistentHash(UnionConsistentHash uch, Address toRemove, Configuration c) throws IllegalAccessException, ClassNotFoundException, InstantiationException {
+      ConsistentHash newFirstCH = removeAddress(uch.getOldConsistentHash(), toRemove, c);
+      ConsistentHash newSecondCH = removeAddress(uch.getNewConsistentHash(), toRemove, c);
+      return new UnionConsistentHash(newFirstCH, newSecondCH);
+   }
+
+   public static ConsistentHash createConsistentHash(Configuration c, List<Address> addresses) throws IllegalAccessException, InstantiationException, ClassNotFoundException {
+      ConsistentHash ch = (ConsistentHash) Util.getInstance(c.getConsistentHashClass());
+      ch.setCaches(addresses);
+      return ch;
+   }
+
+   public static ConsistentHash createConsistentHash(Configuration c, List<Address> addresses, Address... moreAddresses) throws IllegalAccessException, ClassNotFoundException, InstantiationException {
+      List<Address> list = new LinkedList<Address>(addresses);
+      list.addAll(Arrays.asList(moreAddresses));
+      return createConsistentHash(c, list);
+   }
+
+   public static ConsistentHash createConsistentHash(Configuration c, List<Address> addresses, Collection<Address> moreAddresses) throws IllegalAccessException, ClassNotFoundException, InstantiationException {
+      List<Address> list = new LinkedList<Address>(addresses);
+      list.addAll(moreAddresses);
+      return createConsistentHash(c, list);
+   }
+}


Property changes on: trunk/core/src/main/java/org/infinispan/distribution/ConsistentHashHelper.java
___________________________________________________________________
Name: svn:keywords
   + Id Revision
Name: svn:eol-style
   + LF

Modified: trunk/core/src/main/java/org/infinispan/distribution/DefaultConsistentHash.java
===================================================================
--- trunk/core/src/main/java/org/infinispan/distribution/DefaultConsistentHash.java	2009-08-20 14:11:12 UTC (rev 708)
+++ trunk/core/src/main/java/org/infinispan/distribution/DefaultConsistentHash.java	2009-08-20 22:27:13 UTC (rev 709)
@@ -7,8 +7,8 @@
 import java.io.IOException;
 import java.io.ObjectInput;
 import java.io.ObjectOutput;
+import static java.lang.Math.min;
 import java.util.ArrayList;
-import java.util.Collection;
 import java.util.List;
 import java.util.SortedMap;
 import java.util.TreeMap;
@@ -23,7 +23,7 @@
    final static int HASH_SPACE = 10240; // no more than 10k nodes?
 
 
-   public void setCaches(Collection<Address> caches) {
+   public void setCaches(List<Address> caches) {
 
       addresses = new ArrayList<Address>(caches);
 
@@ -38,16 +38,19 @@
          while (positions.containsKey(positionIndex)) positionIndex = positionIndex + 1 % HASH_SPACE;
          positions.put(positionIndex, a);
       }
+
+      addresses.clear();
+      for (Address a : positions.values()) addresses.add(a);
    }
 
-   public Collection<Address> getCaches() {
+   public List<Address> getCaches() {
       return addresses;
    }
 
    public List<Address> locate(Object key, int replCount) {
       int hash = Math.abs(key.hashCode());
       int clusterSize = addresses.size();
-      int numCopiesToFind = Math.min(replCount, clusterSize);
+      int numCopiesToFind = min(replCount, clusterSize);
 
       List<Address> owners = new ArrayList<Address>(numCopiesToFind);
 
@@ -72,29 +75,28 @@
       return owners;
    }
 
-//   public List<Address> locate(Object key, int replicationCount) {
-//      int hash = Math.abs(key.hashCode());
-//      int clusterSize = addresses.size();
-//      int numCopiesToFind = Math.min(replicationCount, clusterSize);
-//
-//      List<Address> results = new ArrayList<Address>(numCopiesToFind);
-//
-//      int copyNumber = 0;
-//
-//      while (results.size() < numCopiesToFind) {
-//         // we mod the index the 2nd time to make sure the index starts again from the beginning when it reaches the end.
-//         // e.g., in a cluster of 10 with 3 copies of data, and a key that maps to node index 9, the next 2 backups should
-//         // be at indexes 0 and 1.
-//
-//         int index = ((hash % clusterSize) + copyNumber) % clusterSize;
-//         Address candidate = addresses.get(index);
-//         results.add(candidate);
-//         copyNumber++;
-//      }
-//
-//      return results;
-//   }
+   public int getDistance(Address a1, Address a2) {
+      if (a1 == null || a2 == null) throw new NullPointerException("Cannot deal with nulls as parameters!");
 
+      int p1 = addresses.indexOf(a1);
+      if (p1 < 0)
+         throw new IllegalArgumentException("Address " + a1 + " not in the addresses list of this consistent hash impl!");
+
+      int p2 = addresses.indexOf(a2);
+      if (p2 < 0)
+         throw new IllegalArgumentException("Address " + a2 + " not in the addresses list of this consistent hash impl!");
+
+      if (p1 <= p2)
+         return p2 - p1;
+      else
+         return addresses.size() - (p1 - p2);
+   }
+
+   public boolean isAdjacent(Address a1, Address a2) {
+      int distance = getDistance(a1, a2);
+      return distance == 1 || distance == addresses.size() - 1;
+   }
+
    @Override
    public boolean equals(Object o) {
       if (this == o) return true;
@@ -137,8 +139,4 @@
             "addresses (in order of hash space position)=" + positions.values() +
             '}';
    }
-
-   public boolean isInSameSubspace(Address a1, Address a2) {
-      throw new UnsupportedOperationException("TODO Implement me!");
-   }
 }

Modified: trunk/core/src/main/java/org/infinispan/distribution/DistributionManager.java
===================================================================
--- trunk/core/src/main/java/org/infinispan/distribution/DistributionManager.java	2009-08-20 14:11:12 UTC (rev 708)
+++ trunk/core/src/main/java/org/infinispan/distribution/DistributionManager.java	2009-08-20 22:27:13 UTC (rev 709)
@@ -2,6 +2,7 @@
 
 import org.infinispan.container.entries.CacheEntry;
 import org.infinispan.container.entries.InternalCacheEntry;
+import org.infinispan.container.entries.InternalCacheValue;
 import org.infinispan.factories.scopes.Scope;
 import org.infinispan.factories.scopes.Scopes;
 import org.infinispan.loaders.CacheStore;
@@ -101,5 +102,9 @@
     * @return a cache store if one is available and configured for use in rehashing, or null otherwise.
     */
    CacheStore getCacheStoreForRehashing();
+
+   boolean isRehashInProgress();
+
+   void applyReceivedState(Map<Object, InternalCacheValue> state);
 }
 

Modified: trunk/core/src/main/java/org/infinispan/distribution/DistributionManagerImpl.java
===================================================================
--- trunk/core/src/main/java/org/infinispan/distribution/DistributionManagerImpl.java	2009-08-20 14:11:12 UTC (rev 708)
+++ trunk/core/src/main/java/org/infinispan/distribution/DistributionManagerImpl.java	2009-08-20 22:27:13 UTC (rev 709)
@@ -3,12 +3,17 @@
 import org.infinispan.CacheException;
 import org.infinispan.commands.CommandsFactory;
 import org.infinispan.commands.remote.ClusteredGetCommand;
+import org.infinispan.commands.write.PutKeyValueCommand;
+import org.infinispan.commands.write.WriteCommand;
 import org.infinispan.config.Configuration;
 import org.infinispan.container.DataContainer;
 import org.infinispan.container.entries.CacheEntry;
 import org.infinispan.container.entries.InternalCacheEntry;
 import org.infinispan.container.entries.InternalCacheValue;
+import org.infinispan.context.Flag;
+import org.infinispan.context.InvocationContext;
 import org.infinispan.context.InvocationContextContainer;
+import static org.infinispan.distribution.ConsistentHashHelper.createConsistentHash;
 import org.infinispan.factories.annotations.Inject;
 import org.infinispan.factories.annotations.Start;
 import org.infinispan.factories.annotations.Stop;
@@ -35,7 +40,9 @@
 import java.util.LinkedList;
 import java.util.List;
 import java.util.Map;
+import java.util.concurrent.CopyOnWriteArrayList;
 import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Future;
 import java.util.concurrent.LinkedBlockingQueue;
 import java.util.concurrent.ThreadFactory;
 import java.util.concurrent.ThreadPoolExecutor;
@@ -53,7 +60,8 @@
    private final Log log = LogFactory.getLog(DistributionManagerImpl.class);
    private final boolean trace = log.isTraceEnabled();
    Configuration configuration;
-   ConsistentHash consistentHash;
+   volatile ConsistentHash consistentHash, oldConsistentHash;
+   Address self;
    CacheLoaderManager cacheLoaderManager;
    RpcManager rpcManager;
    CacheManagerNotifier notifier;
@@ -82,8 +90,9 @@
    private InvocationContextContainer icc;
    private volatile boolean joinTaskSubmitted = false;
    volatile boolean joinComplete = false;
+   final List<Address> leavers = new CopyOnWriteArrayList<Address>();
+   volatile Future<Void> leaveTaskFuture;
 
-
    @Inject
    public void init(Configuration configuration, RpcManager rpcManager, CacheManagerNotifier notifier, CommandsFactory cf,
                     DataContainer dataContainer, InterceptorChain interceptorChain, InvocationContextContainer icc,
@@ -102,12 +111,12 @@
    @Start(priority = 20)
    public void start() throws Exception {
       replCount = configuration.getNumOwners();
-      consistentHash = (ConsistentHash) Util.getInstance(configuration.getConsistentHashClass());
-      consistentHash.setCaches(rpcManager.getTransport().getMembers());
+      consistentHash = createConsistentHash(configuration, rpcManager.getTransport().getMembers());
+      self = rpcManager.getTransport().getAddress();
       listener = new ViewChangeListener();
       notifier.addListener(listener);
       if (rpcManager.getTransport().getMembers().size() > 1) {
-         JoinTask joinTask = new JoinTask(rpcManager, cf, configuration, transactionLogger, dataContainer, interceptorChain, icc, this);
+         JoinTask joinTask = new JoinTask(rpcManager, cf, configuration, transactionLogger, dataContainer, this);
          rehashExecutor.submit(joinTask);
       } else {
          joinComplete = true;
@@ -122,14 +131,21 @@
       joinComplete = false;
    }
 
-   private Address diff(List<Address> newList, List<Address> oldList) {
-      List<Address> list = new ArrayList<Address>(newList);
-      list.removeAll(oldList);
-      // Could easily be > 1 member joined!
-      return list.size() > 0 ? list.get(0) : null;
+   final List<Address> diffAll(List<Address> l1, List<Address> l2) {
+      List<Address> largerList = l1.size() > l2.size() ? l1 : l2;
+      List<Address> smallerList = largerList == l1 ? l2 : l1;
+
+      List<Address> list = new ArrayList<Address>(largerList);
+      list.removeAll(smallerList);
+      return list;
    }
 
+   final Address diff(List<Address> l1, List<Address> l2) {
+      List<Address> l = diffAll(l1, l2);
+      return l.isEmpty() ? null : l.get(0);
+   }
 
+
    public void rehash(List<Address> newMembers, List<Address> oldMembers) {
       boolean join = oldMembers == null || oldMembers.size() < newMembers.size();
       // on view change, we should update our view
@@ -139,18 +155,51 @@
          Address joiner = diff(newMembers, oldMembers);
          log.info("This is a JOIN event!  Wait for notification from new joiner " + joiner);
       } else {
-         log.info("This is a LEAVE event!");
-         // TODO: implement this stuff!
-         ConsistentHash newCH = new DefaultConsistentHash();
-         newCH.setCaches(newMembers);
+         Address leaver = diff(newMembers, oldMembers);
+         log.info("This is a LEAVE event!  Node {0} has just left", leaver);
 
-         consistentHash = newCH;
+         boolean willReceiveLeaverState = willReceiveLeaverState(leaver);
+         boolean willSendLeaverState = willSendLeaverState(leaver);
+         try {
+            oldConsistentHash = consistentHash;
+            consistentHash = ConsistentHashHelper.removeAddress(consistentHash, leaver, configuration);
+         } catch (Exception e) {
+            log.fatal("Unable to process leaver!!", e);
+            throw new CacheException(e);
+         }
 
+         if (willReceiveLeaverState) {
+            log.info("Starting transaction logging; expecting state from someone!");
+            transactionLogger.enable();
+         }
+
+         if (willSendLeaverState) {
+            if (leaveTaskFuture != null && (!leaveTaskFuture.isCancelled() || !leaveTaskFuture.isDone())) {
+               leaveTaskFuture.cancel(true);
+            }
+
+            leavers.add(leaver);
+            LeaveTask task = new LeaveTask(this, rpcManager, configuration, leavers, transactionLogger, cf, dataContainer);
+            leaveTaskFuture = rehashExecutor.submit(task);
+
+            log.info("Need to rehash");
+         } else {
+            log.info("Not in same subspace, so ignoring leave event");
+         }
       }
    }
 
+   boolean willSendLeaverState(Address leaver) {
+      return consistentHash.isAdjacent(leaver, self);
+   }
+
+   boolean willReceiveLeaverState(Address leaver) {
+      int dist = consistentHash.getDistance(leaver, self);
+      return dist <= replCount;
+   }
+
    public boolean isLocal(Object key) {
-      return consistentHash.locate(key, replCount).contains(rpcManager.getTransport().getAddress());
+      return consistentHash.locate(key, replCount).contains(self);
    }
 
    public List<Address> locate(Object key) {
@@ -195,8 +244,8 @@
    }
 
    public boolean isAffectedByRehash(Object key) {
-      if (rehashInProgress) {
-         throw new UnsupportedOperationException("TODO implement me");
+      if (transactionLogger.isEnabled() && oldConsistentHash != null && !oldConsistentHash.locate(key, replCount).contains(self)) {
+         return true;
       } else {
          return false;
       }
@@ -248,6 +297,18 @@
       log.trace("New CH is {0}", consistentHash);
    }
 
+   public void applyState(ConsistentHash consistentHash, Map<Object, InternalCacheValue> state) {
+      for (Map.Entry<Object, InternalCacheValue> e : state.entrySet()) {
+         if (consistentHash.locate(e.getKey(), configuration.getNumOwners()).contains(self)) {
+            InternalCacheValue v = e.getValue();
+            PutKeyValueCommand put = cf.buildPutKeyValueCommand(e.getKey(), v.getValue(), v.getLifespan(), v.getMaxIdle());
+            InvocationContext ctx = icc.createInvocationContext();
+            ctx.setFlags(Flag.CACHE_MODE_LOCAL, Flag.SKIP_REMOTE_LOOKUP);
+            interceptorChain.invoke(ctx, put);
+         }
+      }
+   }
+
    @Listener
    public class ViewChangeListener {
       @ViewChanged
@@ -262,4 +323,40 @@
          return null;
       return cacheLoaderManager.getCacheStore();
    }
+
+   public boolean isRehashInProgress() {
+      return !leavers.isEmpty() || rehashInProgress;
+   }
+
+   public void applyReceivedState(Map<Object, InternalCacheValue> state) {
+      applyState(consistentHash, state);
+      boolean unlocked = false;
+      try {
+         drainTransactionLog();
+         unlocked = true;
+      } finally {
+         if (!unlocked) transactionLogger.unlockAndDisable();
+      }
+   }
+
+   void drainTransactionLog() {
+      List<WriteCommand> c;
+      while (transactionLogger.size() > 10) {
+         c = transactionLogger.drain();
+         apply(c);
+      }
+
+      c = transactionLogger.drainAndLock();
+      apply(c);
+
+      transactionLogger.unlockAndDisable();
+   }
+
+   private void apply(List<WriteCommand> c) {
+      for (WriteCommand cmd : c) {
+         InvocationContext ctx = icc.createInvocationContext();
+         ctx.setFlags(Flag.SKIP_REMOTE_LOOKUP);
+         interceptorChain.invoke(ctx, cmd);
+      }
+   }
 }

Modified: trunk/core/src/main/java/org/infinispan/distribution/JoinTask.java
===================================================================
--- trunk/core/src/main/java/org/infinispan/distribution/JoinTask.java	2009-08-20 14:11:12 UTC (rev 708)
+++ trunk/core/src/main/java/org/infinispan/distribution/JoinTask.java	2009-08-20 22:27:13 UTC (rev 709)
@@ -4,37 +4,23 @@
 import org.infinispan.commands.CommandsFactory;
 import org.infinispan.commands.control.RehashControlCommand;
 import static org.infinispan.commands.control.RehashControlCommand.Type.*;
-import org.infinispan.commands.write.InvalidateCommand;
-import org.infinispan.commands.write.PutKeyValueCommand;
-import org.infinispan.commands.write.WriteCommand;
 import org.infinispan.config.Configuration;
 import org.infinispan.container.DataContainer;
 import org.infinispan.container.entries.InternalCacheValue;
-import org.infinispan.context.Flag;
-import org.infinispan.context.InvocationContext;
-import org.infinispan.context.InvocationContextContainer;
-import org.infinispan.interceptors.InterceptorChain;
+import static org.infinispan.distribution.ConsistentHashHelper.createConsistentHash;
 import org.infinispan.remoting.responses.Response;
 import org.infinispan.remoting.responses.SuccessfulResponse;
 import static org.infinispan.remoting.rpc.ResponseMode.SYNCHRONOUS;
 import org.infinispan.remoting.rpc.RpcManager;
 import org.infinispan.remoting.transport.Address;
 import org.infinispan.util.Util;
-import org.infinispan.util.concurrent.NotifyingFutureImpl;
-import org.infinispan.util.concurrent.NotifyingNotifiableFuture;
 import org.infinispan.util.logging.Log;
 import org.infinispan.util.logging.LogFactory;
 
-import java.util.Collection;
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.HashSet;
 import java.util.LinkedList;
 import java.util.List;
 import java.util.Map;
 import java.util.Random;
-import java.util.Set;
-import java.util.concurrent.Future;
 
 /**
  * 5.  JoinTask: This is a PULL based rehash.  JoinTask is kicked off on the JOINER. 5.1.  Obtain OLD_CH from
@@ -54,23 +40,12 @@
    private static final Log log = LogFactory.getLog(JoinTask.class);
    ConsistentHash chOld;
    ConsistentHash chNew;
-   //   ConsistentHash chTemp;
-   CommandsFactory commandsFactory;
-   TransactionLogger transactionLogger;
-   DataContainer dataContainer;
-   InterceptorChain interceptorChain;
-   InvocationContextContainer icc;
    Address self;
 
    public JoinTask(RpcManager rpcManager, CommandsFactory commandsFactory, Configuration conf,
-                   TransactionLogger transactionLogger, DataContainer dataContainer, InterceptorChain interceptorChain,
-                   InvocationContextContainer icc, DistributionManagerImpl dmi) {
-      super(dmi, rpcManager, conf);
-      this.commandsFactory = commandsFactory;
-      this.transactionLogger = transactionLogger;
+                   TransactionLogger transactionLogger, DataContainer dataContainer, DistributionManagerImpl dmi) {
+      super(dmi, rpcManager, conf, transactionLogger, commandsFactory, dataContainer);
       this.dataContainer = dataContainer;
-      this.interceptorChain = interceptorChain;
-      this.icc = icc;
       self = rpcManager.getTransport().getAddress();
    }
 
@@ -90,7 +65,8 @@
    }
 
    protected void performRehash() throws Exception {
-      log.trace("Starting rehash on new joiner");
+      long start = System.currentTimeMillis();
+      if (log.isDebugEnabled()) log.debug("Commencing");
       boolean unlocked = false;
       try {
          dmi.joinComplete = false;
@@ -98,24 +74,24 @@
          // this happens in a loop to ensure we receive the correct CH and not a "union".
          // TODO make at least *some* of these configurable!
          long minSleepTime = 500, maxSleepTime = 2000; // sleep time between retries
-         int maxWaitTime = 600000; // after which we give up!
+         int maxWaitTime = (int) configuration.getRehashRpcTimeout() * 10; // after which we give up!
          Random rand = new Random();
          long giveupTime = System.currentTimeMillis() + maxWaitTime;
          do {
-            log.trace("Requesting old consistent hash from coordinator");
+            if (log.isTraceEnabled()) log.trace("Requesting old consistent hash from coordinator");
             List<Response> resp = rpcManager.invokeRemotely(coordinator(),
-                                                            commandsFactory.buildRehashControlCommand(JOIN_REQ, self),
-                                                            SYNCHRONOUS, 100000, true);
+                                                            cf.buildRehashControlCommand(JOIN_REQ, self),
+                                                            SYNCHRONOUS, configuration.getRehashRpcTimeout(), true);
             List<Address> addresses = parseResponses(resp);
 
-            log.trace("Retrieved old consistent hash address list {0}", addresses);
+            if (log.isDebugEnabled()) log.debug("Retrieved old consistent hash address list {0}", addresses);
             if (addresses == null) {
                long time = rand.nextInt((int) (maxSleepTime - minSleepTime) / 10);
                time = (time * 10) + minSleepTime;
-               log.debug("Sleeping for {0}", Util.prettyPrintTime(time));
+               if (log.isTraceEnabled()) log.trace("Sleeping for {0}", Util.prettyPrintTime(time));
                Thread.sleep(time); // sleep for a while and retry
             } else {
-               chOld = createConsistentHash(addresses);
+               chOld = createConsistentHash(configuration, addresses);
             }
          } while (chOld == null && System.currentTimeMillis() < giveupTime);
 
@@ -123,109 +99,66 @@
             throw new CacheException("Unable to retrieve old consistent hash from coordinator even after several attempts at sleeping and retrying!");
 
          // 2.  new CH instance
-         chNew = createConsistentHash(chOld.getCaches(), self);
+         chNew = createConsistentHash(configuration, chOld.getCaches(), self);
          dmi.setConsistentHash(chNew);
 
          // 3.  Enable TX logging
          transactionLogger.enable();
 
          // 4.  Broadcast new temp CH
-         rpcManager.broadcastRpcCommand(commandsFactory.buildRehashControlCommand(JOIN_REHASH_START, self), true, true);
+         rpcManager.broadcastRpcCommand(cf.buildRehashControlCommand(JOIN_REHASH_START, self), true, true);
 
          // 5.  txLogger being enabled will cause CLusteredGetCommands to return uncertain responses.
 
          // 6.  pull state from everyone.
          Address myAddress = rpcManager.getTransport().getAddress();
-         RehashControlCommand cmd = commandsFactory.buildRehashControlCommand(PULL_STATE, myAddress, null, chNew);
+         RehashControlCommand cmd = cf.buildRehashControlCommand(PULL_STATE, myAddress, null, chNew);
          // TODO I should be able to process state chunks from different nodes simultaneously!!
-         // TODO I should only send this pull state request to nodes which I know will send me state.  Not everyone in chOld!!
-         List<Response> resps = rpcManager.invokeRemotely(chOld.getCaches(), cmd, SYNCHRONOUS, 10000, true);
+         List<Address> addressesWhoMaySendStuff = getAddressesWhoMaySendStuff();
+         List<Response> resps = rpcManager.invokeRemotely(addressesWhoMaySendStuff, cmd, SYNCHRONOUS, configuration.getRehashRpcTimeout(), true);
 
          // 7.  Apply state
          for (Response r : resps) {
             if (r instanceof SuccessfulResponse) {
                Map<Object, InternalCacheValue> state = getStateFromResponse((SuccessfulResponse) r);
-               for (Map.Entry<Object, InternalCacheValue> e : state.entrySet()) {
-                  if (chNew.locate(e.getKey(), configuration.getNumOwners()).contains(myAddress)) {
-                     InternalCacheValue v = e.getValue();
-                     PutKeyValueCommand put = commandsFactory.buildPutKeyValueCommand(e.getKey(), v.getValue(), v.getLifespan(), v.getMaxIdle());
-                     InvocationContext ctx = icc.createInvocationContext();
-                     ctx.setFlags(Flag.CACHE_MODE_LOCAL, Flag.SKIP_REMOTE_LOOKUP);
-                     interceptorChain.invoke(ctx, put);
-                  }
-               }
+               dmi.applyState(chNew, state);
             }
          }
 
          // 8.  Drain logs
-
-         List<WriteCommand> c = null;
-         while (transactionLogger.size() > 10) {
-            c = transactionLogger.drain();
-            apply(c);
-         }
-
-         c = transactionLogger.drainAndLock();
-         apply(c);
-
+         dmi.drainTransactionLog();
          unlocked = true;
-         // 9.
-         transactionLogger.unlockAndDisable();
 
          // 10.
-         // TODO this phase should also "tell" the coord that the join is complete so that other waiting joiners
-         // may proceed.  Ideally another command, directed to the coord.
-         rpcManager.broadcastRpcCommand(commandsFactory.buildRehashControlCommand(JOIN_REHASH_END, self), true, true);
-         rpcManager.invokeRemotely(coordinator(), commandsFactory.buildRehashControlCommand(JOIN_COMPLETE, self), SYNCHRONOUS,
-                                   100000, true);
+         rpcManager.broadcastRpcCommand(cf.buildRehashControlCommand(JOIN_REHASH_END, self), true, true);
+         rpcManager.invokeRemotely(coordinator(), cf.buildRehashControlCommand(JOIN_COMPLETE, self), SYNCHRONOUS,
+                                   configuration.getRehashRpcTimeout(), true);
 
          // 11.
-         Map<Address, Set<Object>> invalidations = new HashMap<Address, Set<Object>>();
-         for (Object key : dataContainer.keySet()) {
-            Collection<Address> invalidHolders = getInvalidHolders(key, chOld, chNew);
-            for (Address a : invalidHolders) {
-               Set<Object> s = invalidations.get(a);
-               if (s == null) {
-                  s = new HashSet<Object>();
-                  invalidations.put(a, s);
-               }
-               s.add(key);
-            }
-         }
+         invalidateInvalidHolders(chOld, chNew);
 
-         Set<Future> futures = new HashSet<Future>();
-
-         for (Map.Entry<Address, Set<Object>> e : invalidations.entrySet()) {
-            InvalidateCommand ic = commandsFactory.buildInvalidateFromL1Command(e.getValue().toArray());
-            NotifyingNotifiableFuture f = new NotifyingFutureImpl(null);
-            rpcManager.invokeRemotelyInFuture(Collections.singletonList(e.getKey()), ic, true, f);
-            futures.add(f);
-         }
-
-         for (Future f : futures) f.get();
+         if (log.isInfoEnabled())
+            log.info("Completed in {0}!", Util.prettyPrintTime(System.currentTimeMillis() - start));
       } catch (Exception e) {
-         log.warn("Caught error performing rehash!", e);
+         log.error("Caught exception!", e);
       } finally {
          if (!unlocked) transactionLogger.unlockAndDisable();
          dmi.joinComplete = true;
       }
    }
 
-   private Collection<Address> getInvalidHolders(Object key, ConsistentHash chOld, ConsistentHash chNew) {
-      List<Address> oldOwners = chOld.locate(key, configuration.getNumOwners());
-      List<Address> newOwners = chNew.locate(key, configuration.getNumOwners());
-
-      List<Address> toInvalidate = new LinkedList<Address>(oldOwners);
-      toInvalidate.removeAll(newOwners);
-
-      return toInvalidate;
-   }
-
-   private void apply(List<WriteCommand> c) {
-      for (WriteCommand cmd : c) {
-         InvocationContext ctx = icc.createInvocationContext();
-         ctx.setFlags(Flag.SKIP_REMOTE_LOOKUP);
-         interceptorChain.invoke(ctx, cmd);
+   // TODO unit test this!!!
+   private List<Address> getAddressesWhoMaySendStuff() {
+      List<Address> caches = chNew.getCaches();
+      int selfIdx = caches.indexOf(self);
+      int replCount = configuration.getNumOwners();
+      if (selfIdx >= replCount - 1) {
+         return caches.subList(selfIdx - replCount + 1, selfIdx);
+      } else {
+         List<Address> l = new LinkedList<Address>(caches.subList(0, selfIdx));
+         int alreadyCollected = l.size();
+         l.addAll(caches.subList(caches.size() - replCount + 1 + alreadyCollected, caches.size()));
+         return l;
       }
    }
 }

Modified: trunk/core/src/main/java/org/infinispan/distribution/LeaveTask.java
===================================================================
--- trunk/core/src/main/java/org/infinispan/distribution/LeaveTask.java	2009-08-20 14:11:12 UTC (rev 708)
+++ trunk/core/src/main/java/org/infinispan/distribution/LeaveTask.java	2009-08-20 22:27:13 UTC (rev 709)
@@ -1,20 +1,165 @@
 package org.infinispan.distribution;
 
+import org.infinispan.commands.CommandsFactory;
+import org.infinispan.commands.control.RehashControlCommand;
 import org.infinispan.config.Configuration;
+import org.infinispan.container.DataContainer;
+import org.infinispan.container.entries.InternalCacheEntry;
+import org.infinispan.container.entries.InternalCacheValue;
+import org.infinispan.loaders.CacheStore;
 import org.infinispan.remoting.rpc.RpcManager;
+import org.infinispan.remoting.transport.Address;
+import org.infinispan.util.Util;
+import org.infinispan.util.concurrent.NotifyingFutureImpl;
+import org.infinispan.util.concurrent.NotifyingNotifiableFuture;
+import org.infinispan.util.logging.Log;
+import org.infinispan.util.logging.LogFactory;
 
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.Future;
+
 /**
- * // TODO: Manik: Document this
+ * A task to handle rehashing for when a node leaves the cluster
  *
  * @author Manik Surtani
  * @since 4.0
  */
 public class LeaveTask extends RehashTask {
-   protected LeaveTask(DistributionManagerImpl dmi, RpcManager rpcManager, Configuration configuration) {
-      super(dmi, rpcManager, configuration);
+   private static final Log log = LogFactory.getLog(LeaveTask.class);
+
+   private final List<Address> leavers;
+   private final Address self;
+   private final List<Address> leaversHandled;
+
+
+   protected LeaveTask(DistributionManagerImpl dmi, RpcManager rpcManager, Configuration configuration, List<Address> leavers,
+                       TransactionLogger transactionLogger, CommandsFactory cf, DataContainer dataContainer) {
+      super(dmi, rpcManager, configuration, transactionLogger, cf, dataContainer);
+      this.leavers = leavers;
+      this.leaversHandled = new LinkedList<Address>(leavers);
+      this.self = rpcManager.getTransport().getAddress();
    }
 
    protected void performRehash() throws Exception {
-      // TODO: Customise this generated block
+      long start = System.currentTimeMillis();
+      if (log.isDebugEnabled()) log.debug("Commencing.  Leavers' list is {0}", leavers);
+      boolean completedSuccessfully = false;
+      List<Address> leaversHandled = new LinkedList<Address>(leavers);
+      ConsistentHash oldCH = ConsistentHashHelper.createConsistentHash(configuration, dmi.getConsistentHash().getCaches(), leaversHandled);
+      int replCount = configuration.getNumOwners();
+      try {
+         StateMap statemap = new StateMap(leaversHandled, self, oldCH, dmi.getConsistentHash(), replCount);
+         if (log.isTraceEnabled()) log.trace("Examining state in data container");
+         // need to constantly detect whether we are interrupted.  If so, abort accordingly.
+         for (InternalCacheEntry ice : dataContainer) {
+            List<Address> oldOwners = oldCH.locate(ice.getKey(), replCount);
+            for (Address a : oldOwners) if (leaversHandled.contains(a)) statemap.addState(ice);
+         }
+
+         CacheStore cs = dmi.getCacheStoreForRehashing();
+         if (cs != null) {
+            if (log.isTraceEnabled()) log.trace("Examining state in cache store");
+            for (InternalCacheEntry ice : cs.loadAll()) if (!statemap.containsKey(ice.getKey())) statemap.addState(ice);
+         }
+
+         // push state.
+         Set<Future<Void>> pushFutures = new HashSet<Future<Void>>();
+         for (Map.Entry<Address, Map<Object, InternalCacheValue>> entry : statemap.getState().entrySet()) {
+            if (log.isDebugEnabled()) log.debug("Pushing {0} entries to {1}", entry.getValue().size(), entry.getKey());
+            RehashControlCommand push = cf.buildRehashControlCommand(RehashControlCommand.Type.PUSH_STATE, self, entry.getValue());
+            NotifyingNotifiableFuture f = new NotifyingFutureImpl(null);
+            pushFutures.add(f);
+            rpcManager.invokeRemotelyInFuture(Collections.singleton(entry.getKey()), push, true, f, configuration.getRehashRpcTimeout());
+         }
+
+         for (Future f : pushFutures) f.get();
+
+         completedSuccessfully = true;
+         invalidateInvalidHolders(oldCH, dmi.getConsistentHash());
+         if (log.isInfoEnabled())
+            log.info("Completed in {0}!", Util.prettyPrintTime(System.currentTimeMillis() - start));
+      } catch (InterruptedException ie) {
+         if (log.isInfoEnabled())
+            log.info("Interrupted after {0}!  Completed successfully? {1}", Util.prettyPrintTime(System.currentTimeMillis() - start), completedSuccessfully);
+      } catch (Exception e) {
+         log.error("Caught exception! Completed successfully? {0}", e, completedSuccessfully);
+      }
+      finally {
+         if (completedSuccessfully) leavers.removeAll(leaversHandled);
+      }
    }
+
+   @Override
+   protected Collection<Address> getInvalidHolders(Object key, ConsistentHash chOld, ConsistentHash chNew) {
+      Collection<Address> l = super.getInvalidHolders(key, chOld, chNew);
+      l.removeAll(leaversHandled);
+      return l;
+   }
 }
+
+class StateMap {
+   List<Address> leavers;
+   Address self;
+   ConsistentHash oldCH, newCH;
+   int replCount;
+   Set<Object> keysHandled = new HashSet<Object>();
+   Map<Address, Map<Object, InternalCacheValue>> state = new HashMap<Address, Map<Object, InternalCacheValue>>();
+
+   StateMap(List<Address> leavers, Address self, ConsistentHash oldCH, ConsistentHash newCH, int replCount) {
+      this.leavers = leavers;
+      this.self = self;
+      this.oldCH = oldCH;
+      this.newCH = newCH;
+      this.replCount = replCount;
+   }
+
+   /**
+    * Only add state to state map if old_owner_list for key contains a leaver, and the position of the leaver in the old
+    * owner list
+    *
+    * @param ice
+    */
+   void addState(InternalCacheEntry ice) {
+      for (Address leaver : leavers) {
+         List<Address> owners = oldCH.locate(ice.getKey(), replCount);
+         int leaverIndex = owners.indexOf(leaver);
+         if (leaverIndex > -1) {
+            int numOwners = owners.size();
+            int selfIndex = owners.indexOf(self);
+            boolean isLeaverLast = leaverIndex == numOwners - 1;
+            if ((isLeaverLast && selfIndex == numOwners - 2) ||
+                  (!isLeaverLast && selfIndex == leaverIndex + 1)) {
+               // add to state map!
+               List<Address> newOwners = newCH.locate(ice.getKey(), replCount);
+               newOwners.removeAll(owners);
+               if (!newOwners.isEmpty()) {
+                  for (Address no : newOwners) {
+                     Map<Object, InternalCacheValue> s = state.get(no);
+                     if (s == null) {
+                        s = new HashMap<Object, InternalCacheValue>();
+                        state.put(no, s);
+                     }
+                     s.put(ice.getKey(), ice.toInternalCacheValue());
+                  }
+               }
+            }
+         }
+      }
+      keysHandled.add(ice.getKey());
+   }
+
+   Map<Address, Map<Object, InternalCacheValue>> getState() {
+      return state;
+   }
+
+   boolean containsKey(Object key) {
+      return keysHandled.contains(key);
+   }
+}

Deleted: trunk/core/src/main/java/org/infinispan/distribution/RecvLeaveTask.java
===================================================================
--- trunk/core/src/main/java/org/infinispan/distribution/RecvLeaveTask.java	2009-08-20 14:11:12 UTC (rev 708)
+++ trunk/core/src/main/java/org/infinispan/distribution/RecvLeaveTask.java	2009-08-20 22:27:13 UTC (rev 709)
@@ -1,16 +0,0 @@
-package org.infinispan.distribution;
-
-import org.infinispan.config.Configuration;
-import org.infinispan.remoting.rpc.RpcManager;
-
-/**
- * // TODO: Manik: Document this
- *
- * @author Manik Surtani
- * @since 4.0
- */
-public class RecvLeaveTask extends LeaveTask {
-   protected RecvLeaveTask(DistributionManagerImpl dmi, RpcManager rpcManager, Configuration configuration) {
-      super(dmi, rpcManager, configuration);
-   }
-}

Modified: trunk/core/src/main/java/org/infinispan/distribution/RehashTask.java
===================================================================
--- trunk/core/src/main/java/org/infinispan/distribution/RehashTask.java	2009-08-20 14:11:12 UTC (rev 708)
+++ trunk/core/src/main/java/org/infinispan/distribution/RehashTask.java	2009-08-20 22:27:13 UTC (rev 709)
@@ -1,16 +1,25 @@
 package org.infinispan.distribution;
 
+import org.infinispan.commands.CommandsFactory;
+import org.infinispan.commands.write.InvalidateCommand;
 import org.infinispan.config.Configuration;
+import org.infinispan.container.DataContainer;
 import org.infinispan.remoting.rpc.RpcManager;
 import org.infinispan.remoting.transport.Address;
-import org.infinispan.util.Util;
+import org.infinispan.util.concurrent.NotifyingFutureImpl;
+import org.infinispan.util.concurrent.NotifyingNotifiableFuture;
 
-import java.util.Arrays;
 import java.util.Collection;
 import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
 import java.util.LinkedList;
 import java.util.List;
+import java.util.Map;
+import java.util.Set;
 import java.util.concurrent.Callable;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.Future;
 
 /**
  * // TODO: Manik: Document this
@@ -23,11 +32,18 @@
    DistributionManagerImpl dmi;
    RpcManager rpcManager;
    Configuration configuration;
+   TransactionLogger transactionLogger;
+   CommandsFactory cf;
+   DataContainer dataContainer;
 
-   protected RehashTask(DistributionManagerImpl dmi, RpcManager rpcManager, Configuration configuration) {
+   protected RehashTask(DistributionManagerImpl dmi, RpcManager rpcManager, Configuration configuration,
+                        TransactionLogger transactionLogger, CommandsFactory cf, DataContainer dataContainer) {
       this.dmi = dmi;
       this.rpcManager = rpcManager;
       this.configuration = configuration;
+      this.transactionLogger = transactionLogger;
+      this.cf = cf;
+      this.dataContainer = dataContainer;
    }
 
    public Void call() throws Exception {
@@ -46,15 +62,39 @@
       return Collections.singleton(rpcManager.getTransport().getCoordinator());
    }
 
-   protected ConsistentHash createConsistentHash(Collection<Address> addresses) throws Exception {
-      ConsistentHash ch = (ConsistentHash) Util.getInstance(configuration.getConsistentHashClass());
-      ch.setCaches(addresses);
-      return ch;
+   protected void invalidateInvalidHolders(ConsistentHash chOld, ConsistentHash chNew) throws ExecutionException, InterruptedException {
+      Map<Address, Set<Object>> invalidations = new HashMap<Address, Set<Object>>();
+      for (Object key : dataContainer.keySet()) {
+         Collection<Address> invalidHolders = getInvalidHolders(key, chOld, chNew);
+         for (Address a : invalidHolders) {
+            Set<Object> s = invalidations.get(a);
+            if (s == null) {
+               s = new HashSet<Object>();
+               invalidations.put(a, s);
+            }
+            s.add(key);
+         }
+      }
+
+      Set<Future> futures = new HashSet<Future>();
+
+      for (Map.Entry<Address, Set<Object>> e : invalidations.entrySet()) {
+         InvalidateCommand ic = cf.buildInvalidateFromL1Command(e.getValue().toArray());
+         NotifyingNotifiableFuture f = new NotifyingFutureImpl(null);
+         rpcManager.invokeRemotelyInFuture(Collections.singletonList(e.getKey()), ic, true, f);
+         futures.add(f);
+      }
+
+      for (Future f : futures) f.get();
    }
 
-   protected ConsistentHash createConsistentHash(Collection<Address> addresses, Address... moreAddresses) throws Exception {
-      List<Address> list = new LinkedList<Address>(addresses);
-      list.addAll(Arrays.asList(moreAddresses));
-      return createConsistentHash(list);
+   protected Collection<Address> getInvalidHolders(Object key, ConsistentHash chOld, ConsistentHash chNew) {
+      List<Address> oldOwners = chOld.locate(key, configuration.getNumOwners());
+      List<Address> newOwners = chNew.locate(key, configuration.getNumOwners());
+
+      List<Address> toInvalidate = new LinkedList<Address>(oldOwners);
+      toInvalidate.removeAll(newOwners);
+
+      return toInvalidate;
    }
 }

Deleted: trunk/core/src/main/java/org/infinispan/distribution/SendLeaveTask.java
===================================================================
--- trunk/core/src/main/java/org/infinispan/distribution/SendLeaveTask.java	2009-08-20 14:11:12 UTC (rev 708)
+++ trunk/core/src/main/java/org/infinispan/distribution/SendLeaveTask.java	2009-08-20 22:27:13 UTC (rev 709)
@@ -1,16 +0,0 @@
-package org.infinispan.distribution;
-
-import org.infinispan.config.Configuration;
-import org.infinispan.remoting.rpc.RpcManager;
-
-/**
- * // TODO: Manik: Document this
- *
- * @author Manik Surtani
- * @since 4.0
- */
-public class SendLeaveTask extends LeaveTask {
-   protected SendLeaveTask(DistributionManagerImpl dmi, RpcManager rpcManager, Configuration configuration) {
-      super(dmi, rpcManager, configuration);
-   }
-}

Modified: trunk/core/src/main/java/org/infinispan/distribution/TransactionLogger.java
===================================================================
--- trunk/core/src/main/java/org/infinispan/distribution/TransactionLogger.java	2009-08-20 14:11:12 UTC (rev 708)
+++ trunk/core/src/main/java/org/infinispan/distribution/TransactionLogger.java	2009-08-20 22:27:13 UTC (rev 709)
@@ -56,4 +56,6 @@
    boolean logIfNeeded(Collection<WriteCommand> commands);
 
    int size();
+
+   boolean isEnabled();
 }

Modified: trunk/core/src/main/java/org/infinispan/distribution/TransactionLoggerImpl.java
===================================================================
--- trunk/core/src/main/java/org/infinispan/distribution/TransactionLoggerImpl.java	2009-08-20 14:11:12 UTC (rev 708)
+++ trunk/core/src/main/java/org/infinispan/distribution/TransactionLoggerImpl.java	2009-08-20 22:27:13 UTC (rev 709)
@@ -72,4 +72,8 @@
    public int size() {
       return enabled ? 0 : commandQueue.size();
    }
+
+   public boolean isEnabled() {
+      return enabled;
+   }
 }

Modified: trunk/core/src/main/java/org/infinispan/distribution/UnionConsistentHash.java
===================================================================
--- trunk/core/src/main/java/org/infinispan/distribution/UnionConsistentHash.java	2009-08-20 14:11:12 UTC (rev 708)
+++ trunk/core/src/main/java/org/infinispan/distribution/UnionConsistentHash.java	2009-08-20 22:27:13 UTC (rev 709)
@@ -9,7 +9,6 @@
 import java.io.IOException;
 import java.io.ObjectInput;
 import java.io.ObjectOutput;
-import java.util.Collection;
 import java.util.Collections;
 import java.util.LinkedHashSet;
 import java.util.List;
@@ -34,11 +33,11 @@
       this.newCH = newCH;
    }
 
-   public void setCaches(Collection<Address> caches) {
+   public void setCaches(List<Address> caches) {
       // no op
    }
 
-   public Collection<Address> getCaches() {
+   public List<Address> getCaches() {
       return Collections.emptyList();
    }
 
@@ -49,6 +48,14 @@
       return Immutables.immutableListConvert(addresses);
    }
 
+   public int getDistance(Address a1, Address a2) {
+      throw new UnsupportedOperationException("Unsupported!");
+   }
+
+   public boolean isAdjacent(Address a1, Address a2) {
+      throw new UnsupportedOperationException("Unsupported!");
+   }
+
    public ConsistentHash getNewConsistentHash() {
       return newCH;
    }
@@ -69,8 +76,4 @@
          return new UnionConsistentHash((ConsistentHash) input.readObject(), (ConsistentHash) input.readObject());
       }
    }
-
-   public boolean isInSameSubspace(Address a1, Address a2) {
-      throw new UnsupportedOperationException("Not supported by this impl");
-   }
 }

Modified: trunk/core/src/main/java/org/infinispan/transaction/tm/DummyTransaction.java
===================================================================
--- trunk/core/src/main/java/org/infinispan/transaction/tm/DummyTransaction.java	2009-08-20 14:11:12 UTC (rev 708)
+++ trunk/core/src/main/java/org/infinispan/transaction/tm/DummyTransaction.java	2009-08-20 22:27:13 UTC (rev 709)
@@ -74,7 +74,7 @@
    public void commit() throws RollbackException, HeuristicMixedException, HeuristicRollbackException, SecurityException, SystemException {
       try {
          if (!notifyBeforeCompletion()) {
-            log.trace("Not running 2PC as Synchronization.before not successfull");
+            log.trace("Not running 2PC as Synchronization.before not successful");
             return;
          }
 
@@ -310,7 +310,7 @@
          try {
             res.rollback(xid);
          } catch (XAException e) {
-            log.warn("Error while rolling back",e);
+            log.warn("Error while rolling back", e);
          }
       }
    }
@@ -322,7 +322,7 @@
          try {
             res.commit(xid, false);//todo we only support one phase commit for now, change this!!!
          } catch (XAException e) {
-            log.warn("exception while committing",e);
+            log.warn("exception while committing", e);
             throw new HeuristicMixedException(e.getMessage());
          }
       }

Modified: trunk/core/src/main/java/org/infinispan/util/Util.java
===================================================================
--- trunk/core/src/main/java/org/infinispan/util/Util.java	2009-08-20 14:11:12 UTC (rev 708)
+++ trunk/core/src/main/java/org/infinispan/util/Util.java	2009-08-20 22:27:13 UTC (rev 709)
@@ -55,7 +55,7 @@
    }
 
    @SuppressWarnings("unchecked")
-   public static <T> T getInstance(Class<T> clazz) throws Exception {
+   public static <T> T getInstance(Class<T> clazz) throws IllegalAccessException, InstantiationException {
       // first look for a getInstance() constructor
       T instance;
       try {
@@ -70,7 +70,7 @@
    }
 
    @SuppressWarnings("unchecked")
-   public static Object getInstance(String classname) throws Exception {
+   public static Object getInstance(String classname) throws ClassNotFoundException, IllegalAccessException, InstantiationException {
       if (classname == null) throw new IllegalArgumentException("Cannot load null class!");
       Class clazz = loadClass(classname);
       return getInstance(clazz);

Modified: trunk/core/src/test/java/org/infinispan/distribution/BaseDistFunctionalTest.java
===================================================================
--- trunk/core/src/test/java/org/infinispan/distribution/BaseDistFunctionalTest.java	2009-08-20 14:11:12 UTC (rev 708)
+++ trunk/core/src/test/java/org/infinispan/distribution/BaseDistFunctionalTest.java	2009-08-20 22:27:13 UTC (rev 709)
@@ -290,7 +290,7 @@
     * A special type of key that if passed a cache in its constructor, will ensure it will always be assigned to that
     * cache (plus however many additional caches in the hash space)
     */
-   protected static class MagicKey implements Serializable {
+   public static class MagicKey implements Serializable {
       String name = null;
       int hashcode;
       String address;

Modified: trunk/core/src/test/java/org/infinispan/distribution/DefaultConsistentHashTest.java
===================================================================
--- trunk/core/src/test/java/org/infinispan/distribution/DefaultConsistentHashTest.java	2009-08-20 14:11:12 UTC (rev 708)
+++ trunk/core/src/test/java/org/infinispan/distribution/DefaultConsistentHashTest.java	2009-08-20 22:27:13 UTC (rev 709)
@@ -74,6 +74,27 @@
          assert locations.get(k).size() == 3;
       }
    }
+
+   public void testDistances() {
+      Address a1 = new TestAddress(1);
+      Address a2 = new TestAddress(2);
+      Address a3 = new TestAddress(3);
+      Address a4 = new TestAddress(4);
+
+      ConsistentHash ch = new DefaultConsistentHash();
+      ch.setCaches(Arrays.asList(a1, a2, a3, a4));
+
+      assert ch.getDistance(a1, a1) == 0;
+      assert ch.getDistance(a1, a4) == 3;
+      assert ch.getDistance(a1, a3) == 2;
+      assert ch.getDistance(a3, a1) == 2;
+      assert ch.getDistance(a1, a2) == 1;
+      assert ch.getDistance(a2, a1) == 3;
+
+      assert ch.isAdjacent(a1, a2);
+      assert !ch.isAdjacent(a1, a3);
+      assert ch.isAdjacent(a1, a4);
+   }
 }
 
 class TestAddress implements Address {

Added: trunk/core/src/test/java/org/infinispan/distribution/DistributionManagerUnitTest.java
===================================================================
--- trunk/core/src/test/java/org/infinispan/distribution/DistributionManagerUnitTest.java	                        (rev 0)
+++ trunk/core/src/test/java/org/infinispan/distribution/DistributionManagerUnitTest.java	2009-08-20 22:27:13 UTC (rev 709)
@@ -0,0 +1,46 @@
+package org.infinispan.distribution;
+
+import org.easymock.EasyMock;
+import org.infinispan.remoting.transport.Address;
+import org.testng.annotations.Test;
+
+import java.util.Arrays;
+
+/**
+ * Tests helper functions on the DistManager
+ *
+ * @author Manik Surtani
+ * @since 4.0
+ */
+@Test(groups = "unit", testName = "distribution.DistributionManagerUnitTest")
+public class DistributionManagerUnitTest {
+   DistributionManagerImpl dmi = new DistributionManagerImpl();
+
+   public void testDeterminingLeaversAndJoiners() {
+      Address a1 = EasyMock.createNiceMock(Address.class);
+      Address a2 = EasyMock.createNiceMock(Address.class);
+      Address a3 = EasyMock.createNiceMock(Address.class);
+      Address a4 = EasyMock.createNiceMock(Address.class);
+      Address a5 = EasyMock.createNiceMock(Address.class);
+
+      Address newAddress = dmi.diff(Arrays.asList(a1, a2, a3, a4),
+                                    Arrays.asList(a1, a2, a3, a4, a5));
+
+      assert newAddress == a5 : "Expecting " + a5 + " but was " + newAddress;
+
+      newAddress = dmi.diff(Arrays.asList(a1, a2, a3, a4),
+                            Arrays.asList(a5, a4, a3, a2, a1));
+
+      assert newAddress == a5 : "Expecting " + a5 + " but was " + newAddress;
+
+      newAddress = dmi.diff(Arrays.asList(a1, a2, a3, a4, a5),
+                            Arrays.asList(a1, a2, a3, a4));
+
+      assert newAddress == a5 : "Expecting " + a5 + " but was " + newAddress;
+
+      newAddress = dmi.diff(Arrays.asList(a5, a4, a3, a2, a1),
+                            Arrays.asList(a1, a2, a3, a4));
+
+      assert newAddress == a5 : "Expecting " + a5 + " but was " + newAddress;
+   }
+}


Property changes on: trunk/core/src/test/java/org/infinispan/distribution/DistributionManagerUnitTest.java
___________________________________________________________________
Name: svn:keywords
   + Id Revision
Name: svn:eol-style
   + LF

Deleted: trunk/core/src/test/java/org/infinispan/distribution/RehashJoinTest.java
===================================================================
--- trunk/core/src/test/java/org/infinispan/distribution/RehashJoinTest.java	2009-08-20 14:11:12 UTC (rev 708)
+++ trunk/core/src/test/java/org/infinispan/distribution/RehashJoinTest.java	2009-08-20 22:27:13 UTC (rev 709)
@@ -1,180 +0,0 @@
-package org.infinispan.distribution;
-
-import org.infinispan.Cache;
-import org.infinispan.manager.CacheManager;
-import org.infinispan.remoting.transport.Address;
-import org.infinispan.test.TestingUtil;
-import org.infinispan.util.logging.Log;
-import org.infinispan.util.logging.LogFactory;
-import org.testng.annotations.Test;
-
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.List;
-import java.util.concurrent.CountDownLatch;
-import static java.util.concurrent.TimeUnit.SECONDS;
-
-@Test(testName = "distribution.RehashJoinTest", groups = "functional")
-public class RehashJoinTest extends BaseDistFunctionalTest {
-
-   Log log = LogFactory.getLog(RehashJoinTest.class);
-   CacheManager joinerManager;
-
-   public RehashJoinTest() {
-      cleanup = CleanupPhase.AFTER_METHOD;
-   }
-
-   @Override
-   protected void createCacheManagers() throws Throwable {
-      super.createCacheManagers();
-      joinerManager = addClusterEnabledCacheManager();
-      joinerManager.defineConfiguration(cacheName, configuration);
-   }
-
-   public void testRehashOnJoin() {
-      MagicKey k1 = new MagicKey(c1, "k1");
-      MagicKey k2 = new MagicKey(c2, "k2");
-      MagicKey k3 = new MagicKey(c3, "k3");
-      MagicKey k4 = new MagicKey(c4, "k4");
-
-      List<MagicKey> keys = new ArrayList<MagicKey>(4);
-      keys.add(k1);
-      keys.add(k2);
-      keys.add(k3);
-      keys.add(k4);
-
-      int i = 0;
-      for (Cache<Object, String> c : caches) c.put(keys.get(i++), "v" + i);
-
-      i = 0;
-      for (MagicKey key : keys) assertOnAllCachesAndOwnership(key, "v" + ++i);
-
-      log.info("***>>> Firing up new joiner!");
-
-      // now fire up a new joiner
-      Cache<Object, String> joiner = joinerManager.getCache(cacheName);
-
-      // need to wait for the joiner to, well, join.
-      TestingUtil.blockUntilViewsReceived(SECONDS.toMillis(480), cacheManagers.toArray(new CacheManager[cacheManagers.size()]));
-
-      // need to block until this join has completed!
-      waitForJoinTasksToComplete(SECONDS.toMillis(480), joiner);
-
-      // where does the joiner sit in relation to the other caches?
-      int joinerPos = locateJoiner(joinerManager.getAddress());
-
-      log.info("***>>> Joiner is in position " + joinerPos);
-
-      caches.add(joinerPos, joiner);
-      i = 0;
-      for (MagicKey key : keys) assertOnAllCachesAndOwnership(key, "v" + ++i);
-
-      assertProperConsistentHashOnAllCaches();
-   }
-
-   public void testMultipleJoiners() throws InterruptedException {
-
-      // have all JOIN phases completed?
-      for (Cache c : caches) {
-         DistributionManagerImpl dmi = (DistributionManagerImpl) getDistributionManager(c);
-         assert !dmi.rehashInProgress : "Cache " + addressOf(c) + " still has rehashInProgress=true!";
-         assert dmi.rehashQueue.isEmpty() : "Cache " + addressOf(c) + " still has queued RehashTasks!";
-         assert !(dmi.getConsistentHash() instanceof UnionConsistentHash) : "Cache " + addressOf(c) + " still using a UnionConsistentHash!";
-      }
-
-      MagicKey k1 = new MagicKey(c1, "k1");
-      MagicKey k2 = new MagicKey(c2, "k2");
-      MagicKey k3 = new MagicKey(c3, "k3");
-      MagicKey k4 = new MagicKey(c4, "k4");
-
-      List<MagicKey> keys = new ArrayList<MagicKey>(4);
-      keys.add(k1);
-      keys.add(k2);
-      keys.add(k3);
-      keys.add(k4);
-
-      int i = 0;
-      for (Cache<Object, String> c : caches) c.put(keys.get(i++), "v" + i);
-
-      i = 0;
-      for (MagicKey key : keys) assertOnAllCachesAndOwnership(key, "v" + ++i);
-
-      int numNewJoiners = 3; // in addition to the 1 joiner defined in createCacheManagers()!
-      int numJoiners = numNewJoiners + 1;
-      final CacheManager[] joinerManagers = new CacheManager[numJoiners];
-      final Cache[] joiners = new Cache[numJoiners];
-      joinerManagers[0] = joinerManager;
-
-      for (i = 1; i < numJoiners; i++) {
-         joinerManagers[i] = addClusterEnabledCacheManager();
-         joinerManagers[i].defineConfiguration(cacheName, configuration);
-      }
-
-      log.info("***>>> Firing up {0} new joiners!", numJoiners);
-
-      // now fire up a new joiners, in separate threads.
-      final CountDownLatch joinLatch = new CountDownLatch(1);
-      Thread[] threads = new Thread[numJoiners];
-
-      for (i = 0; i < numJoiners; i++) {
-         final int idx = i;
-         threads[idx] = new Thread() {
-            public void run() {
-               try {
-                  joinLatch.await();
-               } catch (InterruptedException e) {
-                  log.error(e);
-               }
-               joiners[idx] = joinerManagers[idx].getCache(cacheName);
-            }
-         };
-         threads[idx].start();
-      }
-
-      joinLatch.countDown();
-
-      for (Thread t : threads) t.join();
-
-      CacheManager[] cacheManagerArray = cacheManagers.toArray(new CacheManager[cacheManagers.size()]);
-      log.info("Number of cache manager views to wait for: {0}", cacheManagerArray.length);
-
-      // need to wait for the joiner to, well, join.
-      TestingUtil.blockUntilViewsReceived(SECONDS.toMillis(480), cacheManagerArray);
-      // where do the joiners sit in relation to the other caches?
-
-
-      waitForJoinTasksToComplete(SECONDS.toMillis(480), joiners);
-
-      // need to wait a *short while* before we attempt to locate joiners, since post-join invalidation messages are sent async.
-      // TODO replace this with some form of command detection on remote nodes.
-      // join tasks happen sequentially as well, so this needs some time to finish
-      TestingUtil.sleepThread(SECONDS.toMillis(2));
-
-      int[] joinersPos = new int[numJoiners];
-      for (i = 0; i < numJoiners; i++) joinersPos[i] = locateJoiner(joinerManagers[i].getAddress());
-
-      log.info("***>>> Joiners are in positions " + Arrays.toString(joinersPos));
-      for (i = 0; i < numJoiners; i++) {
-         if (joinersPos[i] > caches.size())
-            caches.add(joiners[i]);
-         else
-            caches.add(joinersPos[i], joiners[i]);
-      }
-      i = 0;
-      for (MagicKey key : keys) assertOnAllCachesAndOwnership(key, "v" + ++i);
-
-      assertProperConsistentHashOnAllCaches();
-   }
-
-   private int locateJoiner(Address joinerAddress) {
-      for (Cache c : Arrays.asList(c1, c2, c3, c4)) {
-         DefaultConsistentHash dch = getDefaultConsistentHash(c, SECONDS.toMillis(480));
-         int i = 0;
-         for (Address a : dch.positions.values()) {
-            if (a.equals(joinerAddress)) return i;
-            i++;
-         }
-      }
-      throw new RuntimeException("Cannot locate joiner! Joiner is [" + joinerAddress + "]");
-   }
-}

Added: trunk/core/src/test/java/org/infinispan/distribution/rehash/ConcurrentJoinAndCoordDeathTest.java
===================================================================
--- trunk/core/src/test/java/org/infinispan/distribution/rehash/ConcurrentJoinAndCoordDeathTest.java	                        (rev 0)
+++ trunk/core/src/test/java/org/infinispan/distribution/rehash/ConcurrentJoinAndCoordDeathTest.java	2009-08-20 22:27:13 UTC (rev 709)
@@ -0,0 +1,10 @@
+package org.infinispan.distribution.rehash;
+
+import org.testng.annotations.Test;
+
+/**
+ * Placeholder for a test that has not been written yet.  The class is disabled ({@code enabled = false}); its only
+ * method fails unconditionally when assertions are enabled.
+ */
+@Test(groups = "functional", testName = "distribution.rehash.ConcurrentJoinAndCoordDeathTest", enabled = false)
+public class ConcurrentJoinAndCoordDeathTest {
+   /** Stub that always fails; to be replaced by the real test. */
+   public void implementMe() {
+      assert false : "Implement me!";
+   }
+}


Property changes on: trunk/core/src/test/java/org/infinispan/distribution/rehash/ConcurrentJoinAndCoordDeathTest.java
___________________________________________________________________
Name: svn:keywords
   + Id Revision
Name: svn:eol-style
   + LF

Added: trunk/core/src/test/java/org/infinispan/distribution/rehash/ConcurrentJoinAndOtherSenderDeathTest.java
===================================================================
--- trunk/core/src/test/java/org/infinispan/distribution/rehash/ConcurrentJoinAndOtherSenderDeathTest.java	                        (rev 0)
+++ trunk/core/src/test/java/org/infinispan/distribution/rehash/ConcurrentJoinAndOtherSenderDeathTest.java	2009-08-20 22:27:13 UTC (rev 709)
@@ -0,0 +1,10 @@
+package org.infinispan.distribution.rehash;
+
+import org.testng.annotations.Test;
+
+/**
+ * Placeholder for a test that has not been written yet.  The class is disabled ({@code enabled = false}); its only
+ * method fails unconditionally when assertions are enabled.
+ */
+@Test(groups = "functional", testName = "distribution.rehash.ConcurrentJoinAndOtherSenderDeathTest", enabled = false)
+public class ConcurrentJoinAndOtherSenderDeathTest {
+   /** Stub that always fails; to be replaced by the real test. */
+   public void implementMe() {
+      assert false : "Implement me!";
+   }
+}


Property changes on: trunk/core/src/test/java/org/infinispan/distribution/rehash/ConcurrentJoinAndOtherSenderDeathTest.java
___________________________________________________________________
Name: svn:keywords
   + Id Revision
Name: svn:eol-style
   + LF

Added: trunk/core/src/test/java/org/infinispan/distribution/rehash/ConcurrentJoinTest.java
===================================================================
--- trunk/core/src/test/java/org/infinispan/distribution/rehash/ConcurrentJoinTest.java	                        (rev 0)
+++ trunk/core/src/test/java/org/infinispan/distribution/rehash/ConcurrentJoinTest.java	2009-08-20 22:27:13 UTC (rev 709)
@@ -0,0 +1,48 @@
+package org.infinispan.distribution.rehash;
+
+import org.infinispan.Cache;
+import org.infinispan.manager.CacheManager;
+import org.infinispan.test.TestingUtil;
+import org.testng.annotations.Test;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+import static java.util.concurrent.TimeUnit.SECONDS;
+
+/**
+ * Rehash test whose rehash event is the start of {@link #numJoiners} additional cache managers.
+ * <p/>
+ * NOTE(review): the joiners are started back-to-back from the calling thread rather than from separate threads;
+ * whether their joins really overlap depends on getCache() returning before the join has finished - confirm.
+ */
+@Test(groups = "functional", testName = "distribution.rehash.ConcurrentJoinTest")
+public class ConcurrentJoinTest extends RehashTestBase {
+
+   List<CacheManager> joinerManagers;
+   List<Cache<Object, String>> joiners;
+
+   final int numJoiners = 4;
+
+   /**
+    * Creates {@link #numJoiners} new clustered cache managers, defines the test cache on each and starts it,
+    * remembering both the managers and the caches for {@link #waitForRehashCompletion()}.
+    */
+   void performRehashEvent() {
+      joinerManagers = new ArrayList<CacheManager>(numJoiners);
+      joiners = new ArrayList<Cache<Object, String>>(numJoiners);
+      for (int i = 0; i < numJoiners; i++) {
+         CacheManager joinerManager = addClusterEnabledCacheManager();
+         joinerManager.defineConfiguration(cacheName, configuration);
+         Cache<Object, String> joiner = joinerManager.getCache(cacheName);
+         joinerManagers.add(joinerManager);
+         joiners.add(joiner);
+      }
+   }
+
+   /**
+    * Waits for the join tasks of all joiners, then inserts each joiner into {@code caches} at the position reported
+    * by {@code locateJoiner()}.
+    */
+   @SuppressWarnings("unchecked")
+   void waitForRehashCompletion() {
+      waitForJoinTasksToComplete(SECONDS.toMillis(480), joiners.toArray(new Cache[numJoiners]));
+      // need to wait a *short while* before we attempt to locate joiners, since post-join invalidation messages are
+      // sent async.
+      TestingUtil.sleepThread(SECONDS.toMillis(2));
+      int[] joinersPos = new int[numJoiners];
+      for (int i = 0; i < numJoiners; i++) joinersPos[i] = locateJoiner(joinerManagers.get(i).getAddress());
+
+      log.info("***>>> Joiners are in positions " + Arrays.toString(joinersPos));
+      for (int i = 0; i < numJoiners; i++) {
+         // List.add(index, e) rejects an index beyond the current size, so such joiners are simply appended
+         if (joinersPos[i] > caches.size())
+            caches.add(joiners.get(i));
+         else
+            caches.add(joinersPos[i], joiners.get(i));
+      }
+   }
+}


Property changes on: trunk/core/src/test/java/org/infinispan/distribution/rehash/ConcurrentJoinTest.java
___________________________________________________________________
Name: svn:keywords
   + Id Revision
Name: svn:eol-style
   + LF

Added: trunk/core/src/test/java/org/infinispan/distribution/rehash/ConcurrentLeaveAndJoinNonOverlappingTest.java
===================================================================
--- trunk/core/src/test/java/org/infinispan/distribution/rehash/ConcurrentLeaveAndJoinNonOverlappingTest.java	                        (rev 0)
+++ trunk/core/src/test/java/org/infinispan/distribution/rehash/ConcurrentLeaveAndJoinNonOverlappingTest.java	2009-08-20 22:27:13 UTC (rev 709)
@@ -0,0 +1,10 @@
+package org.infinispan.distribution.rehash;
+
+import org.testng.annotations.Test;
+
+/**
+ * Placeholder for a test that has not been written yet.  The class is disabled ({@code enabled = false}); its only
+ * method fails unconditionally when assertions are enabled.
+ */
+@Test(groups = "functional", testName = "distribution.rehash.ConcurrentLeaveAndJoinNonOverlappingTest", enabled = false)
+public class ConcurrentLeaveAndJoinNonOverlappingTest {
+   /** Stub that always fails; to be replaced by the real test. */
+   public void implementMe() {
+      assert false : "Implement me!";
+   }
+}
\ No newline at end of file


Property changes on: trunk/core/src/test/java/org/infinispan/distribution/rehash/ConcurrentLeaveAndJoinNonOverlappingTest.java
___________________________________________________________________
Name: svn:keywords
   + Id Revision
Name: svn:eol-style
   + LF

Added: trunk/core/src/test/java/org/infinispan/distribution/rehash/ConcurrentLeaveAndJoinOverlappingTest.java
===================================================================
--- trunk/core/src/test/java/org/infinispan/distribution/rehash/ConcurrentLeaveAndJoinOverlappingTest.java	                        (rev 0)
+++ trunk/core/src/test/java/org/infinispan/distribution/rehash/ConcurrentLeaveAndJoinOverlappingTest.java	2009-08-20 22:27:13 UTC (rev 709)
@@ -0,0 +1,10 @@
+package org.infinispan.distribution.rehash;
+
+import org.testng.annotations.Test;
+
+/**
+ * Placeholder for a test that has not been written yet.  The class is disabled ({@code enabled = false}); its only
+ * method fails unconditionally when assertions are enabled.
+ */
+@Test(groups = "functional", testName = "distribution.rehash.ConcurrentLeaveAndJoinOverlappingTest", enabled = false)
+public class ConcurrentLeaveAndJoinOverlappingTest {
+   /** Stub that always fails; to be replaced by the real test. */
+   public void implementMe() {
+      assert false : "Implement me!";
+   }
+}


Property changes on: trunk/core/src/test/java/org/infinispan/distribution/rehash/ConcurrentLeaveAndJoinOverlappingTest.java
___________________________________________________________________
Name: svn:keywords
   + Id Revision
Name: svn:eol-style
   + LF

Added: trunk/core/src/test/java/org/infinispan/distribution/rehash/ConcurrentNonOverlappingLeaveTest.java
===================================================================
--- trunk/core/src/test/java/org/infinispan/distribution/rehash/ConcurrentNonOverlappingLeaveTest.java	                        (rev 0)
+++ trunk/core/src/test/java/org/infinispan/distribution/rehash/ConcurrentNonOverlappingLeaveTest.java	2009-08-20 22:27:13 UTC (rev 709)
@@ -0,0 +1,26 @@
+package org.infinispan.distribution.rehash;
+
+import org.infinispan.manager.CacheManager;
+import org.infinispan.remoting.transport.Address;
+import org.infinispan.test.TestingUtil;
+import org.testng.annotations.Test;
+
+import java.util.Arrays;
+
+/**
+ * Leave test in which c2 and c4 are killed in a single call.
+ * <p/>
+ * NOTE(review): "non-overlapping" presumably refers to the two leavers not being neighbours in c1..c4, so that the
+ * keys they own do not overlap - confirm.
+ */
+@Test(groups = "functional", testName = "distribution.rehash.ConcurrentNonOverlappingLeaveTest")
+public class ConcurrentNonOverlappingLeaveTest extends RehashLeaveTestBase {
+   // addresses of the two leavers, captured before they are killed; not read anywhere in this class
+   Address l1, l2;
+
+   /**
+    * Removes c2 and c4 (and their cache managers) from the bookkeeping lists of the test, then kills both managers.
+    */
+   void performRehashEvent() {
+      l1 = addressOf(c2);
+      l2 = addressOf(c4);
+
+      CacheManager cm2 = c2.getCacheManager();
+      CacheManager cm4 = c4.getCacheManager();
+
+      // drop the leavers from the lists first, so later checks only look at the surviving caches
+      cacheManagers.removeAll(Arrays.asList(cm2, cm4));
+      caches.removeAll(Arrays.asList(c2, c4));
+
+      TestingUtil.killCacheManagers(cm2, cm4);
+   }
+}


Property changes on: trunk/core/src/test/java/org/infinispan/distribution/rehash/ConcurrentNonOverlappingLeaveTest.java
___________________________________________________________________
Name: svn:keywords
   + Id Revision
Name: svn:eol-style
   + LF

Added: trunk/core/src/test/java/org/infinispan/distribution/rehash/ConcurrentOverlappingLeaveTest.java
===================================================================
--- trunk/core/src/test/java/org/infinispan/distribution/rehash/ConcurrentOverlappingLeaveTest.java	                        (rev 0)
+++ trunk/core/src/test/java/org/infinispan/distribution/rehash/ConcurrentOverlappingLeaveTest.java	2009-08-20 22:27:13 UTC (rev 709)
@@ -0,0 +1,26 @@
+package org.infinispan.distribution.rehash;
+
+import org.infinispan.manager.CacheManager;
+import org.infinispan.remoting.transport.Address;
+import org.infinispan.test.TestingUtil;
+import org.testng.annotations.Test;
+
+import java.util.Arrays;
+
+/**
+ * Leave test in which c3 and c4 are killed in a single call.
+ * <p/>
+ * NOTE(review): "overlapping" presumably refers to the two leavers being neighbours in c1..c4, so that the keys they
+ * own overlap - confirm.
+ */
+@Test(groups = "functional", testName = "distribution.rehash.ConcurrentOverlappingLeaveTest")
+public class ConcurrentOverlappingLeaveTest extends RehashLeaveTestBase {
+   // addresses of the two leavers, captured before they are killed; not read anywhere in this class
+   Address l1, l2;
+
+   /**
+    * Removes c3 and c4 (and their cache managers) from the bookkeeping lists of the test, then kills both managers.
+    */
+   void performRehashEvent() {
+      l1 = addressOf(c3);
+      l2 = addressOf(c4);
+
+      CacheManager cm3 = c3.getCacheManager();
+      CacheManager cm4 = c4.getCacheManager();
+
+      // drop the leavers from the lists first, so later checks only look at the surviving caches
+      cacheManagers.removeAll(Arrays.asList(cm3, cm4));
+      caches.removeAll(Arrays.asList(c3, c4));
+
+      TestingUtil.killCacheManagers(cm3, cm4);
+   }
+}


Property changes on: trunk/core/src/test/java/org/infinispan/distribution/rehash/ConcurrentOverlappingLeaveTest.java
___________________________________________________________________
Name: svn:keywords
   + Id Revision
Name: svn:eol-style
   + LF

Added: trunk/core/src/test/java/org/infinispan/distribution/rehash/JoinAndCoordDeathTest.java
===================================================================
--- trunk/core/src/test/java/org/infinispan/distribution/rehash/JoinAndCoordDeathTest.java	                        (rev 0)
+++ trunk/core/src/test/java/org/infinispan/distribution/rehash/JoinAndCoordDeathTest.java	2009-08-20 22:27:13 UTC (rev 709)
@@ -0,0 +1,10 @@
+package org.infinispan.distribution.rehash;
+
+import org.testng.annotations.Test;
+
+/**
+ * Placeholder for a test that has not been written yet.  The class is disabled ({@code enabled = false}); its only
+ * method fails unconditionally when assertions are enabled.
+ */
+@Test(groups = "functional", testName = "distribution.rehash.JoinAndCoordDeathTest", enabled = false)
+public class JoinAndCoordDeathTest {
+   /** Stub that always fails; to be replaced by the real test. */
+   public void implementMe() {
+      assert false : "Implement me!";
+   }
+}


Property changes on: trunk/core/src/test/java/org/infinispan/distribution/rehash/JoinAndCoordDeathTest.java
___________________________________________________________________
Name: svn:keywords
   + Id Revision
Name: svn:eol-style
   + LF

Added: trunk/core/src/test/java/org/infinispan/distribution/rehash/JoinAndOtherSenderDeathTest.java
===================================================================
--- trunk/core/src/test/java/org/infinispan/distribution/rehash/JoinAndOtherSenderDeathTest.java	                        (rev 0)
+++ trunk/core/src/test/java/org/infinispan/distribution/rehash/JoinAndOtherSenderDeathTest.java	2009-08-20 22:27:13 UTC (rev 709)
@@ -0,0 +1,10 @@
+package org.infinispan.distribution.rehash;
+
+import org.testng.annotations.Test;
+
+/**
+ * Placeholder for a test that has not been written yet.  The class is disabled ({@code enabled = false}); its only
+ * method fails unconditionally when assertions are enabled.
+ */
+@Test(groups = "functional", testName = "distribution.rehash.JoinAndOtherSenderDeathTest", enabled = false)
+public class JoinAndOtherSenderDeathTest {
+   /** Stub that always fails; to be replaced by the real test. */
+   public void implementMe() {
+      assert false : "Implement me!";
+   }
+}


Property changes on: trunk/core/src/test/java/org/infinispan/distribution/rehash/JoinAndOtherSenderDeathTest.java
___________________________________________________________________
Name: svn:keywords
   + Id Revision
Name: svn:eol-style
   + LF

Added: trunk/core/src/test/java/org/infinispan/distribution/rehash/RehashLeaveTestBase.java
===================================================================
--- trunk/core/src/test/java/org/infinispan/distribution/rehash/RehashLeaveTestBase.java	                        (rev 0)
+++ trunk/core/src/test/java/org/infinispan/distribution/rehash/RehashLeaveTestBase.java	2009-08-20 22:27:13 UTC (rev 709)
@@ -0,0 +1,31 @@
+package org.infinispan.distribution.rehash;
+
+import org.infinispan.Cache;
+import org.infinispan.distribution.DistributionManager;
+import org.infinispan.test.TestingUtil;
+import org.testng.annotations.Test;
+
+import java.util.concurrent.TimeUnit;
+
+/**
+ * Base class for rehash tests triggered by nodes leaving.  It supplies the "wait until the rehash is over" half of
+ * the {@link RehashTestBase} contract; subclasses only have to kill the leavers.
+ */
+@Test(groups = "functional", testName = "distribution.rehash.RehashLeaveTestBase")
+public abstract class RehashLeaveTestBase extends RehashTestBase {
+   /**
+    * Polls every cache in {@code caches} until its distribution manager no longer reports a rehash in progress, then
+    * asserts that transaction logging is disabled on all of them.
+    *
+    * @throws RuntimeException if a cache is still rehashing once the five minute deadline has passed
+    */
+   void waitForRehashCompletion() {
+      // one deadline shared by all caches, not five minutes per cache
+      long giveupTime = System.currentTimeMillis() + TimeUnit.SECONDS.toMillis(60 * 5);
+
+
+      for (Cache c : caches) {
+         DistributionManager distributionManager = getDistributionManager(c);
+
+         while (distributionManager.isRehashInProgress() && System.currentTimeMillis() < giveupTime) {
+            TestingUtil.sleepThread(250);
+         }
+
+         // the loop above also exits on timeout, so check again to tell the two cases apart
+         if (distributionManager.isRehashInProgress())
+            throw new RuntimeException("Timed out waiting for rehash to complete on cache " + addressOf(c));
+      }
+
+      // ensure transaction logging is disabled everywhere
+      for (Cache c : caches)
+         assert !getDistributionManager(c).getTransactionLogger().isEnabled() : "Transaction logging for cache " + addressOf(c) + " is still enabled!!";
+   }
+}


Property changes on: trunk/core/src/test/java/org/infinispan/distribution/rehash/RehashLeaveTestBase.java
___________________________________________________________________
Name: svn:keywords
   + Id Revision
Name: svn:eol-style
   + LF

Added: trunk/core/src/test/java/org/infinispan/distribution/rehash/RehashTestBase.java
===================================================================
--- trunk/core/src/test/java/org/infinispan/distribution/rehash/RehashTestBase.java	                        (rev 0)
+++ trunk/core/src/test/java/org/infinispan/distribution/rehash/RehashTestBase.java	2009-08-20 22:27:13 UTC (rev 709)
@@ -0,0 +1,224 @@
+package org.infinispan.distribution.rehash;
+
+import org.infinispan.Cache;
+import org.infinispan.distribution.BaseDistFunctionalTest;
+import org.infinispan.distribution.DefaultConsistentHash;
+import org.infinispan.remoting.transport.Address;
+import org.infinispan.test.TestingUtil;
+import org.testng.annotations.Test;
+
+import javax.transaction.Transaction;
+import javax.transaction.TransactionManager;
+import javax.transaction.xa.XAResource;
+import javax.transaction.xa.Xid;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+import java.util.Random;
+import java.util.concurrent.CountDownLatch;
+import static java.util.concurrent.TimeUnit.SECONDS;
+
+/**
+ * A base test for all rehashing tests.  Subclasses supply the event that triggers the rehash (see
+ * {@link #performRehashEvent()}) and the way to wait for it to finish (see {@link #waitForRehashCompletion()}); the
+ * test methods here put some state into the cluster, trigger the event and then check the state on every cache.
+ */
+@Test(groups = "functional", testName = "distribution.rehash.RehashTestBase")
+public abstract class RehashTestBase extends BaseDistFunctionalTest {
+
+   protected RehashTestBase() {
+      // caches are torn down after every test method, since each test changes the cluster membership
+      cleanup = CleanupPhase.AFTER_METHOD;
+      // NOTE(review): presumably makes the base class configure transactional caches - confirm
+      tx = true;
+   }
+
+   // this setup has 4 running caches: {c1, c2, c3, c4}
+
+   /**
+    * This is overridden by subclasses.  Could typically be a JOIN or LEAVE event.
+    */
+   abstract void performRehashEvent();
+
+   /**
+    * Blocks until a rehash completes.
+    */
+   abstract void waitForRehashCompletion();
+
+   /**
+    * Looks up the given address in the consistent hashes of c1..c4 and returns its index within the first hash that
+    * contains it.
+    *
+    * @param joinerAddress address to look for
+    * @return zero-based position of the address in the iteration order of {@code getCaches()}
+    * @throws RuntimeException if none of the four consistent hashes contains the address
+    */
+   protected int locateJoiner(Address joinerAddress) {
+      for (Cache c : Arrays.asList(c1, c2, c3, c4)) {
+         DefaultConsistentHash dch = getDefaultConsistentHash(c, SECONDS.toMillis(480));
+         int i = 0;
+         for (Address a : dch.getCaches()) {
+            if (a.equals(joinerAddress)) return i;
+            i++;
+         }
+      }
+      throw new RuntimeException("Cannot locate joiner! Joiner is [" + joinerAddress + "]");
+   }
+
+
+   /**
+    * Creates one key per initial cache (k1..k4), stores "v1".."v4" under them and verifies the result on all caches.
+    *
+    * @return the four keys, in the order k1..k4
+    */
+   private List<MagicKey> init() {
+      List<MagicKey> keys = new ArrayList<MagicKey>(Arrays.asList(
+            new MagicKey(c1, "k1"), new MagicKey(c2, "k2"),
+            new MagicKey(c3, "k3"), new MagicKey(c4, "k4")
+      ));
+
+      int i = 0;
+      // i is incremented while fetching the key, so the value written for keys.get(0) is "v1", and so on
+      for (Cache<Object, String> c : caches) c.put(keys.get(i++), "v" + i);
+
+      i = 0;
+      for (MagicKey key : keys) assertOnAllCachesAndOwnership(key, "v" + ++i);
+
+      log.info("Initialized with keys {0}", keys);
+      return keys;
+   }
+
+   /**
+    * Simple test.  Put some state, trigger event, test results
+    */
+   public void testNonTransactional() {
+      List<MagicKey> keys = init();
+
+      log.info("Invoking rehash event");
+      performRehashEvent();
+
+      waitForRehashCompletion();
+      log.info("Rehash complete");
+
+      int i = 0;
+      for (MagicKey key : keys) assertOnAllCachesAndOwnership(key, "v" + ++i);
+
+      assertProperConsistentHashOnAllCaches();
+   }
+
+
+   /**
+    * More complex - init some state.  Start a new transaction, and midway trigger a rehash.  Then complete transaction
+    * and test results.
+    */
+   @Test(enabled = false, description = "Enable after releasing Beta1")
+   public void testTransactional() throws Exception {
+      final List<MagicKey> keys = init();
+      final CountDownLatch l = new CountDownLatch(1);
+
+      Thread th = new Thread("Updater") {
+         @Override
+         public void run() {
+            try {
+               // start a transaction on c1.
+               TransactionManager t1 = TestingUtil.getTransactionManager(c1);
+               t1.begin();
+               c1.put(keys.get(0), "transactionally_replaced");
+               Transaction tx = t1.getTransaction();
+               tx.enlistResource(new XAResourceAdapter() {
+                  public int prepare(Xid id) {
+                     // this would be called *after* the cache prepares.
+                     try {
+                        l.await();
+                     } catch (InterruptedException e) {
+                        // keep the interrupt visible to whoever runs this thread instead of losing it
+                        Thread.currentThread().interrupt();
+                     }
+                     return XAResource.XA_OK;
+                  }
+               });
+               t1.commit();
+            } catch (Exception e) {
+               throw new RuntimeException(e);
+            }
+         }
+      };
+
+      th.start();
+
+      log.info("Invoking rehash event");
+      performRehashEvent();
+      // let the transaction, which is parked in prepare(), carry on and commit
+      l.countDown();
+      th.join();
+
+      // ownership can only be verified once the rehash has finished, as in the other tests of this class
+      waitForRehashCompletion();
+      log.info("Rehash complete");
+
+      assertOnAllCachesAndOwnership(keys.get(0), "transactionally_replaced");
+      assertOnAllCachesAndOwnership(keys.get(1), "v" + 2);
+      assertOnAllCachesAndOwnership(keys.get(2), "v" + 3);
+      assertOnAllCachesAndOwnership(keys.get(3), "v" + 4);
+
+      assertProperConsistentHashOnAllCaches();
+   }
+
+   /**
+    * A stress test.  One node is constantly modified while a rehash occurs.
+    */
+   @Test(enabled = false, description = "Enable after releasing Beta1")
+   public void testNonTransactionalStress() throws Exception {
+      stressTest(false);
+   }
+
+   /**
+    * A stress test.  One node is constantly modified using transactions while a rehash occurs.
+    */
+   @Test(enabled = false, description = "Enable after releasing Beta1")
+   public void testTransactionalStress() throws Exception {
+      stressTest(true);
+   }
+
+   /**
+    * Starts one {@link Updater} per key, all writing through c1, triggers the rehash while they are running, stops
+    * them, waits for the rehash to finish and checks that every cache holds the last value each updater reports.
+    *
+    * @param tx whether the updaters wrap each write in a transaction
+    */
+   private void stressTest(boolean tx) throws Exception {
+      final List<MagicKey> keys = init();
+      final CountDownLatch latch = new CountDownLatch(1);
+      List<Updater> updaters = new ArrayList<Updater>(keys.size());
+      for (MagicKey k : keys) {
+         Updater u = new Updater(c1, k, latch, tx);
+         u.start();
+         updaters.add(u);
+      }
+
+      latch.countDown();
+
+      log.info("Invoking rehash event");
+      performRehashEvent();
+
+      // ask all updaters to stop first, then wait for each of them
+      for (Updater u : updaters) u.complete();
+      for (Updater u : updaters) u.join();
+
+      waitForRehashCompletion();
+
+      log.info("Rehash complete");
+
+      int i = 0;
+      // updaters were created in key order, so updaters.get(i) is the one that wrote keys.get(i)
+      for (MagicKey key : keys) assertOnAllCachesAndOwnership(key, "v" + updaters.get(i++).currentValue);
+
+      assertProperConsistentHashOnAllCaches();
+   }
+}
+
+class Updater extends Thread {
+   static final Random r = new Random();
+   volatile int currentValue = 0;
+   BaseDistFunctionalTest.MagicKey key;
+   Cache cache;
+   CountDownLatch latch;
+   volatile boolean running = true;
+   TransactionManager tm;
+
+   Updater(Cache cache, BaseDistFunctionalTest.MagicKey key, CountDownLatch latch, boolean tx) {
+      super("Updater-" + key);
+      this.key = key;
+      this.cache = cache;
+      this.latch = latch;
+      if (tx) tm = TestingUtil.getTransactionManager(cache);
+   }
+
+   public void complete() {
+      running = false;
+   }
+
+   @Override
+   public void run() {
+      while (running) {
+         try {
+            currentValue++;
+            if (tm != null) tm.begin();
+            cache.put(key, "v" + currentValue);
+            if (tm != null) tm.commit();
+            TestingUtil.sleepThread(r.nextInt(10) * 10);
+         } catch (Exception e) {
+            // do nothing?
+         }
+      }
+   }
+}


Property changes on: trunk/core/src/test/java/org/infinispan/distribution/rehash/RehashTestBase.java
___________________________________________________________________
Name: svn:keywords
   + Id Revision
Name: svn:eol-style
   + LF

Added: trunk/core/src/test/java/org/infinispan/distribution/rehash/SingleJoinTest.java
===================================================================
--- trunk/core/src/test/java/org/infinispan/distribution/rehash/SingleJoinTest.java	                        (rev 0)
+++ trunk/core/src/test/java/org/infinispan/distribution/rehash/SingleJoinTest.java	2009-08-20 22:27:13 UTC (rev 709)
@@ -0,0 +1,31 @@
+package org.infinispan.distribution.rehash;
+
+import org.infinispan.Cache;
+import org.infinispan.manager.CacheManager;
+import org.testng.annotations.Test;
+
+import static java.util.concurrent.TimeUnit.SECONDS;
+
+/**
+ * Rehash test whose rehash event is the start of one additional cache manager.
+ */
+@Test(groups = "functional", testName = "distribution.rehash.SingleJoinTest")
+public class SingleJoinTest extends RehashTestBase {
+   CacheManager joinerManager;
+   Cache<Object, String> joiner;
+
+   /**
+    * Creates a new clustered cache manager, defines the test cache on it and starts that cache.
+    */
+   void performRehashEvent() {
+      joinerManager = addClusterEnabledCacheManager();
+      joinerManager.defineConfiguration(cacheName, configuration);
+      joiner = joinerManager.getCache(cacheName);
+   }
+
+   /**
+    * Waits for the join tasks of the joiner, then inserts it into {@code caches} at the position reported by
+    * {@code locateJoiner()}.
+    */
+   void waitForRehashCompletion() {
+      // need to block until this join has completed!
+      waitForJoinTasksToComplete(SECONDS.toMillis(480), joiner);
+
+      // where does the joiner sit in relation to the other caches?
+      int joinerPos = locateJoiner(joinerManager.getAddress());
+
+      log.info("***>>> Joiner is in position " + joinerPos);
+
+      caches.add(joinerPos, joiner);
+   }
+}


Property changes on: trunk/core/src/test/java/org/infinispan/distribution/rehash/SingleJoinTest.java
___________________________________________________________________
Name: svn:keywords
   + Id Revision
Name: svn:eol-style
   + LF

Added: trunk/core/src/test/java/org/infinispan/distribution/rehash/SingleLeaveTest.java
===================================================================
--- trunk/core/src/test/java/org/infinispan/distribution/rehash/SingleLeaveTest.java	                        (rev 0)
+++ trunk/core/src/test/java/org/infinispan/distribution/rehash/SingleLeaveTest.java	2009-08-20 22:27:13 UTC (rev 709)
@@ -0,0 +1,20 @@
+package org.infinispan.distribution.rehash;
+
+import org.infinispan.manager.CacheManager;
+import org.infinispan.remoting.transport.Address;
+import org.infinispan.test.TestingUtil;
+import org.testng.annotations.Test;
+
+/**
+ * Leave test in which a single node, c4, is killed.
+ */
+@Test(groups = "functional", testName = "distribution.rehash.SingleLeaveTest")
+public class SingleLeaveTest extends RehashLeaveTestBase {
+   // address of the leaver, captured before it is killed; not read anywhere in this class
+   Address leaverAddress;
+
+   /**
+    * Removes c4 and its cache manager from the bookkeeping lists of the test, then kills the manager.
+    */
+   void performRehashEvent() {
+      // cause a node to LEAVE.  Typically this is c4.
+      leaverAddress = addressOf(c4);
+      CacheManager cm4 = c4.getCacheManager();
+      cacheManagers.remove(cm4);
+      caches.remove(c4);
+      TestingUtil.killCacheManagers(cm4);
+   }
+}


Property changes on: trunk/core/src/test/java/org/infinispan/distribution/rehash/SingleLeaveTest.java
___________________________________________________________________
Name: svn:keywords
   + Id Revision
Name: svn:eol-style
   + LF

Added: trunk/core/src/test/java/org/infinispan/distribution/rehash/XAResourceAdapter.java
===================================================================
--- trunk/core/src/test/java/org/infinispan/distribution/rehash/XAResourceAdapter.java	                        (rev 0)
+++ trunk/core/src/test/java/org/infinispan/distribution/rehash/XAResourceAdapter.java	2009-08-20 22:27:13 UTC (rev 709)
@@ -0,0 +1,53 @@
+package org.infinispan.distribution.rehash;
+
+import javax.transaction.xa.XAException;
+import javax.transaction.xa.XAResource;
+import javax.transaction.xa.Xid;
+
+/**
+ * Do-nothing skeleton of {@link XAResource}.  Every method is either empty or returns a fixed neutral value, so a
+ * test only has to override the callbacks it is interested in.
+ *
+ * @author Manik Surtani
+ * @since 4.0
+ */
+public abstract class XAResourceAdapter implements XAResource {
+
+   // ----- association of work with a transaction branch -----
+
+   public void start(Xid xid, int i) throws XAException {
+      // nothing to do
+   }
+
+   public void end(Xid xid, int i) throws XAException {
+      // nothing to do
+   }
+
+   // ----- two phase commit -----
+
+   /**
+    * @return always {@link XAResource#XA_OK}
+    */
+   public int prepare(Xid xid) throws XAException {
+      return XAResource.XA_OK;
+   }
+
+   public void commit(Xid xid, boolean b) throws XAException {
+      // nothing to do
+   }
+
+   public void rollback(Xid xid) throws XAException {
+      // nothing to do
+   }
+
+   // ----- recovery -----
+
+   public void forget(Xid xid) throws XAException {
+      // nothing to do
+   }
+
+   /**
+    * @return always an empty array
+    */
+   public Xid[] recover(int i) throws XAException {
+      return new Xid[]{};
+   }
+
+   // ----- resource manager identity and timeouts -----
+
+   /**
+    * @return always false
+    */
+   public boolean isSameRM(XAResource xaResource) throws XAException {
+      return false;
+   }
+
+   /**
+    * @return always 0
+    */
+   public int getTransactionTimeout() throws XAException {
+      return 0;
+   }
+
+   /**
+    * @return always false
+    */
+   public boolean setTransactionTimeout(int i) throws XAException {
+      return false;
+   }
+}


Property changes on: trunk/core/src/test/java/org/infinispan/distribution/rehash/XAResourceAdapter.java
___________________________________________________________________
Name: svn:keywords
   + Id Revision
Name: svn:eol-style
   + LF



More information about the infinispan-commits mailing list