[infinispan-commits] Infinispan SVN: r1137 - in trunk/core/src: main/java/org/infinispan/commands/control and 4 other directories.

infinispan-commits at lists.jboss.org infinispan-commits at lists.jboss.org
Wed Nov 11 12:48:59 EST 2009


Author: manik.surtani at jboss.com
Date: 2009-11-11 12:48:58 -0500 (Wed, 11 Nov 2009)
New Revision: 1137

Modified:
   trunk/core/src/main/java/org/infinispan/commands/CommandsFactory.java
   trunk/core/src/main/java/org/infinispan/commands/CommandsFactoryImpl.java
   trunk/core/src/main/java/org/infinispan/commands/control/RehashControlCommand.java
   trunk/core/src/main/java/org/infinispan/context/impl/AbstractTxInvocationContext.java
   trunk/core/src/main/java/org/infinispan/context/impl/TxInvocationContext.java
   trunk/core/src/main/java/org/infinispan/distribution/DistributionManager.java
   trunk/core/src/main/java/org/infinispan/distribution/DistributionManagerImpl.java
   trunk/core/src/main/java/org/infinispan/distribution/JoinTask.java
   trunk/core/src/main/java/org/infinispan/distribution/LeaveTask.java
   trunk/core/src/main/java/org/infinispan/distribution/TransactionLogger.java
   trunk/core/src/main/java/org/infinispan/distribution/TransactionLoggerImpl.java
   trunk/core/src/main/java/org/infinispan/interceptors/DeadlockDetectingInterceptor.java
   trunk/core/src/main/java/org/infinispan/interceptors/DistributionInterceptor.java
   trunk/core/src/test/java/org/infinispan/distribution/rehash/RehashTestBase.java
Log:
Updated to handle the fact that the recipient list for a transaction may change between broadcasting its prepare and its commit, and that in-flight prepares are pushed to new nodes as part of a rehash.  Hence, the commits need to be redirected as well.

Modified: trunk/core/src/main/java/org/infinispan/commands/CommandsFactory.java
===================================================================
--- trunk/core/src/main/java/org/infinispan/commands/CommandsFactory.java	2009-11-11 17:13:10 UTC (rev 1136)
+++ trunk/core/src/main/java/org/infinispan/commands/CommandsFactory.java	2009-11-11 17:48:58 UTC (rev 1137)
@@ -120,5 +120,9 @@
 
    RehashControlCommand buildRehashControlCommand(RehashControlCommand.Type subtype, Address sender, Map<Object, InternalCacheValue> state);
 
+   RehashControlCommand buildRehashControlCommandTxLog(Address sender, List<WriteCommand> state);
+
+   RehashControlCommand buildRehashControlCommandTxLogPendingPrepares(Address sender, List<PrepareCommand> state);
+
    RehashControlCommand buildRehashControlCommand(RehashControlCommand.Type subtype, Address sender, Map<Object, InternalCacheValue> state, ConsistentHash consistentHash);
 }

Modified: trunk/core/src/main/java/org/infinispan/commands/CommandsFactoryImpl.java
===================================================================
--- trunk/core/src/main/java/org/infinispan/commands/CommandsFactoryImpl.java	2009-11-11 17:13:10 UTC (rev 1136)
+++ trunk/core/src/main/java/org/infinispan/commands/CommandsFactoryImpl.java	2009-11-11 17:48:58 UTC (rev 1137)
@@ -25,6 +25,8 @@
 import org.infinispan.commands.control.LockControlCommand;
 import org.infinispan.commands.control.RehashControlCommand;
 import org.infinispan.commands.control.StateTransferControlCommand;
+import static org.infinispan.commands.control.RehashControlCommand.Type.DRAIN_TX_PREPARES;
+import static org.infinispan.commands.control.RehashControlCommand.Type.DRAIN_TX;
 import org.infinispan.commands.read.EntrySetCommand;
 import org.infinispan.commands.read.GetKeyValueCommand;
 import org.infinispan.commands.read.KeySetCommand;
@@ -275,7 +277,7 @@
             break;
          case RehashControlCommand.COMMAND_ID:
             RehashControlCommand rcc = (RehashControlCommand) c;
-            rcc.init(distributionManager, configuration, dataContainer);
+            rcc.init(distributionManager, configuration, dataContainer, this);
             break;
          default:
             if (trace)
@@ -295,8 +297,16 @@
       return buildRehashControlCommand(type, sender, state, null);
    }
 
+   public RehashControlCommand buildRehashControlCommandTxLog(Address sender, List<WriteCommand> commands) {
+      return new RehashControlCommand(cacheName, DRAIN_TX, sender, commands, null, this);
+   }
+
+   public RehashControlCommand buildRehashControlCommandTxLogPendingPrepares(Address sender, List<PrepareCommand> commands) {
+      return new RehashControlCommand(cacheName, DRAIN_TX_PREPARES, sender, null, commands, this);
+   }
+
    public RehashControlCommand buildRehashControlCommand(RehashControlCommand.Type type, Address sender, Map<Object, InternalCacheValue> state, ConsistentHash consistentHash) {
-      return new RehashControlCommand(cacheName, type, sender, state, consistentHash);
+      return new RehashControlCommand(cacheName, type, sender, state, consistentHash, this);
    }
 
 }

Modified: trunk/core/src/main/java/org/infinispan/commands/control/RehashControlCommand.java
===================================================================
--- trunk/core/src/main/java/org/infinispan/commands/control/RehashControlCommand.java	2009-11-11 17:13:10 UTC (rev 1136)
+++ trunk/core/src/main/java/org/infinispan/commands/control/RehashControlCommand.java	2009-11-11 17:48:58 UTC (rev 1137)
@@ -1,7 +1,11 @@
 package org.infinispan.commands.control;
 
 import org.infinispan.CacheException;
+import org.infinispan.commands.CommandsFactory;
+import org.infinispan.commands.ReplicableCommand;
 import org.infinispan.commands.remote.BaseRpcCommand;
+import org.infinispan.commands.tx.PrepareCommand;
+import org.infinispan.commands.write.WriteCommand;
 import org.infinispan.config.Configuration;
 import org.infinispan.container.DataContainer;
 import org.infinispan.container.entries.InternalCacheEntry;
@@ -20,6 +24,7 @@
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
+import java.util.Arrays;
 
 /**
  * A control command to coordinate rehashes that may occur when nodes join or leave a cluster, when DIST is used as a
@@ -34,7 +39,7 @@
    public static final int COMMAND_ID = 17;
 
    public enum Type {
-      JOIN_REQ, JOIN_REHASH_START, JOIN_REHASH_END, JOIN_COMPLETE, JOIN_ABORT, PULL_STATE, PUSH_STATE
+      JOIN_REQ, JOIN_REHASH_START, JOIN_REHASH_END, JOIN_COMPLETE, JOIN_ABORT, PULL_STATE, PUSH_STATE, DRAIN_TX, DRAIN_TX_PREPARES
    }
 
    Type type;
@@ -47,26 +52,51 @@
    Transport transport;
    Configuration configuration;
    DataContainer dataContainer;
+   List<WriteCommand> txLogCommands;
+   List<PrepareCommand> pendingPrepares;
+   CommandsFactory commandsFactory;
 
-   public RehashControlCommand(String cacheName, Type type, Address sender, Map<Object, InternalCacheValue> state, ConsistentHash consistentHash) {
+   public RehashControlCommand() {
+   }
+
+
+   public RehashControlCommand(String cacheName, Type type, Address sender, Map<Object, InternalCacheValue> state,
+                               ConsistentHash consistentHash, CommandsFactory commandsFactory) {
       super(cacheName);
       this.type = type;
       this.sender = sender;
       this.state = state;
       this.consistentHash = consistentHash;
+      this.commandsFactory = commandsFactory;
    }
 
-   public RehashControlCommand() {
+   public RehashControlCommand(String cacheName, Type type, Address sender, List<WriteCommand> txLogCommands,
+                               List<PrepareCommand> pendingPrepares, CommandsFactory commandsFactory) {
+      super(cacheName);
+      this.type = type;
+      this.sender = sender;
+      this.txLogCommands = txLogCommands;
+      this.pendingPrepares = pendingPrepares;
+      this.commandsFactory = commandsFactory;
    }
 
    public RehashControlCommand(Transport transport) {
       this.transport = transport;
    }
 
-   public void init(DistributionManager distributionManager, Configuration configuration, DataContainer dataContainer) {
+   public void init(DistributionManager distributionManager, Configuration configuration, DataContainer dataContainer,
+                    CommandsFactory commandsFactory) {
       this.distributionManager = distributionManager;
       this.configuration = configuration;
       this.dataContainer = dataContainer;
+      this.commandsFactory = commandsFactory;
+
+      // we need to "fix" these command lists - essentially propagate the init.  TODO think of a nicer way to do this!!
+      for (List<? extends ReplicableCommand> commandList : Arrays.asList(txLogCommands, pendingPrepares)) {
+         if (commandList != null) {
+            for (ReplicableCommand cmd : commandList) commandsFactory.initializeReplicableCommand(cmd);
+         }
+      }
    }
 
    public Object perform(InvocationContext ctx) throws Throwable {
@@ -86,6 +116,12 @@
             return pullState();
          case PUSH_STATE:
             return pushState();
+         case DRAIN_TX:
+            distributionManager.applyRemoteTxLog(txLogCommands);
+            return null;
+         case DRAIN_TX_PREPARES:
+            for (PrepareCommand pc : pendingPrepares) pc.perform(null);
+            return null;
       }
       throw new CacheException("Unknown rehash control command type " + type);
    }
@@ -133,7 +169,7 @@
    }
 
    public Object[] getParameters() {
-      return new Object[]{cacheName, (byte) type.ordinal(), sender, state, consistentHash};
+      return new Object[]{cacheName, (byte) type.ordinal(), sender, state, consistentHash, txLogCommands, pendingPrepares};
    }
 
    @SuppressWarnings("unchecked")
@@ -144,6 +180,8 @@
       sender = (Address) parameters[i++];
       state = (Map<Object, InternalCacheValue>) parameters[i++];
       consistentHash = (ConsistentHash) parameters[i++];
+      txLogCommands = (List<WriteCommand>) parameters[i++];
+      pendingPrepares = (List<PrepareCommand>) parameters[i++];
    }
 
    @Override
@@ -153,6 +191,8 @@
             ", sender=" + sender +
             ", state=" + state +
             ", consistentHash=" + consistentHash +
+            ", txLogCommands=" + txLogCommands +
+            ", pendingPrepares=" + pendingPrepares +
             '}';
    }
 }

Modified: trunk/core/src/main/java/org/infinispan/context/impl/AbstractTxInvocationContext.java
===================================================================
--- trunk/core/src/main/java/org/infinispan/context/impl/AbstractTxInvocationContext.java	2009-11-11 17:13:10 UTC (rev 1136)
+++ trunk/core/src/main/java/org/infinispan/context/impl/AbstractTxInvocationContext.java	2009-11-11 17:48:58 UTC (rev 1137)
@@ -1,14 +1,13 @@
 package org.infinispan.context.impl;
 
 import org.infinispan.CacheException;
-import org.infinispan.remoting.transport.Address;
 
 import javax.transaction.Status;
 import javax.transaction.SystemException;
 import javax.transaction.Transaction;
+import java.util.Arrays;
 import java.util.Collections;
 import java.util.HashSet;
-import java.util.List;
 import java.util.Set;
 
 /**
@@ -19,25 +18,25 @@
  */
 public abstract class AbstractTxInvocationContext extends AbstractInvocationContext implements TxInvocationContext {
 
-   protected Set<Address> txParticipants = null;
+   protected Set<Object> affectedKeys = null;
 
    public boolean hasModifications() {
       return getModifications() != null && !getModifications().isEmpty();
    }
 
-   public Set<Address> getTransactionParticipants() {
-      return txParticipants == null ? Collections.EMPTY_SET : txParticipants;
+   public Set<Object> getAffectedKeys() {
+      return affectedKeys == null ? Collections.<Object>emptySet() : affectedKeys;
    }
 
    public boolean isValidRunningTx() {
       return isValid(getRunningTransaction());
    }
 
-   public void addTransactionParticipants(List<Address> addresses) {
-      if (txParticipants == null) {
-         txParticipants = new HashSet<Address>();
+   public void addAffectedKeys(Object... keys) {
+      if (affectedKeys == null) {
+         affectedKeys = new HashSet<Object>();
       }
-      txParticipants.addAll(addresses);
+      affectedKeys.addAll(Arrays.asList(keys));
    }
 
 
@@ -97,8 +96,8 @@
    @Override
    public AbstractTxInvocationContext clone() {
       AbstractTxInvocationContext dolly = (AbstractTxInvocationContext) super.clone();
-      if (this.txParticipants != null) {
-         dolly.txParticipants = new HashSet<Address>(txParticipants);
+      if (this.affectedKeys != null) {
+         dolly.affectedKeys = new HashSet<Object>(affectedKeys);
       }
       return dolly;
    }

Modified: trunk/core/src/main/java/org/infinispan/context/impl/TxInvocationContext.java
===================================================================
--- trunk/core/src/main/java/org/infinispan/context/impl/TxInvocationContext.java	2009-11-11 17:13:10 UTC (rev 1136)
+++ trunk/core/src/main/java/org/infinispan/context/impl/TxInvocationContext.java	2009-11-11 17:48:58 UTC (rev 1137)
@@ -2,7 +2,6 @@
 
 import org.infinispan.commands.write.WriteCommand;
 import org.infinispan.context.InvocationContext;
-import org.infinispan.remoting.transport.Address;
 import org.infinispan.transaction.xa.GlobalTransaction;
 
 import javax.transaction.Transaction;
@@ -23,10 +22,10 @@
    public boolean hasModifications();
 
    /**
-    * Returns the set of cluster participants (identified through {@link org.infinispan.remoting.transport.Address}
-    * objects) that participate within the transaction. Null indicates all cluster members.
+    * Returns the set of keys that are affected by this transaction.  Used to generate appropriate recipient groups
+    * for cluster-wide prepare and commit calls.
     */
-   Set<Address> getTransactionParticipants();
+   Set<Object> getAffectedKeys();
 
    /**
     * Returns the id of the transaction assoctiated  with the current call.
@@ -54,5 +53,5 @@
    /**
     * Registers a new participant with the transaction.
     */
-   void addTransactionParticipants(List<Address> addresses);
+   void addAffectedKeys(Object... keys);
 }

Modified: trunk/core/src/main/java/org/infinispan/distribution/DistributionManager.java
===================================================================
--- trunk/core/src/main/java/org/infinispan/distribution/DistributionManager.java	2009-11-11 17:13:10 UTC (rev 1136)
+++ trunk/core/src/main/java/org/infinispan/distribution/DistributionManager.java	2009-11-11 17:48:58 UTC (rev 1137)
@@ -7,10 +7,12 @@
 import org.infinispan.factories.scopes.Scopes;
 import org.infinispan.loaders.CacheStore;
 import org.infinispan.remoting.transport.Address;
+import org.infinispan.commands.write.WriteCommand;
 
 import java.util.Collection;
 import java.util.List;
 import java.util.Map;
+import java.util.Set;
 
 /**
  * A component that manages the distribution of elements across a cache cluster
@@ -108,5 +110,9 @@
    boolean isJoinComplete();
 
    void applyReceivedState(Map<Object, InternalCacheValue> state);
+   
+   List<Address> getAffectedNodes(Set<Object> affectedKeys);
+
+   void applyRemoteTxLog(List<WriteCommand> txLogCommands);
 }
 

Modified: trunk/core/src/main/java/org/infinispan/distribution/DistributionManagerImpl.java
===================================================================
--- trunk/core/src/main/java/org/infinispan/distribution/DistributionManagerImpl.java	2009-11-11 17:13:10 UTC (rev 1136)
+++ trunk/core/src/main/java/org/infinispan/distribution/DistributionManagerImpl.java	2009-11-11 17:48:58 UTC (rev 1137)
@@ -44,9 +44,12 @@
 
 import java.util.ArrayList;
 import java.util.Collection;
+import java.util.Collections;
+import java.util.HashSet;
 import java.util.LinkedList;
 import java.util.List;
 import java.util.Map;
+import java.util.Set;
 import java.util.concurrent.CopyOnWriteArrayList;
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.ExecutorService;
@@ -178,8 +181,8 @@
             throw new CacheException(e);
          }
 
-         if (willReceiveLeaverState) {
-            log.info("Starting transaction logging; expecting state from someone!");
+         if (willReceiveLeaverState || willSendLeaverState) {
+            log.info("Starting transaction logging!");
             transactionLogger.enable();
          }
 
@@ -350,7 +353,7 @@
       applyState(consistentHash, state);
       boolean unlocked = false;
       try {
-         drainTransactionLog();
+         drainLocalTransactionLog();
          unlocked = true;
       } finally {
          if (!unlocked) transactionLogger.unlockAndDisable();
@@ -361,9 +364,9 @@
       return joinComplete;
    }
 
-   void drainTransactionLog() {
+   void drainLocalTransactionLog() {
       List<WriteCommand> c;
-      while (transactionLogger.size() > 10) {
+      while (transactionLogger.shouldDrainWithoutLock()) {
          c = transactionLogger.drain();
          apply(c);
       }
@@ -383,6 +386,18 @@
       }
    }
 
+   public List<Address> getAffectedNodes(Set<Object> affectedKeys) {
+      if (affectedKeys == null || affectedKeys.isEmpty()) return Collections.emptyList();
+
+      Set<Address> an = new HashSet<Address>();
+      for (List<Address> addresses : locateAll(affectedKeys).values()) an.addAll(addresses);
+      return new ArrayList<Address>(an);
+   }
+
+   public void applyRemoteTxLog(List<WriteCommand> txLogCommands) {
+      apply(txLogCommands);
+   }
+
    @ManagedOperation(description = "Tells you whether a given key is local to this instance of the cache.  Only works with String keys.")
    @Operation(displayName = "Is key local?")
    public boolean isLocatedLocally(@Parameter(name = "key", description = "Key to query") String key) {

Modified: trunk/core/src/main/java/org/infinispan/distribution/JoinTask.java
===================================================================
--- trunk/core/src/main/java/org/infinispan/distribution/JoinTask.java	2009-11-11 17:13:10 UTC (rev 1136)
+++ trunk/core/src/main/java/org/infinispan/distribution/JoinTask.java	2009-11-11 17:48:58 UTC (rev 1137)
@@ -127,7 +127,7 @@
             }
 
             // 8.  Drain logs
-            dmi.drainTransactionLog();
+            dmi.drainLocalTransactionLog();
          }
          unlocked = true;
 

Modified: trunk/core/src/main/java/org/infinispan/distribution/LeaveTask.java
===================================================================
--- trunk/core/src/main/java/org/infinispan/distribution/LeaveTask.java	2009-11-11 17:13:10 UTC (rev 1136)
+++ trunk/core/src/main/java/org/infinispan/distribution/LeaveTask.java	2009-11-11 17:48:58 UTC (rev 1137)
@@ -1,7 +1,10 @@
 package org.infinispan.distribution;
 
 import org.infinispan.commands.CommandsFactory;
+import org.infinispan.commands.ReplicableCommand;
 import org.infinispan.commands.control.RehashControlCommand;
+import org.infinispan.commands.tx.PrepareCommand;
+import org.infinispan.commands.write.WriteCommand;
 import org.infinispan.config.Configuration;
 import org.infinispan.container.DataContainer;
 import org.infinispan.container.entries.InternalCacheEntry;
@@ -23,6 +26,7 @@
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
+import java.util.concurrent.ExecutionException;
 import java.util.concurrent.Future;
 
 /**
@@ -33,7 +37,7 @@
  */
 public class LeaveTask extends RehashTask {
    private static final Log log = LogFactory.getLog(LeaveTask.class);
-
+   private static final boolean trace = log.isTraceEnabled();
    private final List<Address> leavers;
    private final Address self;
    private final List<Address> leaversHandled;
@@ -54,8 +58,9 @@
       List<Address> leaversHandled = new LinkedList<Address>(leavers);
       ConsistentHash oldCH = ConsistentHashHelper.createConsistentHash(configuration, dmi.getConsistentHash().getCaches(), leaversHandled);
       int replCount = configuration.getNumOwners();
+
       try {
-         StateMap statemap = new StateMap(leaversHandled, self, oldCH, dmi.getConsistentHash(), replCount);
+         InMemoryStateMap statemap = new InMemoryStateMap(leaversHandled, self, oldCH, dmi.getConsistentHash(), replCount);
          if (log.isTraceEnabled()) log.trace("Examining state in data container");
          // need to constantly detect whether we are interrupted.  If so, abort accordingly.
          for (InternalCacheEntry ice : dataContainer) {
@@ -70,17 +75,19 @@
          }
 
          // push state.
-         Set<Future<Void>> pushFutures = new HashSet<Future<Void>>();
+         Set<Future<Object>> pushFutures = new HashSet<Future<Object>>();
          for (Map.Entry<Address, Map<Object, InternalCacheValue>> entry : statemap.getState().entrySet()) {
             if (log.isDebugEnabled()) log.debug("Pushing {0} entries to {1}", entry.getValue().size(), entry.getKey());
             RehashControlCommand push = cf.buildRehashControlCommand(RehashControlCommand.Type.PUSH_STATE, self, entry.getValue());
-            NotifyingNotifiableFuture f = new NotifyingFutureImpl(null);
+            NotifyingNotifiableFuture<Object> f = new NotifyingFutureImpl(null);
             pushFutures.add(f);
             rpcManager.invokeRemotelyInFuture(Collections.singleton(entry.getKey()), push, true, f, configuration.getRehashRpcTimeout());
          }
 
          for (Future f : pushFutures) f.get();
 
+         processAndDrainTxLog(oldCH, dmi.getConsistentHash(), replCount);
+
          completedSuccessfully = true;
          invalidateInvalidHolders(oldCH, dmi.getConsistentHash());
          if (log.isInfoEnabled())
@@ -96,6 +103,83 @@
       }
    }
 
+   private void processAndDrainTxLog(ConsistentHash oldCH, ConsistentHash newCH, int replCount) {
+      if (trace) log.trace("Processing transaction log iteratively");
+      // Drain unlocked while the log is large; the final drain takes the lock, which is always released in finally.
+      List<WriteCommand> c;
+      int i = 0;
+      while (transactionLogger.shouldDrainWithoutLock()) {
+         if (trace) log.trace("Processing transaction log, iteration {0}", i++);
+         c = transactionLogger.drain();
+         if (trace) log.trace("Found {0} modifications", c.size());
+         apply(oldCH, newCH, replCount, c);
+      }
+
+      if (trace) log.trace("Processing transaction log: final drain and lock");
+      c = transactionLogger.drainAndLock();
+      try {
+         if (trace) log.trace("Found {0} modifications", c.size());
+         apply(oldCH, newCH, replCount, c);
+         if (trace) log.trace("Handling pending prepares");
+         PendingPreparesMap state = new PendingPreparesMap(leavers, oldCH, newCH, replCount);
+         Collection<PrepareCommand> pendingPrepares = transactionLogger.getPendingPrepares();
+         if (trace) log.trace("Found {0} pending prepares", pendingPrepares.size());
+         for (PrepareCommand pc : pendingPrepares) state.addState(pc);
+         if (trace) log.trace("State map for pending prepares is {0}", state.getState());
+
+         Set<Future<Object>> pushFutures = new HashSet<Future<Object>>();
+         for (Map.Entry<Address, List<PrepareCommand>> e : state.getState().entrySet()) {
+            if (log.isDebugEnabled())
+               log.debug("Pushing {0} uncommitted prepares to {1}", e.getValue().size(), e.getKey());
+            RehashControlCommand push = cf.buildRehashControlCommandTxLogPendingPrepares(self, e.getValue());
+            NotifyingNotifiableFuture<Object> f = new NotifyingFutureImpl(null);
+            pushFutures.add(f);
+            rpcManager.invokeRemotelyInFuture(Collections.singleton(e.getKey()), push, true, f, configuration.getRehashRpcTimeout());
+         }
+
+         for (Future f : pushFutures) {
+            try {
+               f.get();
+            } catch (InterruptedException e) {
+               Thread.currentThread().interrupt();
+            } catch (ExecutionException e) {
+               log.error("Error pushing pending prepares", e);
+            }
+         }
+         if (trace) log.trace("Finished pushing pending prepares; unlocking and disabling transaction logging");
+      } finally {
+         transactionLogger.unlockAndDisable();
+      }
+   }
+
+   private void apply(ConsistentHash oldCH, ConsistentHash newCH, int replCount, List<WriteCommand> wc) {
+      // need to create another "state map"
+      TransactionLogMap state = new TransactionLogMap(leavers, oldCH, newCH, replCount);
+      for (WriteCommand c : wc) state.addState(c);
+
+      if (trace) log.trace("State map for modifications is {0}", state.getState());
+
+      Set<Future<Object>> pushFutures = new HashSet<Future<Object>>();
+      for (Map.Entry<Address, List<WriteCommand>> entry : state.getState().entrySet()) {
+         if (log.isDebugEnabled())
+            log.debug("Pushing {0} modifications to {1}", entry.getValue().size(), entry.getKey());
+         RehashControlCommand push = cf.buildRehashControlCommandTxLog(self, entry.getValue());
+         NotifyingNotifiableFuture<Object> f = new NotifyingFutureImpl(null);
+         pushFutures.add(f);
+         rpcManager.invokeRemotelyInFuture(Collections.singleton(entry.getKey()), push, true, f, configuration.getRehashRpcTimeout());
+      }
+
+      for (Future f : pushFutures) {
+         try {
+            f.get();
+         } catch (InterruptedException e) {
+            Thread.currentThread().interrupt();
+         } catch (ExecutionException e) {
+            log.error("Error pushing tx log", e);
+         }
+      }
+   }
+
    @Override
    protected Collection<Address> getInvalidHolders(Object key, ConsistentHash chOld, ConsistentHash chNew) {
       Collection<Address> l = super.getInvalidHolders(key, chOld, chNew);
@@ -104,31 +188,43 @@
    }
 }
 
-class StateMap {
+abstract class StateMap<S> {
    List<Address> leavers;
-   Address self;
    ConsistentHash oldCH, newCH;
    int replCount;
-   Set<Object> keysHandled = new HashSet<Object>();
-   Map<Address, Map<Object, InternalCacheValue>> state = new HashMap<Address, Map<Object, InternalCacheValue>>();
+   Map<Address, S> state = new HashMap<Address, S>();
 
-   StateMap(List<Address> leavers, Address self, ConsistentHash oldCH, ConsistentHash newCH, int replCount) {
+   StateMap(List<Address> leavers, ConsistentHash oldCH, ConsistentHash newCH, int replCount) {
       this.leavers = leavers;
-      this.self = self;
       this.oldCH = oldCH;
       this.newCH = newCH;
       this.replCount = replCount;
    }
 
+   Map<Address, S> getState() {
+      return state;
+   }
+}
+
+class InMemoryStateMap extends StateMap<Map<Object, InternalCacheValue>> {
+   Address self;
+   Set<Object> keysHandled = new HashSet<Object>();
+
+   InMemoryStateMap(List<Address> leavers, Address self, ConsistentHash oldCH, ConsistentHash newCH, int replCount) {
+      super(leavers, oldCH, newCH, replCount);
+      this.self = self;
+   }
+
    /**
     * Only add state to state map if old_owner_list for key contains a leaver, and the position of the leaver in the old
     * owner list
     *
-    * @param ice
+    * @param payload an InternalCacheEntry to add to the state map
     */
-   void addState(InternalCacheEntry ice) {
+   void addState(InternalCacheEntry payload) {
+      Object key = payload.getKey();
       for (Address leaver : leavers) {
-         List<Address> owners = oldCH.locate(ice.getKey(), replCount);
+         List<Address> owners = oldCH.locate(key, replCount);
          int leaverIndex = owners.indexOf(leaver);
          if (leaverIndex > -1) {
             int numOwners = owners.size();
@@ -137,7 +233,7 @@
             if ((isLeaverLast && selfIndex == numOwners - 2) ||
                   (!isLeaverLast && selfIndex == leaverIndex + 1)) {
                // add to state map!
-               List<Address> newOwners = newCH.locate(ice.getKey(), replCount);
+               List<Address> newOwners = newCH.locate(key, replCount);
                newOwners.removeAll(owners);
                if (!newOwners.isEmpty()) {
                   for (Address no : newOwners) {
@@ -146,20 +242,92 @@
                         s = new HashMap<Object, InternalCacheValue>();
                         state.put(no, s);
                      }
-                     s.put(ice.getKey(), ice.toInternalCacheValue());
+                     s.put(key, payload.toInternalCacheValue());
                   }
                }
             }
          }
       }
-      keysHandled.add(ice.getKey());
+      keysHandled.add(key);
    }
 
-   Map<Address, Map<Object, InternalCacheValue>> getState() {
-      return state;
-   }
-
    boolean containsKey(Object key) {
       return keysHandled.contains(key);
    }
 }
+
+/**
+ * A state map that aggregates {@link ReplicableCommand}s by the new owners that must receive them.
+ *
+ * @param <T> type of replicable command to aggregate
+ */
+abstract class CommandAggregatingStateMap<T extends ReplicableCommand> extends StateMap<List<T>> {
+   Set<Object> keysHandled = new HashSet<Object>(); // NOTE(review): not read by this class or its subclasses here
+
+   CommandAggregatingStateMap(List<Address> leavers, ConsistentHash oldCH, ConsistentHash newCH, int replCount) {
+      super(leavers, oldCH, newCH, replCount);
+   }
+
+   // Returns the keys a payload touches; subclasses delegate to the concrete command type (presumably no common accessor).
+   abstract Set<Object> getAffectedKeys(T payload);
+
+   /**
+    * For each affected key whose old owner list contains a leaver, adds the payload to the list of every owner in the
+    * new hash that was not an old owner of that key.  A given recipient receives a given payload at most once.
+    *
+    * @param payload payload to consider when adding to the aggregate state
+    */
+   void addState(T payload) {
+      for (Object key : getAffectedKeys(payload)) {
+         List<Address> owners = oldCH.locate(key, replCount);
+         for (Address leaver : leavers) {
+            int leaverIndex = owners.indexOf(leaver);
+            if (leaverIndex > -1) {
+               // a leaver owned this key: forward the payload to owners that are new to the key
+               List<Address> newOwners = newCH.locate(key, replCount);
+               newOwners.removeAll(owners);
+               if (!newOwners.isEmpty()) {
+                  for (Address no : newOwners) {
+                     List<T> s = state.get(no);
+                     if (s == null) {
+                        s = new LinkedList<T>();
+                        state.put(no, s);
+                     }
+                     if (s.isEmpty() || s.get(s.size() - 1) != payload) s.add(payload);
+                  }
+               }
+            }
+         }
+      }
+   }
+}
+
+/**
+ * Specific version of the CommandAggregatingStateMap that aggregates PrepareCommands, used to flush pending prepares
+ * to nodes during a leave.
+ */
+class PendingPreparesMap extends CommandAggregatingStateMap<PrepareCommand> {
+   PendingPreparesMap(List<Address> leavers, ConsistentHash oldCH, ConsistentHash newCH, int replCount) {
+      super(leavers, oldCH, newCH, replCount);
+   }
+
+   @Override
+   Set<Object> getAffectedKeys(PrepareCommand payload) {
+      return payload.getAffectedKeys();
+   }
+}
+
+/**
+ * Specific version of the CommandAggregatingStateMap that aggregates WriteCommands, used to flush writes
+ * made while state was being transferred to nodes during a leave.
+ */
+class TransactionLogMap extends CommandAggregatingStateMap<WriteCommand> {
+   TransactionLogMap(List<Address> leavers, ConsistentHash oldCH, ConsistentHash newCH, int replCount) {
+      super(leavers, oldCH, newCH, replCount);
+   }
+
+   @Override
+   Set<Object> getAffectedKeys(WriteCommand payload) {
+      return payload.getAffectedKeys();
+   }
+}
\ No newline at end of file

Modified: trunk/core/src/main/java/org/infinispan/distribution/TransactionLogger.java
===================================================================
--- trunk/core/src/main/java/org/infinispan/distribution/TransactionLogger.java	2009-11-11 17:13:10 UTC (rev 1136)
+++ trunk/core/src/main/java/org/infinispan/distribution/TransactionLogger.java	2009-11-11 17:48:58 UTC (rev 1137)
@@ -67,4 +67,19 @@
    int size();
 
    boolean isEnabled();
+
+   /**
+    * Tests whether the drain() method can be called without a lock.  This is usually true while there are many entries
+    * left to drain.  Once the number of entries in the transaction log falls to a certain threshold, this returns false,
+    * after which drainAndLock() should be called to clear the remainder of the log.
+    * @return true if drain() should be called, false if drainAndLock() should be called.
+    */
+   boolean shouldDrainWithoutLock();
+
+   /**
+    * Drains and returns the pending prepares.  Note that this should *only* be called after drainAndLock(), to prevent
+    * race conditions.
+    * @return a collection of prepares pending commit or rollback
+    */
+   Collection<PrepareCommand> getPendingPrepares();
 }

Modified: trunk/core/src/main/java/org/infinispan/distribution/TransactionLoggerImpl.java
===================================================================
--- trunk/core/src/main/java/org/infinispan/distribution/TransactionLoggerImpl.java	2009-11-11 17:13:10 UTC (rev 1136)
+++ trunk/core/src/main/java/org/infinispan/distribution/TransactionLoggerImpl.java	2009-11-11 17:48:58 UTC (rev 1137)
@@ -9,6 +9,7 @@
 import org.infinispan.util.logging.LogFactory;
 
 import java.util.Collection;
+import java.util.HashSet;
 import java.util.LinkedList;
 import java.util.List;
 import java.util.Map;
@@ -20,7 +21,7 @@
 
 /**
  * A transaction logger to log ongoing transactions in an efficient and thread-safe manner while a rehash is going on.
- *
+ * <p/>
  * Transaction logs can then be replayed after the state transferred during a rehash has been written.
  *
  * @author Manik Surtani
@@ -33,8 +34,11 @@
    final Map<GlobalTransaction, PrepareCommand> uncommittedPrepares = new ConcurrentHashMap<GlobalTransaction, PrepareCommand>();
    private static final Log log = LogFactory.getLog(TransactionLoggerImpl.class);
    private static final boolean trace = log.isTraceEnabled();
+   // once the transaction log holds this many entries or fewer, it must be drained while holding the lock
+   private static final int DRAIN_LOCK_THRESHOLD = 10;
 
    public void enable() {
+      log.trace("Starting transaction logging");
       enabled = true;
    }
 
@@ -53,6 +57,7 @@
    public void unlockAndDisable() {
       enabled = false;
       loggingLock.writeLock().unlock();
+      log.trace("Stopping transaction logging");
    }
 
    public boolean logIfNeeded(WriteCommand command) {
@@ -75,21 +80,21 @@
    }
 
    public void logIfNeeded(PrepareCommand command) {
-      if (enabled) {
-         loggingLock.readLock().lock();
-         try {
-            if (enabled) {
-               if (command.isOnePhaseCommit()) {
+      if (command.isOnePhaseCommit()) {
+         if (enabled) {
+            loggingLock.readLock().lock();
+            try {
+               if (enabled) {
                   if (trace) log.trace("Logging 1PC prepare for tx {0}", command.getGlobalTransaction());
                   logModificationsInTransaction(command);
-               } else {
-                  if (trace) log.trace("Logging 2PC prepare for tx {0}", command.getGlobalTransaction());
-                  uncommittedPrepares.put(command.getGlobalTransaction(), command);
                }
+            } finally {
+               loggingLock.readLock().unlock();
             }
-         } finally {
-            loggingLock.readLock().unlock();
          }
+      } else {
+         if (trace) log.trace("Logging 2PC prepare for tx {0}", command.getGlobalTransaction());
+         uncommittedPrepares.put(command.getGlobalTransaction(), command);
       }
    }
 
@@ -104,12 +109,12 @@
    }
 
    public void logIfNeeded(CommitCommand command) {
+      PrepareCommand pc = uncommittedPrepares.remove(command.getGlobalTransaction());
       if (enabled) {
          loggingLock.readLock().lock();
          try {
             if (enabled) {
                if (trace) log.trace("Logging commit for tx {0}", command.getGlobalTransaction());
-               PrepareCommand pc = uncommittedPrepares.remove(command.getGlobalTransaction());
                logModificationsInTransaction(pc);
             }
          } finally {
@@ -119,17 +124,8 @@
    }
 
    public void logIfNeeded(RollbackCommand command) {
-      if (enabled) {
-         loggingLock.readLock().lock();
-         try {
-            if (enabled) {
-               if (trace) log.trace("Logging rollback for tx {0}", command.getGlobalTransaction());
-               uncommittedPrepares.remove(command.getGlobalTransaction());
-            }
-         } finally {
-            loggingLock.readLock().unlock();
-         }
-      }
+      if (trace) log.trace("Logging rollback for tx {0}", command.getGlobalTransaction());
+      uncommittedPrepares.remove(command.getGlobalTransaction());
    }
 
    public boolean logIfNeeded(Collection<WriteCommand> commands) {
@@ -159,4 +155,14 @@
    public boolean isEnabled() {
       return enabled;
    }
+
+   public boolean shouldDrainWithoutLock() {
+      return size() > DRAIN_LOCK_THRESHOLD; // at or below the threshold, callers should switch to drainAndLock()
+   }
+
+   public Collection<PrepareCommand> getPendingPrepares() { // returns a snapshot of uncommittedPrepares, then empties it
+      Collection<PrepareCommand> commands = new HashSet<PrepareCommand>(uncommittedPrepares.values());
+      uncommittedPrepares.clear(); // NOTE(review): 2PC prepares are put without loggingLock, so a put landing between the copy above and this clear() is lost even under drainAndLock(); draining via values().iterator() + remove() would avoid that
+      return commands;
+   }
 }

Modified: trunk/core/src/main/java/org/infinispan/interceptors/DeadlockDetectingInterceptor.java
===================================================================
--- trunk/core/src/main/java/org/infinispan/interceptors/DeadlockDetectingInterceptor.java	2009-11-11 17:13:10 UTC (rev 1136)
+++ trunk/core/src/main/java/org/infinispan/interceptors/DeadlockDetectingInterceptor.java	2009-11-11 17:48:58 UTC (rev 1137)
@@ -8,6 +8,7 @@
 import org.infinispan.commands.write.ReplaceCommand;
 import org.infinispan.context.InvocationContext;
 import org.infinispan.context.impl.TxInvocationContext;
+import org.infinispan.distribution.DistributionManager;
 import org.infinispan.factories.annotations.Inject;
 import org.infinispan.factories.annotations.Start;
 import org.infinispan.interceptors.base.CommandInterceptor;
@@ -19,7 +20,7 @@
 
 import javax.transaction.Transaction;
 import javax.transaction.TransactionManager;
-import java.util.Set;
+import java.util.HashSet;
 
 /**
  * This interceptor populates the {@link org.infinispan.transaction.xa.DeadlockDetectingGlobalTransaction} with
@@ -37,12 +38,15 @@
    private TransactionTable txTable;
    private LockManager lockManager;
    private TransactionManager txManager;
+   private DistributionManager distributionManager;
 
    @Inject
-   public void init(TransactionTable txTable, LockManager lockManager, TransactionManager txManager) {
+   public void init(TransactionTable txTable, LockManager lockManager, TransactionManager txManager,
+                    DistributionManager distributionManager) {
       this.txTable = txTable;
       this.lockManager = lockManager;
       this.txManager = txManager;
+      this.distributionManager = distributionManager;
    }
 
 
@@ -114,8 +118,7 @@
       globalTransaction.setProcessingThread(Thread.currentThread());
       if (ctx.isOriginLocal()) {
          if (configuration.getCacheMode().isDistributed()) {
-            Set<Address> transactionParticipants = ctx.getTransactionParticipants();
-            globalTransaction.setReplicatingTo(transactionParticipants);
+            globalTransaction.setReplicatingTo(new HashSet<Address>(distributionManager.getAffectedNodes(ctx.getAffectedKeys())));
          } else {
             globalTransaction.setReplicatingTo(null);
          }

Modified: trunk/core/src/main/java/org/infinispan/interceptors/DistributionInterceptor.java
===================================================================
--- trunk/core/src/main/java/org/infinispan/interceptors/DistributionInterceptor.java	2009-11-11 17:13:10 UTC (rev 1136)
+++ trunk/core/src/main/java/org/infinispan/interceptors/DistributionInterceptor.java	2009-11-11 17:48:58 UTC (rev 1137)
@@ -30,7 +30,6 @@
 import org.infinispan.util.concurrent.NotifyingFutureImpl;
 import org.infinispan.util.concurrent.NotifyingNotifiableFuture;
 
-import java.util.ArrayList;
 import java.util.Collection;
 import java.util.HashSet;
 import java.util.LinkedList;
@@ -189,10 +188,7 @@
 
    @Override
    public Object visitLockControlCommand(TxInvocationContext ctx, LockControlCommand command) throws Throwable {
-      if (ctx.isOriginLocal()) {
-         List<Address> recipients = new ArrayList<Address>(ctx.getTransactionParticipants());
-         rpcManager.invokeRemotely(recipients, command, true, true);
-      }
+      if (ctx.isOriginLocal()) rpcManager.invokeRemotely(dm.getAffectedNodes(ctx.getAffectedKeys()), command, true, true);
       return invokeNextInterceptor(ctx, command);
    }
 
@@ -200,7 +196,7 @@
    @Override
    public Object visitCommitCommand(TxInvocationContext ctx, CommitCommand command) throws Throwable {
       if (ctx.isOriginLocal() && ctx.hasModifications()) {
-         List<Address> recipients = new ArrayList<Address>(ctx.getTransactionParticipants());
+         List<Address> recipients = dm.getAffectedNodes(ctx.getAffectedKeys());
          rpcManager.invokeRemotely(recipients, command, configuration.isSyncCommitPhase(), true);
          List<WriteCommand> mods = ctx.getModifications();
          flushL1Cache(recipients.size(), getKeys(mods), false, null, configuration.isSyncCommitPhase());
@@ -215,8 +211,7 @@
       boolean sync = isSynchronous(ctx);
 
       if (ctx.isOriginLocal() && ctx.hasModifications()) {
-         List<Address> recipients = new ArrayList<Address>(ctx.getTransactionParticipants());
-         if (trace) log.trace("Multicasting PrepareCommand to recipients : " + recipients);
+         List<Address> recipients = dm.getAffectedNodes(ctx.getAffectedKeys());
          // this method will return immediately if we're the only member (because exclude_self=true)
          rpcManager.invokeRemotely(recipients, command, sync);
          if (command.isOnePhaseCommit())
@@ -227,10 +222,7 @@
 
    @Override
    public Object visitRollbackCommand(TxInvocationContext ctx, RollbackCommand command) throws Throwable {
-      if (ctx.isOriginLocal()) {
-         List<Address> recipients = new ArrayList<Address>(ctx.getTransactionParticipants());
-         rpcManager.invokeRemotely(recipients, command, configuration.isSyncRollbackPhase(), true);
-      }
+      if (ctx.isOriginLocal()) rpcManager.invokeRemotely(dm.getAffectedNodes(ctx.getAffectedKeys()), command, configuration.isSyncRollbackPhase(), true);
       return invokeNextInterceptor(ctx, command);
    }
 
@@ -312,7 +304,7 @@
                }
             }
          } else {
-            ((TxInvocationContext) ctx).addTransactionParticipants(recipientGenerator.generateRecipients());
+            ((TxInvocationContext) ctx).addAffectedKeys(recipientGenerator.getKeys());
          }
       }
       return returnValue;

Modified: trunk/core/src/test/java/org/infinispan/distribution/rehash/RehashTestBase.java
===================================================================
--- trunk/core/src/test/java/org/infinispan/distribution/rehash/RehashTestBase.java	2009-11-11 17:13:10 UTC (rev 1136)
+++ trunk/core/src/test/java/org/infinispan/distribution/rehash/RehashTestBase.java	2009-11-11 17:48:58 UTC (rev 1137)
@@ -137,6 +137,10 @@
       log.info("Rehash complete");
       additionalWait();
 
+      // let's first see what the value of k1 is on c1 ...
+
+      System.out.println("***** K1 on C1 is " + c1.get(keys.get(0)));
+
       assertOnAllCachesAndOwnership(keys.get(0), "transactionally_replaced");
       assertOnAllCachesAndOwnership(keys.get(1), "v" + 2);
       assertOnAllCachesAndOwnership(keys.get(2), "v" + 3);



More information about the infinispan-commits mailing list