[infinispan-commits] Infinispan SVN: r1849 - in trunk/core/src/main/java/org/infinispan: commands/control and 1 other directories.

infinispan-commits at lists.jboss.org infinispan-commits at lists.jboss.org
Tue May 25 17:03:58 EDT 2010


Author: vblagojevic at jboss.com
Date: 2010-05-25 17:03:57 -0400 (Tue, 25 May 2010)
New Revision: 1849

Added:
   trunk/core/src/main/java/org/infinispan/distribution/InvertedLeaveTask.java
Modified:
   trunk/core/src/main/java/org/infinispan/commands/CommandsFactory.java
   trunk/core/src/main/java/org/infinispan/commands/CommandsFactoryImpl.java
   trunk/core/src/main/java/org/infinispan/commands/control/RehashControlCommand.java
   trunk/core/src/main/java/org/infinispan/distribution/DistributionManagerImpl.java
   trunk/core/src/main/java/org/infinispan/distribution/JoinTask.java
   trunk/core/src/main/java/org/infinispan/distribution/LeaveTask.java
   trunk/core/src/main/java/org/infinispan/distribution/RehashTask.java
Log:
[ISPN-420] - Refactoring: merge rehash process when joining and leaving into a single codebase

Modified: trunk/core/src/main/java/org/infinispan/commands/CommandsFactory.java
===================================================================
--- trunk/core/src/main/java/org/infinispan/commands/CommandsFactory.java	2010-05-25 15:46:19 UTC (rev 1848)
+++ trunk/core/src/main/java/org/infinispan/commands/CommandsFactory.java	2010-05-25 21:03:57 UTC (rev 1849)
@@ -279,14 +279,20 @@
     */
    RehashControlCommand buildRehashControlCommandTxLogPendingPrepares(Address sender, List<PrepareCommand> state);
 
+
    /**
-    * A more generic version of this factory method that allows the setting of various fields.
-    *
-    * @param subtype type of RehashControlCommand
-    * @param sender sender's Address
-    * @param state state to push
-    * @param consistentHash consistent hash to deliver
-    * @return a RehashControlCommand
+    * Builds a RehashControlCommand for coordinating a rehash event. This particular variation of RehashControlCommand
+    * coordinates rehashing of nodes when a node joins or leaves.
+    * 
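+    * @param subtype type of RehashControlCommand
+    * @param sender sender's Address
+    * @param state state to push
+    * @param oldCH consistent hash used to locate the owners of a key before the rehash
+    * @param leaversHandled addresses of the nodes that have left and are being handled by this rehash
+    * @param newCH consistent hash used to locate the owners of a key after the rehash
+    * @return a RehashControlCommand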
     */
-   RehashControlCommand buildRehashControlCommand(RehashControlCommand.Type subtype, Address sender, Map<Object, InternalCacheValue> state, ConsistentHash consistentHash);
+   RehashControlCommand buildRehashControlCommand(RehashControlCommand.Type subtype,
+            Address sender, Map<Object, InternalCacheValue> state, ConsistentHash oldCH,
+            ConsistentHash newCH, List<Address> leaversHandled);
 }

Modified: trunk/core/src/main/java/org/infinispan/commands/CommandsFactoryImpl.java
===================================================================
--- trunk/core/src/main/java/org/infinispan/commands/CommandsFactoryImpl.java	2010-05-25 15:46:19 UTC (rev 1848)
+++ trunk/core/src/main/java/org/infinispan/commands/CommandsFactoryImpl.java	2010-05-25 21:03:57 UTC (rev 1849)
@@ -290,11 +290,11 @@
    }
 
    public RehashControlCommand buildRehashControlCommand(RehashControlCommand.Type type, Address sender) {
-      return buildRehashControlCommand(type, sender, null, null);
+      return buildRehashControlCommand(type, sender, null, null, null, null);
    }
 
    public RehashControlCommand buildRehashControlCommand(Address sender, Map<Object, InternalCacheValue> state) {
-      return buildRehashControlCommand(PUSH_STATE, sender, state, null);
+      return buildRehashControlCommand(PUSH_STATE, sender, state, null, null,  null);
    }
 
    public RehashControlCommand buildRehashControlCommandTxLog(Address sender, List<WriteCommand> commands) {
@@ -303,10 +303,11 @@
 
    public RehashControlCommand buildRehashControlCommandTxLogPendingPrepares(Address sender, List<PrepareCommand> commands) {
       return new RehashControlCommand(cacheName, DRAIN_TX_PREPARES, sender, null, commands, this);
+   }  
+   
+   public RehashControlCommand buildRehashControlCommand(RehashControlCommand.Type type,
+            Address sender, Map<Object, InternalCacheValue> state, ConsistentHash oldCH,
+            ConsistentHash newCH, List<Address> leavers) {
+      return new RehashControlCommand(cacheName, type, sender, state, oldCH, newCH, leavers, this);
    }
-
-   public RehashControlCommand buildRehashControlCommand(RehashControlCommand.Type type, Address sender, Map<Object, InternalCacheValue> state, ConsistentHash consistentHash) {
-      return new RehashControlCommand(cacheName, type, sender, state, consistentHash, this);
-   }
-
 }

Modified: trunk/core/src/main/java/org/infinispan/commands/control/RehashControlCommand.java
===================================================================
--- trunk/core/src/main/java/org/infinispan/commands/control/RehashControlCommand.java	2010-05-25 15:46:19 UTC (rev 1848)
+++ trunk/core/src/main/java/org/infinispan/commands/control/RehashControlCommand.java	2010-05-25 21:03:57 UTC (rev 1849)
@@ -45,13 +45,15 @@
    public static final int COMMAND_ID = 17;
 
    public enum Type {
-      JOIN_REQ, JOIN_REHASH_START, JOIN_REHASH_END, JOIN_COMPLETE, JOIN_ABORT, PULL_STATE, PUSH_STATE, DRAIN_TX, DRAIN_TX_PREPARES
+      JOIN_REQ, JOIN_REHASH_START, JOIN_REHASH_END, JOIN_COMPLETE, JOIN_ABORT, PULL_STATE_JOIN, PULL_STATE_LEAVE, PUSH_STATE, DRAIN_TX, DRAIN_TX_PREPARES
    }
 
    Type type;
    Address sender;
    Map<Object, InternalCacheValue> state;
-   ConsistentHash consistentHash;
+   ConsistentHash oldCH;
+   List<Address> nodesLeft;
+   ConsistentHash newCH;
 
    // cache components
    DistributionManager distributionManager;
@@ -67,13 +69,15 @@
    }
 
 
-   public RehashControlCommand(String cacheName, Type type, Address sender, Map<Object, InternalCacheValue> state,
-                               ConsistentHash consistentHash, CommandsFactory commandsFactory) {
+   public RehashControlCommand(String cacheName, Type type, Address sender, Map<Object, InternalCacheValue> state,ConsistentHash oldConsistentHash,
+                                ConsistentHash consistentHash, List<Address> leavers, CommandsFactory commandsFactory) {
       super(cacheName);
       this.type = type;
       this.sender = sender;
       this.state = state;
-      this.consistentHash = consistentHash;
+      this.oldCH = oldConsistentHash;
+      this.newCH = consistentHash;
+      this.nodesLeft = leavers;
       this.commandsFactory = commandsFactory;
    }
 
@@ -119,8 +123,10 @@
          case JOIN_COMPLETE:
             distributionManager.notifyJoinComplete(sender);
             return null;
-         case PULL_STATE:
-            return pullState();
+         case PULL_STATE_JOIN:
+            return pullStateForJoin();             
+         case PULL_STATE_LEAVE:
+             return pullStateForLeave();    
          case PUSH_STATE:
             return pushState();
          case DRAIN_TX:
@@ -133,29 +139,68 @@
       throw new CacheException("Unknown rehash control command type " + type);
    }
 
-   public Map<Object, InternalCacheValue> pullState() throws CacheLoaderException {
-      Address self = transport.getAddress();
-      ConsistentHash oldCH = distributionManager.getConsistentHash();
-      int numCopies = configuration.getNumOwners();
-
+   public Map<Object, InternalCacheValue> pullStateForJoin() throws CacheLoaderException {           
+      
       Map<Object, InternalCacheValue> state = new HashMap<Object, InternalCacheValue>();
       for (InternalCacheEntry ice : dataContainer) {
          Object k = ice.getKey();
-         if (shouldAddToMap(k, oldCH, numCopies, self)) state.put(k, ice.toInternalCacheValue());
+         if (shouldTransferOwnershipToJoinNode(k)) {            
+             state.put(k, ice.toInternalCacheValue());
+         }
       }
 
       CacheStore cacheStore = distributionManager.getCacheStoreForRehashing();
       if (cacheStore != null) {
          for (Object k: cacheStore.loadAllKeys(new ReadOnlyDataContainerBackedKeySet(dataContainer))) {
-            if (!state.containsKey(k) && shouldAddToMap(k, oldCH, numCopies, self)) {
-               InternalCacheValue v = loadValue(cacheStore, k);
+            if (!state.containsKey(k) && shouldTransferOwnershipToJoinNode(k)) {                
+               InternalCacheValue v = loadValue(cacheStore, k);               
                if (v != null) state.put(k, v);
             }
          }
       }
       return state;
    }
+   
+   public Map<Object, InternalCacheValue> pullStateForLeave() throws CacheLoaderException {
+     
+      Map<Object, InternalCacheValue> state = new HashMap<Object, InternalCacheValue>();
+      for (InternalCacheEntry ice : dataContainer) {
+         Object k = ice.getKey();
+         if (shouldTransferOwnershipFromLeftNodes(k)) {
+            state.put(k, ice.toInternalCacheValue());
+         }
+      }
 
+      CacheStore cacheStore = distributionManager.getCacheStoreForRehashing();
+      if (cacheStore != null) {
+         for (Object k : cacheStore.loadAllKeys(new ReadOnlyDataContainerBackedKeySet(dataContainer))) {
+            if (!state.containsKey(k) && shouldTransferOwnershipFromLeftNodes(k)) {
+               InternalCacheValue v = loadValue(cacheStore, k);               
+               if (v != null)
+                  state.put(k, v);
+            }
+         }
+      }
+      return state;
+   }
+   
+   private boolean shouldTransferOwnershipFromLeftNodes(Object k) {      
+      Address self = transport.getAddress();      
+      int numCopies = configuration.getNumOwners();
+      
+      List<Address> oldList = oldCH.locate(k, numCopies);
+      boolean localToThisNode = oldList.indexOf(self) >= 0;
+      boolean senderIsNewOwner = newCH.isKeyLocalToAddress(sender, k, numCopies);
+      for (Address leftNodeAddress : nodesLeft) {
+         boolean localToLeftNode = oldList.indexOf(leftNodeAddress) >= 0;
+         if (localToLeftNode && senderIsNewOwner && localToThisNode) {
+            return true;
+         }
+      }
+      return false;
+   }
+      
+
    private InternalCacheValue loadValue(CacheStore cs, Object k) {
       try {
          InternalCacheEntry ice = cs.load(k);
@@ -166,16 +211,17 @@
       return null;
    }
 
-   final boolean shouldAddToMap(Object k, ConsistentHash oldCH, int numCopies, Address self) {
-      // if the current address is the current "owner" of this key (in old_ch), and the requestor is in the owner list
-      // in new_ch, then add this to the map.
+   final boolean shouldTransferOwnershipToJoinNode(Object k) {     
+      Address self = transport.getAddress();      
+      int numCopies = configuration.getNumOwners(); 
       List<Address> oldOwnerList = oldCH.locate(k, numCopies);
       if (!oldOwnerList.isEmpty() && self.equals(oldOwnerList.get(0))) {
-         List<Address> newOwnerList = consistentHash.locate(k, numCopies);
+         List<Address> newOwnerList = newCH.locate(k, numCopies);
          if (newOwnerList.contains(sender)) return true;
       }
       return false;
    }
+   
 
    public Object pushState() {
       distributionManager.applyReceivedState(state);
@@ -187,7 +233,7 @@
    }
 
    public Object[] getParameters() {
-      return new Object[]{cacheName, (byte) type.ordinal(), sender, state, consistentHash, txLogCommands, pendingPrepares};
+      return new Object[]{cacheName, (byte) type.ordinal(), sender, state, oldCH, nodesLeft, newCH, txLogCommands, pendingPrepares};
    }
 
    @SuppressWarnings("unchecked")
@@ -197,7 +243,9 @@
       type = Type.values()[(Byte) parameters[i++]];
       sender = (Address) parameters[i++];
       state = (Map<Object, InternalCacheValue>) parameters[i++];
-      consistentHash = (ConsistentHash) parameters[i++];
+      oldCH = (ConsistentHash) parameters[i++];
+      nodesLeft = (List<Address>) parameters[i++];
+      newCH = (ConsistentHash) parameters[i++];
       txLogCommands = (List<WriteCommand>) parameters[i++];
       pendingPrepares = (List<PrepareCommand>) parameters[i++];
    }
@@ -208,7 +256,9 @@
             "type=" + type +
             ", sender=" + sender +
             ", state=" + state +
-            ", consistentHash=" + consistentHash +
+            ", oldConsistentHash=" + oldCH +
+            ", nodesLeft=" + nodesLeft +
+            ", consistentHash=" + newCH +
             ", txLogCommands=" + txLogCommands +
             ", pendingPrepares=" + pendingPrepares +
             '}';

Modified: trunk/core/src/main/java/org/infinispan/distribution/DistributionManagerImpl.java
===================================================================
--- trunk/core/src/main/java/org/infinispan/distribution/DistributionManagerImpl.java	2010-05-25 15:46:19 UTC (rev 1848)
+++ trunk/core/src/main/java/org/infinispan/distribution/DistributionManagerImpl.java	2010-05-25 21:03:57 UTC (rev 1849)
@@ -57,7 +57,6 @@
 import java.util.Set;
 import java.util.HashMap;
 import java.util.concurrent.CopyOnWriteArrayList;
-import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Future;
@@ -160,7 +159,7 @@
       consistentHash = createConsistentHash(configuration, members);
       self = t.getAddress();
       if (members.size() > 1 && !t.getCoordinator().equals(self)) {
-         JoinTask joinTask = new JoinTask(rpcManager, cf, configuration, transactionLogger, dataContainer, this);
+         JoinTask joinTask = new JoinTask(rpcManager, cf, configuration, dataContainer, this);
          joinFuture = rehashExecutor.submit(joinTask);
       } else {
          joinComplete = true;
@@ -189,6 +188,8 @@
 
          boolean willReceiveLeaverState = willReceiveLeaverState(leaver);
          boolean willSendLeaverState = willSendLeaverState(leaver);
+         List<Address> stateProviders = holdersOfLeaversState(newMembers, leaver);
+        
          try {
             if (!(consistentHash instanceof UnionConsistentHash)) oldConsistentHash = consistentHash;
             else oldConsistentHash = ((UnionConsistentHash) consistentHash).newCH;
@@ -196,23 +197,21 @@
          } catch (Exception e) {
             log.fatal("Unable to process leaver!!", e);
             throw new CacheException(e);
-         }
+         }       
 
          if (willReceiveLeaverState || willSendLeaverState) {
-            log.info("Starting transaction logging!");
+            log.info("I {0} am participating in rehash", rpcManager.getTransport().getAddress());
             transactionLogger.enable();
-         }
 
-         if (willSendLeaverState) {
-            if (leaveTaskFuture != null && (!leaveTaskFuture.isCancelled() || !leaveTaskFuture.isDone())) {
+            if (leaveTaskFuture != null
+                     && (!leaveTaskFuture.isCancelled() || !leaveTaskFuture.isDone())) {
                leaveTaskFuture.cancel(true);
             }
 
             leavers.add(leaver);
-            LeaveTask task = new LeaveTask(this, rpcManager, configuration, leavers, transactionLogger, cf, dataContainer);
+            InvertedLeaveTask task = new InvertedLeaveTask(this, rpcManager, configuration, cf,
+                     dataContainer, leavers, stateProviders, willReceiveLeaverState);
             leaveTaskFuture = rehashExecutor.submit(task);
-
-            log.info("Need to rehash");
          } else {
             log.info("Not in same subspace, so ignoring leave event");
          }
@@ -223,6 +222,18 @@
       ConsistentHash ch = consistentHash instanceof UnionConsistentHash ? oldConsistentHash : consistentHash;
       return ch.isAdjacent(leaver, self);
    }
+   
+    List<Address> holdersOfLeaversState(List<Address> members, Address leaver) {
+        ConsistentHash ch = consistentHash instanceof UnionConsistentHash ? oldConsistentHash: consistentHash;
+        Set<Address> holders = new HashSet<Address>();
+        for (Address address : members) {
+           
+            if (ch.isAdjacent(leaver, address)) {
+                holders.add(address);
+            }
+        }
+        return new ArrayList<Address>(holders);
+    }
 
    boolean willReceiveLeaverState(Address leaver) {
       ConsistentHash ch = consistentHash instanceof UnionConsistentHash ? oldConsistentHash : consistentHash;

Added: trunk/core/src/main/java/org/infinispan/distribution/InvertedLeaveTask.java
===================================================================
--- trunk/core/src/main/java/org/infinispan/distribution/InvertedLeaveTask.java	                        (rev 0)
+++ trunk/core/src/main/java/org/infinispan/distribution/InvertedLeaveTask.java	2010-05-25 21:03:57 UTC (rev 1849)
@@ -0,0 +1,218 @@
+package org.infinispan.distribution;
+
+import static org.infinispan.commands.control.RehashControlCommand.Type.PULL_STATE_LEAVE;
+import static org.infinispan.remoting.rpc.ResponseMode.SYNCHRONOUS;
+
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.Future;
+
+import org.infinispan.CacheException;
+import org.infinispan.commands.CommandsFactory;
+import org.infinispan.commands.control.RehashControlCommand;
+import org.infinispan.commands.tx.PrepareCommand;
+import org.infinispan.commands.write.WriteCommand;
+import org.infinispan.config.Configuration;
+import org.infinispan.container.DataContainer;
+import org.infinispan.container.entries.InternalCacheValue;
+import org.infinispan.remoting.responses.Response;
+import org.infinispan.remoting.responses.SuccessfulResponse;
+import org.infinispan.remoting.rpc.RpcManager;
+import org.infinispan.remoting.transport.Address;
+import org.infinispan.util.Util;
+import org.infinispan.util.concurrent.NotifyingFutureImpl;
+import org.infinispan.util.concurrent.NotifyingNotifiableFuture;
+import org.infinispan.util.logging.Log;
+import org.infinispan.util.logging.LogFactory;
+
+/**
+ * A task to handle rehashing when a node leaves the cluster.
+ * 
+ * @author Vladimir Blagojevic
+ * @author Manik Surtani
+ * @since 4.0
+ */
+public class InvertedLeaveTask extends RehashTask {
+
+   private static final Log log = LogFactory.getLog(InvertedLeaveTask.class);
+   private static final boolean trace = true;
+   private final List<Address> leavers;
+   private final Address self;
+   private final List<Address> leaversHandled;
+   private final List<Address> stateProviders;
+   private final boolean isReceiver;
+
+   public InvertedLeaveTask(DistributionManagerImpl dmi, RpcManager rpcManager, Configuration conf,
+            CommandsFactory commandsFactory, DataContainer dataContainer, List<Address> leavers,
+            List<Address> stateProviders, boolean isReceiver) {
+      super(dmi, rpcManager, conf, commandsFactory, dataContainer);
+      this.leavers = leavers;
+      this.leaversHandled = new LinkedList<Address>(leavers);
+      this.stateProviders = stateProviders;
+      this.isReceiver = isReceiver;
+      this.self = rpcManager.getTransport().getAddress();
+   }
+
+   @SuppressWarnings("unchecked")
+   private Map<Object, InternalCacheValue> getStateFromResponse(SuccessfulResponse r) {
+      return (Map<Object, InternalCacheValue>) r.getResponseValue();
+   }
+
+   protected void performRehash() throws Exception {
+      long start = System.currentTimeMillis();
+      boolean trace = log.isTraceEnabled();
+
+      int replCount = configuration.getNumOwners();
+      ConsistentHash oldCH = ConsistentHashHelper.createConsistentHash(configuration, dmi.getConsistentHash().getCaches(), leaversHandled);
+      ConsistentHash newCH = dmi.getConsistentHash();
+      try {
+         if (log.isDebugEnabled()) {
+            if (isReceiver) {
+               log.debug("Commencing rehash at {0}, I am a state receiver", self);
+            } else {
+               log.debug("Commencing rehash at {0}, I am a state producer", self);
+            }
+         }
+         if (configuration.isRehashEnabled()) {
+            if (isReceiver) {
+               Address myAddress = rpcManager.getTransport().getAddress();
+               RehashControlCommand cmd = cf.buildRehashControlCommand(PULL_STATE_LEAVE, myAddress,
+                        null, oldCH, newCH,leaversHandled);
+
+               List<Address> addressesWhoMaySendStuff = getStateProviderTargets();
+               log.debug("I {0} am pulling state from {1}", self, addressesWhoMaySendStuff);
+               List<Response> resps = rpcManager.invokeRemotely(addressesWhoMaySendStuff, cmd,
+                        SYNCHRONOUS, configuration.getRehashRpcTimeout(), true);
+
+               log.debug("I {0} received response {1} ", self, resps);
+               for (Response r : resps) {
+                  if (r instanceof SuccessfulResponse) {
+                     Map<Object, InternalCacheValue> state = getStateFromResponse((SuccessfulResponse) r);
+                     log.debug("I {0} am applying state {1} ", self, state);
+                     dmi.applyState(newCH, state);
+                  }
+               }
+            }
+            processAndDrainTxLog(oldCH, newCH, replCount);
+            invalidateInvalidHolders(leaversHandled, oldCH, newCH);
+         } else {
+            if (trace)
+               log.trace("Rehash not enabled, so not pulling state.");
+         }
+         if (trace)
+            log.info("{0} completed leave rehash in {1}!", self, Util.prettyPrintTime(System.currentTimeMillis()
+                     - start));
+      } catch (Exception e) {        
+         throw new CacheException("Unexpected exception", e);
+      } finally {
+         leavers.removeAll(leaversHandled);
+      }
+   }
+
+   private List<Address> getStateProviderTargets() {
+      return stateProviders;
+   }
+
+   private void processAndDrainTxLog(ConsistentHash oldCH, ConsistentHash newCH, int replCount) {
+      if (trace)
+         log.trace("Processing transaction log iteratively");
+
+      List<WriteCommand> c;
+      int i = 0;
+      TransactionLogger transactionLogger = dmi.getTransactionLogger();
+      while (transactionLogger.shouldDrainWithoutLock()) {
+         if (trace)
+            log.trace("Processing transaction log, iteration {0}", i++);
+         c = transactionLogger.drain();
+         if (trace)
+            log.trace("Found {0} modifications", c.size());
+         apply(oldCH, newCH, replCount, c);
+      }
+
+      if (trace)
+         log.trace("Processing transaction log: final drain and lock");
+      c = transactionLogger.drainAndLock();
+      if (trace)
+         log.trace("Found {0} modifications", c.size());
+      apply(oldCH, newCH, replCount, c);
+
+      if (trace)
+         log.trace("Handling pending prepares");
+      PendingPreparesMap state = new PendingPreparesMap(leavers, oldCH, newCH, replCount);
+      Collection<PrepareCommand> pendingPrepares = transactionLogger.getPendingPrepares();
+      if (trace)
+         log.trace("Found {0} pending prepares", pendingPrepares.size());
+      for (PrepareCommand pc : pendingPrepares)
+         state.addState(pc);
+
+      if (trace)
+         log.trace("State map for pending prepares is {0}", state.getState());
+
+      Set<Future<Object>> pushFutures = new HashSet<Future<Object>>();
+      for (Map.Entry<Address, List<PrepareCommand>> e : state.getState().entrySet()) {
+         if (log.isDebugEnabled())
+            log.debug("Pushing {0} uncommitted prepares to {1}", e.getValue().size(), e.getKey());
+         RehashControlCommand push = cf.buildRehashControlCommandTxLogPendingPrepares(self, e.getValue());
+         NotifyingNotifiableFuture<Object> f = new NotifyingFutureImpl(null);
+         pushFutures.add(f);
+         rpcManager.invokeRemotelyInFuture(Collections.singleton(e.getKey()), push, true, f, configuration.getRehashRpcTimeout());
+      }
+
+      for (Future f : pushFutures) {
+         try {
+            f.get();
+         } catch (InterruptedException e) {
+            Thread.currentThread().interrupt();
+         } catch (ExecutionException e) {
+            log.error("Error pushing tx log", e);
+         }
+      }
+      if (trace)
+         log.trace("Finished pushing pending prepares; unlocking and disabling transaction logging");
+
+      transactionLogger.unlockAndDisable();
+   }
+
+   private void apply(ConsistentHash oldCH, ConsistentHash newCH, int replCount,
+            List<WriteCommand> wc) {
+      // need to create another "state map"
+      TransactionLogMap state = new TransactionLogMap(leavers, oldCH, newCH, replCount);
+      for (WriteCommand c : wc)
+         state.addState(c);
+
+      if (trace)
+         log.trace("State map for modifications is {0}", state.getState());
+
+      Set<Future<Object>> pushFutures = new HashSet<Future<Object>>();
+      for (Map.Entry<Address, List<WriteCommand>> entry : state.getState().entrySet()) {
+         if (log.isDebugEnabled())
+            log.debug("Pushing {0} modifications to {1}", entry.getValue().size(), entry.getKey());
+         RehashControlCommand push = cf.buildRehashControlCommandTxLog(self, entry.getValue());
+         NotifyingNotifiableFuture<Object> f = new NotifyingFutureImpl(null);
+         pushFutures.add(f);
+         rpcManager.invokeRemotelyInFuture(Collections.singleton(entry.getKey()), push, true, f,
+                  configuration.getRehashRpcTimeout());
+      }
+
+      for (Future f : pushFutures) {
+         try {
+            f.get();
+         } catch (InterruptedException e) {
+            Thread.currentThread().interrupt();
+         } catch (ExecutionException e) {
+            log.error("Error pushing tx log", e);
+         }
+      }
+   }
+
+   @Override
+   protected Log getLog() {
+      return log;
+   }
+}

Modified: trunk/core/src/main/java/org/infinispan/distribution/JoinTask.java
===================================================================
--- trunk/core/src/main/java/org/infinispan/distribution/JoinTask.java	2010-05-25 15:46:19 UTC (rev 1848)
+++ trunk/core/src/main/java/org/infinispan/distribution/JoinTask.java	2010-05-25 21:03:57 UTC (rev 1849)
@@ -45,15 +45,13 @@
 public class JoinTask extends RehashTask {
 
    private static final Log log = LogFactory.getLog(JoinTask.class);
-   ConsistentHash chOld;
-   ConsistentHash chNew;
-   Address self;
+   private final Address self;
 
    public JoinTask(RpcManager rpcManager, CommandsFactory commandsFactory, Configuration conf,
-                   TransactionLogger transactionLogger, DataContainer dataContainer, DistributionManagerImpl dmi) {
-      super(dmi, rpcManager, conf, transactionLogger, commandsFactory, dataContainer);
+            DataContainer dataContainer, DistributionManagerImpl dmi) {
+      super(dmi, rpcManager, conf, commandsFactory, dataContainer);
       this.dataContainer = dataContainer;
-      self = rpcManager.getTransport().getAddress();
+      this.self = rpcManager.getTransport().getAddress();
    }
 
    @SuppressWarnings("unchecked")
@@ -75,45 +73,15 @@
       long start = System.currentTimeMillis();
       boolean trace = log.isTraceEnabled();
       if (log.isDebugEnabled()) log.debug("Commencing");
+      TransactionLogger transactionLogger = dmi.getTransactionLogger();
       boolean unlocked = false;
+      ConsistentHash chOld;
+      ConsistentHash chNew;
       try {
          dmi.joinComplete = false;
-         // 1.  Get chOld from coord.
-         // this happens in a loop to ensure we receive the correct CH and not a "union".
-         // TODO make at least *some* of these configurable!
-         long minSleepTime = 500, maxSleepTime = 2000; // sleep time between retries
-         int maxWaitTime = (int) configuration.getRehashRpcTimeout() * 10; // after which we give up!
-         Random rand = new Random();
-         long giveupTime = System.currentTimeMillis() + maxWaitTime;
-         do {
-            if (trace) log.trace("Requesting old consistent hash from coordinator");
-            List<Response> resp;
-            List<Address> addresses;
-            try {
-               resp = rpcManager.invokeRemotely(coordinator(), cf.buildRehashControlCommand(JOIN_REQ, self),
-                                                            SYNCHRONOUS, configuration.getRehashRpcTimeout(), true);
-               addresses = parseResponses(resp);
-               if (log.isDebugEnabled()) log.debug("Retrieved old consistent hash address list {0}", addresses);
-            } catch (TimeoutException te) {
-               // timed out waiting for responses; retry!
-               resp = null;
-               addresses = null;
-               if (log.isDebugEnabled()) log.debug("Timed out waiting for responses.");
-            }
+         // 1.  Get chOld from coord.         
+         chOld = retrieveOldCH(trace);
 
-            if (addresses == null) {
-               long time = rand.nextInt((int) (maxSleepTime - minSleepTime) / 10);
-               time = (time * 10) + minSleepTime;
-               if (trace) log.trace("Sleeping for {0}", Util.prettyPrintTime(time));
-               Thread.sleep(time); // sleep for a while and retry
-            } else {
-               chOld = createConsistentHash(configuration, addresses);
-            }
-         } while (chOld == null && System.currentTimeMillis() < giveupTime);
-
-         if (chOld == null)
-            throw new CacheException("Unable to retrieve old consistent hash from coordinator even after several attempts at sleeping and retrying!");
-
          // 2.  new CH instance
          if (chOld.getCaches().contains(self))
             chNew = chOld;
@@ -133,9 +101,10 @@
 
             // 6.  pull state from everyone.
             Address myAddress = rpcManager.getTransport().getAddress();
-            RehashControlCommand cmd = cf.buildRehashControlCommand(PULL_STATE, myAddress, null, chNew);
+            
+            RehashControlCommand cmd = cf.buildRehashControlCommand(PULL_STATE_JOIN, myAddress, null, chOld, chNew,null);
             // TODO I should be able to process state chunks from different nodes simultaneously!!
-            List<Address> addressesWhoMaySendStuff = getAddressesWhoMaySendStuff(configuration.getNumOwners());
+            List<Address> addressesWhoMaySendStuff = getAddressesWhoMaySendStuff(chNew, configuration.getNumOwners());
             List<Response> resps = rpcManager.invokeRemotely(addressesWhoMaySendStuff, cmd, SYNCHRONOUS, configuration.getRehashRpcTimeout(), true);
 
             // 7.  Apply state
@@ -177,6 +146,53 @@
       }
    }
 
+    private ConsistentHash retrieveOldCH(boolean trace) throws InterruptedException, IllegalAccessException,
+                    InstantiationException, ClassNotFoundException {
+        
+        // this happens in a loop to ensure we receive the correct CH and not a "union".
+        // TODO make at least *some* of these configurable!
+        ConsistentHash result = null;
+        long minSleepTime = 500, maxSleepTime = 2000; // sleep time between retries
+        int maxWaitTime = (int) configuration.getRehashRpcTimeout() * 10; // after which we give up!
+        Random rand = new Random();
+        long giveupTime = System.currentTimeMillis() + maxWaitTime;
+        do {
+            if (trace)
+                log.trace("Requesting old consistent hash from coordinator");
+            List<Response> resp;
+            List<Address> addresses;
+            try {
+                resp = rpcManager.invokeRemotely(coordinator(), cf.buildRehashControlCommand(
+                                JOIN_REQ, self), SYNCHRONOUS, configuration.getRehashRpcTimeout(),
+                                true);
+                addresses = parseResponses(resp);
+                if (log.isDebugEnabled())
+                    log.debug("Retrieved old consistent hash address list {0}", addresses);
+            } catch (TimeoutException te) {
+                // timed out waiting for responses; retry!
+                resp = null;
+                addresses = null;
+                if (log.isDebugEnabled())
+                    log.debug("Timed out waiting for responses.");
+            }
+
+            if (addresses == null) {
+                long time = rand.nextInt((int) (maxSleepTime - minSleepTime) / 10);
+                time = (time * 10) + minSleepTime;
+                if (trace)
+                    log.trace("Sleeping for {0}", Util.prettyPrintTime(time));
+                Thread.sleep(time); // sleep for a while and retry
+            } else {
+                result = createConsistentHash(configuration, addresses);
+            }
+        } while (result == null && System.currentTimeMillis() < giveupTime);
+
+        if (result == null)
+            throw new CacheException(
+                            "Unable to retrieve old consistent hash from coordinator even after several attempts at sleeping and retrying!");
+        return result;
+    }
+
    @Override
    protected Log getLog() {
       return log;
@@ -193,9 +209,9 @@
     * @param replCount
     * @return
     */
-   List<Address> getAddressesWhoMaySendStuff(int replCount) {
+   List<Address> getAddressesWhoMaySendStuff(ConsistentHash ch, int replCount) {
       List<Address> l = new LinkedList<Address>();
-      List<Address> caches = chNew.getCaches();
+      List<Address> caches = ch.getCaches();
       int selfIdx = caches.indexOf(self);
       if (selfIdx >= replCount - 1) {
          l.addAll(caches.subList(selfIdx - replCount + 1, selfIdx));

Modified: trunk/core/src/main/java/org/infinispan/distribution/LeaveTask.java
===================================================================
--- trunk/core/src/main/java/org/infinispan/distribution/LeaveTask.java	2010-05-25 15:46:19 UTC (rev 1848)
+++ trunk/core/src/main/java/org/infinispan/distribution/LeaveTask.java	2010-05-25 21:03:57 UTC (rev 1849)
@@ -45,9 +45,10 @@
    private final List<Address> leaversHandled;
 
 
-   protected LeaveTask(DistributionManagerImpl dmi, RpcManager rpcManager, Configuration configuration, List<Address> leavers,
-                       TransactionLogger transactionLogger, CommandsFactory cf, DataContainer dataContainer) {
-      super(dmi, rpcManager, configuration, transactionLogger, cf, dataContainer);
+   protected LeaveTask(DistributionManagerImpl dmi, RpcManager rpcManager,
+            Configuration configuration, List<Address> leavers, CommandsFactory cf,
+            DataContainer dataContainer) {
+      super(dmi, rpcManager, configuration, cf, dataContainer);
       this.leavers = leavers;
       this.leaversHandled = new LinkedList<Address>(leavers);
       this.self = rpcManager.getTransport().getAddress();
@@ -115,6 +116,7 @@
 
       List<WriteCommand> c;
       int i = 0;
+      TransactionLogger transactionLogger = dmi.getTransactionLogger();
       while (transactionLogger.shouldDrainWithoutLock()) {
          if (trace) log.trace("Processing transaction log, iteration {0}", i++);
          c = transactionLogger.drain();

Modified: trunk/core/src/main/java/org/infinispan/distribution/RehashTask.java
===================================================================
--- trunk/core/src/main/java/org/infinispan/distribution/RehashTask.java	2010-05-25 15:46:19 UTC (rev 1848)
+++ trunk/core/src/main/java/org/infinispan/distribution/RehashTask.java	2010-05-25 21:03:57 UTC (rev 1849)
@@ -33,17 +33,15 @@
 
    DistributionManagerImpl dmi;
    RpcManager rpcManager;
-   Configuration configuration;
-   TransactionLogger transactionLogger;
+   Configuration configuration;   
    CommandsFactory cf;
    DataContainer dataContainer;
 
-   protected RehashTask(DistributionManagerImpl dmi, RpcManager rpcManager, Configuration configuration,
-                        TransactionLogger transactionLogger, CommandsFactory cf, DataContainer dataContainer) {
+   protected RehashTask(DistributionManagerImpl dmi, RpcManager rpcManager,
+            Configuration configuration, CommandsFactory cf, DataContainer dataContainer) {
       this.dmi = dmi;
       this.rpcManager = rpcManager;
       this.configuration = configuration;
-      this.transactionLogger = transactionLogger;
       this.cf = cf;
       this.dataContainer = dataContainer;
    }
@@ -66,7 +64,7 @@
       return Collections.singleton(rpcManager.getTransport().getCoordinator());
    }
 
-   protected void invalidateInvalidHolders(ConsistentHash chOld, ConsistentHash chNew) throws ExecutionException, InterruptedException {
+   protected void invalidateInvalidHolders(List<Address> doNotInvalidate, ConsistentHash chOld, ConsistentHash chNew) throws ExecutionException, InterruptedException {
       if (getLog().isDebugEnabled()) getLog().debug("Invalidating entries that have migrated across");
       Map<Address, Set<Object>> invalidations = new HashMap<Address, Set<Object>>();
       for (Object key : dataContainer.keySet()) {
@@ -81,6 +79,7 @@
          }
       }
 
+      invalidations.keySet().removeAll(doNotInvalidate);
       Set<Future> futures = new HashSet<Future>();
 
       for (Map.Entry<Address, Set<Object>> e : invalidations.entrySet()) {
@@ -92,6 +91,10 @@
 
       for (Future f : futures) f.get();
    }
+   protected void invalidateInvalidHolders(ConsistentHash chOld, ConsistentHash chNew) throws ExecutionException, InterruptedException {
+      List<Address> none = Collections.emptyList();
+      invalidateInvalidHolders(none, chOld, chNew);
+   }
 
    protected Collection<Address> getInvalidHolders(Object key, ConsistentHash chOld, ConsistentHash chNew) {
       List<Address> oldOwners = chOld.locate(key, configuration.getNumOwners());



More information about the infinispan-commits mailing list