[infinispan-commits] Infinispan SVN: r1137 - in trunk/core/src: main/java/org/infinispan/commands/control and 4 other directories.
infinispan-commits at lists.jboss.org
infinispan-commits at lists.jboss.org
Wed Nov 11 12:48:59 EST 2009
Author: manik.surtani at jboss.com
Date: 2009-11-11 12:48:58 -0500 (Wed, 11 Nov 2009)
New Revision: 1137
Modified:
trunk/core/src/main/java/org/infinispan/commands/CommandsFactory.java
trunk/core/src/main/java/org/infinispan/commands/CommandsFactoryImpl.java
trunk/core/src/main/java/org/infinispan/commands/control/RehashControlCommand.java
trunk/core/src/main/java/org/infinispan/context/impl/AbstractTxInvocationContext.java
trunk/core/src/main/java/org/infinispan/context/impl/TxInvocationContext.java
trunk/core/src/main/java/org/infinispan/distribution/DistributionManager.java
trunk/core/src/main/java/org/infinispan/distribution/DistributionManagerImpl.java
trunk/core/src/main/java/org/infinispan/distribution/JoinTask.java
trunk/core/src/main/java/org/infinispan/distribution/LeaveTask.java
trunk/core/src/main/java/org/infinispan/distribution/TransactionLogger.java
trunk/core/src/main/java/org/infinispan/distribution/TransactionLoggerImpl.java
trunk/core/src/main/java/org/infinispan/interceptors/DeadlockDetectingInterceptor.java
trunk/core/src/main/java/org/infinispan/interceptors/DistributionInterceptor.java
trunk/core/src/test/java/org/infinispan/distribution/rehash/RehashTestBase.java
Log:
Updated to deal with the fact that a target recipient list may change between a prepare and a commit command being broadcast, and in-flight prepares would be pushed to new nodes as part of a rehash. Hence, the commits need to be redirected as well.
Modified: trunk/core/src/main/java/org/infinispan/commands/CommandsFactory.java
===================================================================
--- trunk/core/src/main/java/org/infinispan/commands/CommandsFactory.java 2009-11-11 17:13:10 UTC (rev 1136)
+++ trunk/core/src/main/java/org/infinispan/commands/CommandsFactory.java 2009-11-11 17:48:58 UTC (rev 1137)
@@ -120,5 +120,9 @@
RehashControlCommand buildRehashControlCommand(RehashControlCommand.Type subtype, Address sender, Map<Object, InternalCacheValue> state);
+ RehashControlCommand buildRehashControlCommandTxLog(Address sender, List<WriteCommand> state);
+
+ RehashControlCommand buildRehashControlCommandTxLogPendingPrepares(Address sender, List<PrepareCommand> state);
+
RehashControlCommand buildRehashControlCommand(RehashControlCommand.Type subtype, Address sender, Map<Object, InternalCacheValue> state, ConsistentHash consistentHash);
}
Modified: trunk/core/src/main/java/org/infinispan/commands/CommandsFactoryImpl.java
===================================================================
--- trunk/core/src/main/java/org/infinispan/commands/CommandsFactoryImpl.java 2009-11-11 17:13:10 UTC (rev 1136)
+++ trunk/core/src/main/java/org/infinispan/commands/CommandsFactoryImpl.java 2009-11-11 17:48:58 UTC (rev 1137)
@@ -25,6 +25,8 @@
import org.infinispan.commands.control.LockControlCommand;
import org.infinispan.commands.control.RehashControlCommand;
import org.infinispan.commands.control.StateTransferControlCommand;
+import static org.infinispan.commands.control.RehashControlCommand.Type.DRAIN_TX_PREPARES;
+import static org.infinispan.commands.control.RehashControlCommand.Type.DRAIN_TX;
import org.infinispan.commands.read.EntrySetCommand;
import org.infinispan.commands.read.GetKeyValueCommand;
import org.infinispan.commands.read.KeySetCommand;
@@ -275,7 +277,7 @@
break;
case RehashControlCommand.COMMAND_ID:
RehashControlCommand rcc = (RehashControlCommand) c;
- rcc.init(distributionManager, configuration, dataContainer);
+ rcc.init(distributionManager, configuration, dataContainer, this);
break;
default:
if (trace)
@@ -295,8 +297,16 @@
return buildRehashControlCommand(type, sender, state, null);
}
+ public RehashControlCommand buildRehashControlCommandTxLog(Address sender, List<WriteCommand> commands) {
+ return new RehashControlCommand(cacheName, DRAIN_TX, sender, commands, null, this);
+ }
+
+ public RehashControlCommand buildRehashControlCommandTxLogPendingPrepares(Address sender, List<PrepareCommand> commands) {
+ return new RehashControlCommand(cacheName, DRAIN_TX_PREPARES, sender, null, commands, this);
+ }
+
public RehashControlCommand buildRehashControlCommand(RehashControlCommand.Type type, Address sender, Map<Object, InternalCacheValue> state, ConsistentHash consistentHash) {
- return new RehashControlCommand(cacheName, type, sender, state, consistentHash);
+ return new RehashControlCommand(cacheName, type, sender, state, consistentHash, this);
}
}
Modified: trunk/core/src/main/java/org/infinispan/commands/control/RehashControlCommand.java
===================================================================
--- trunk/core/src/main/java/org/infinispan/commands/control/RehashControlCommand.java 2009-11-11 17:13:10 UTC (rev 1136)
+++ trunk/core/src/main/java/org/infinispan/commands/control/RehashControlCommand.java 2009-11-11 17:48:58 UTC (rev 1137)
@@ -1,7 +1,11 @@
package org.infinispan.commands.control;
import org.infinispan.CacheException;
+import org.infinispan.commands.CommandsFactory;
+import org.infinispan.commands.ReplicableCommand;
import org.infinispan.commands.remote.BaseRpcCommand;
+import org.infinispan.commands.tx.PrepareCommand;
+import org.infinispan.commands.write.WriteCommand;
import org.infinispan.config.Configuration;
import org.infinispan.container.DataContainer;
import org.infinispan.container.entries.InternalCacheEntry;
@@ -20,6 +24,7 @@
import java.util.HashMap;
import java.util.List;
import java.util.Map;
+import java.util.Arrays;
/**
* A control command to coordinate rehashes that may occur when nodes join or leave a cluster, when DIST is used as a
@@ -34,7 +39,7 @@
public static final int COMMAND_ID = 17;
public enum Type {
- JOIN_REQ, JOIN_REHASH_START, JOIN_REHASH_END, JOIN_COMPLETE, JOIN_ABORT, PULL_STATE, PUSH_STATE
+ JOIN_REQ, JOIN_REHASH_START, JOIN_REHASH_END, JOIN_COMPLETE, JOIN_ABORT, PULL_STATE, PUSH_STATE, DRAIN_TX, DRAIN_TX_PREPARES
}
Type type;
@@ -47,26 +52,51 @@
Transport transport;
Configuration configuration;
DataContainer dataContainer;
+ List<WriteCommand> txLogCommands;
+ List<PrepareCommand> pendingPrepares;
+ CommandsFactory commandsFactory;
- public RehashControlCommand(String cacheName, Type type, Address sender, Map<Object, InternalCacheValue> state, ConsistentHash consistentHash) {
+ public RehashControlCommand() {
+ }
+
+
+ public RehashControlCommand(String cacheName, Type type, Address sender, Map<Object, InternalCacheValue> state,
+ ConsistentHash consistentHash, CommandsFactory commandsFactory) {
super(cacheName);
this.type = type;
this.sender = sender;
this.state = state;
this.consistentHash = consistentHash;
+ this.commandsFactory = commandsFactory;
}
- public RehashControlCommand() {
+ public RehashControlCommand(String cacheName, Type type, Address sender, List<WriteCommand> txLogCommands,
+ List<PrepareCommand> pendingPrepares, CommandsFactory commandsFactory) {
+ super(cacheName);
+ this.type = type;
+ this.sender = sender;
+ this.txLogCommands = txLogCommands;
+ this.pendingPrepares = pendingPrepares;
+ this.commandsFactory = commandsFactory;
}
public RehashControlCommand(Transport transport) {
this.transport = transport;
}
- public void init(DistributionManager distributionManager, Configuration configuration, DataContainer dataContainer) {
+ public void init(DistributionManager distributionManager, Configuration configuration, DataContainer dataContainer,
+ CommandsFactory commandsFactory) {
this.distributionManager = distributionManager;
this.configuration = configuration;
this.dataContainer = dataContainer;
+ this.commandsFactory = commandsFactory;
+
+ // we need to "fix" these command lists - essentially propagate the init. TODO think of a nicer way to do this!!
+ for (List<? extends ReplicableCommand> commandList : Arrays.asList(txLogCommands, pendingPrepares)) {
+ if (commandList != null) {
+ for (ReplicableCommand cmd : commandList) commandsFactory.initializeReplicableCommand(cmd);
+ }
+ }
}
public Object perform(InvocationContext ctx) throws Throwable {
@@ -86,6 +116,12 @@
return pullState();
case PUSH_STATE:
return pushState();
+ case DRAIN_TX:
+ distributionManager.applyRemoteTxLog(txLogCommands);
+ return null;
+ case DRAIN_TX_PREPARES:
+ for (PrepareCommand pc : pendingPrepares) pc.perform(null);
+ return null;
}
throw new CacheException("Unknown rehash control command type " + type);
}
@@ -133,7 +169,7 @@
}
public Object[] getParameters() {
- return new Object[]{cacheName, (byte) type.ordinal(), sender, state, consistentHash};
+ return new Object[]{cacheName, (byte) type.ordinal(), sender, state, consistentHash, txLogCommands, pendingPrepares};
}
@SuppressWarnings("unchecked")
@@ -144,6 +180,8 @@
sender = (Address) parameters[i++];
state = (Map<Object, InternalCacheValue>) parameters[i++];
consistentHash = (ConsistentHash) parameters[i++];
+ txLogCommands = (List<WriteCommand>) parameters[i++];
+ pendingPrepares = (List<PrepareCommand>) parameters[i++];
}
@Override
@@ -153,6 +191,8 @@
", sender=" + sender +
", state=" + state +
", consistentHash=" + consistentHash +
+ ", txLogCommands=" + txLogCommands +
+ ", pendingPrepares=" + pendingPrepares +
'}';
}
}
Modified: trunk/core/src/main/java/org/infinispan/context/impl/AbstractTxInvocationContext.java
===================================================================
--- trunk/core/src/main/java/org/infinispan/context/impl/AbstractTxInvocationContext.java 2009-11-11 17:13:10 UTC (rev 1136)
+++ trunk/core/src/main/java/org/infinispan/context/impl/AbstractTxInvocationContext.java 2009-11-11 17:48:58 UTC (rev 1137)
@@ -1,14 +1,13 @@
package org.infinispan.context.impl;
import org.infinispan.CacheException;
-import org.infinispan.remoting.transport.Address;
import javax.transaction.Status;
import javax.transaction.SystemException;
import javax.transaction.Transaction;
+import java.util.Arrays;
import java.util.Collections;
import java.util.HashSet;
-import java.util.List;
import java.util.Set;
/**
@@ -19,25 +18,25 @@
*/
public abstract class AbstractTxInvocationContext extends AbstractInvocationContext implements TxInvocationContext {
- protected Set<Address> txParticipants = null;
+ protected Set<Object> affectedKeys = null;
public boolean hasModifications() {
return getModifications() != null && !getModifications().isEmpty();
}
- public Set<Address> getTransactionParticipants() {
- return txParticipants == null ? Collections.EMPTY_SET : txParticipants;
+ public Set<Object> getAffectedKeys() {
+ return affectedKeys == null ? Collections.EMPTY_SET : affectedKeys;
}
public boolean isValidRunningTx() {
return isValid(getRunningTransaction());
}
- public void addTransactionParticipants(List<Address> addresses) {
- if (txParticipants == null) {
- txParticipants = new HashSet<Address>();
+ public void addAffectedKeys(Object... keys) {
+ if (affectedKeys == null) {
+ affectedKeys = new HashSet<Object>();
}
- txParticipants.addAll(addresses);
+ affectedKeys.addAll(Arrays.asList(keys));
}
@@ -97,8 +96,8 @@
@Override
public AbstractTxInvocationContext clone() {
AbstractTxInvocationContext dolly = (AbstractTxInvocationContext) super.clone();
- if (this.txParticipants != null) {
- dolly.txParticipants = new HashSet<Address>(txParticipants);
+ if (this.affectedKeys != null) {
+ dolly.affectedKeys = new HashSet<Object>(affectedKeys);
}
return dolly;
}
Modified: trunk/core/src/main/java/org/infinispan/context/impl/TxInvocationContext.java
===================================================================
--- trunk/core/src/main/java/org/infinispan/context/impl/TxInvocationContext.java 2009-11-11 17:13:10 UTC (rev 1136)
+++ trunk/core/src/main/java/org/infinispan/context/impl/TxInvocationContext.java 2009-11-11 17:48:58 UTC (rev 1137)
@@ -2,7 +2,6 @@
import org.infinispan.commands.write.WriteCommand;
import org.infinispan.context.InvocationContext;
-import org.infinispan.remoting.transport.Address;
import org.infinispan.transaction.xa.GlobalTransaction;
import javax.transaction.Transaction;
@@ -23,10 +22,10 @@
public boolean hasModifications();
/**
- * Returns the set of cluster participants (identified through {@link org.infinispan.remoting.transport.Address}
- * objects) that participate within the transaction. Null indicates all cluster members.
+ * Returns the set of keys that are affected by this transaction. Used to generate appropriate recipient groups
+ * for cluster-wide prepare and commit calls.
*/
- Set<Address> getTransactionParticipants();
+ Set<Object> getAffectedKeys();
/**
* Returns the id of the transaction assoctiated with the current call.
@@ -54,5 +53,5 @@
/**
* Registers a new participant with the transaction.
*/
- void addTransactionParticipants(List<Address> addresses);
+ void addAffectedKeys(Object... keys);
}
Modified: trunk/core/src/main/java/org/infinispan/distribution/DistributionManager.java
===================================================================
--- trunk/core/src/main/java/org/infinispan/distribution/DistributionManager.java 2009-11-11 17:13:10 UTC (rev 1136)
+++ trunk/core/src/main/java/org/infinispan/distribution/DistributionManager.java 2009-11-11 17:48:58 UTC (rev 1137)
@@ -7,10 +7,12 @@
import org.infinispan.factories.scopes.Scopes;
import org.infinispan.loaders.CacheStore;
import org.infinispan.remoting.transport.Address;
+import org.infinispan.commands.write.WriteCommand;
import java.util.Collection;
import java.util.List;
import java.util.Map;
+import java.util.Set;
/**
* A component that manages the distribution of elements across a cache cluster
@@ -108,5 +110,9 @@
boolean isJoinComplete();
void applyReceivedState(Map<Object, InternalCacheValue> state);
+
+ List<Address> getAffectedNodes(Set<Object> affectedKeys);
+
+ void applyRemoteTxLog(List<WriteCommand> txLogCommands);
}
Modified: trunk/core/src/main/java/org/infinispan/distribution/DistributionManagerImpl.java
===================================================================
--- trunk/core/src/main/java/org/infinispan/distribution/DistributionManagerImpl.java 2009-11-11 17:13:10 UTC (rev 1136)
+++ trunk/core/src/main/java/org/infinispan/distribution/DistributionManagerImpl.java 2009-11-11 17:48:58 UTC (rev 1137)
@@ -44,9 +44,12 @@
import java.util.ArrayList;
import java.util.Collection;
+import java.util.Collections;
+import java.util.HashSet;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
+import java.util.Set;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
@@ -178,8 +181,8 @@
throw new CacheException(e);
}
- if (willReceiveLeaverState) {
- log.info("Starting transaction logging; expecting state from someone!");
+ if (willReceiveLeaverState || willSendLeaverState) {
+ log.info("Starting transaction logging!");
transactionLogger.enable();
}
@@ -350,7 +353,7 @@
applyState(consistentHash, state);
boolean unlocked = false;
try {
- drainTransactionLog();
+ drainLocalTransactionLog();
unlocked = true;
} finally {
if (!unlocked) transactionLogger.unlockAndDisable();
@@ -361,9 +364,9 @@
return joinComplete;
}
- void drainTransactionLog() {
+ void drainLocalTransactionLog() {
List<WriteCommand> c;
- while (transactionLogger.size() > 10) {
+ while (transactionLogger.shouldDrainWithoutLock()) {
c = transactionLogger.drain();
apply(c);
}
@@ -383,6 +386,18 @@
}
}
+ public List<Address> getAffectedNodes(Set<Object> affectedKeys) {
+ if (affectedKeys == null || affectedKeys.isEmpty()) return Collections.emptyList();
+
+ Set<Address> an = new HashSet<Address>();
+ for (List<Address> addresses : locateAll(affectedKeys).values()) an.addAll(addresses);
+ return new ArrayList<Address>(an);
+ }
+
+ public void applyRemoteTxLog(List<WriteCommand> txLogCommands) {
+ apply(txLogCommands);
+ }
+
@ManagedOperation(description = "Tells you whether a given key is local to this instance of the cache. Only works with String keys.")
@Operation(displayName = "Is key local?")
public boolean isLocatedLocally(@Parameter(name = "key", description = "Key to query") String key) {
Modified: trunk/core/src/main/java/org/infinispan/distribution/JoinTask.java
===================================================================
--- trunk/core/src/main/java/org/infinispan/distribution/JoinTask.java 2009-11-11 17:13:10 UTC (rev 1136)
+++ trunk/core/src/main/java/org/infinispan/distribution/JoinTask.java 2009-11-11 17:48:58 UTC (rev 1137)
@@ -127,7 +127,7 @@
}
// 8. Drain logs
- dmi.drainTransactionLog();
+ dmi.drainLocalTransactionLog();
}
unlocked = true;
Modified: trunk/core/src/main/java/org/infinispan/distribution/LeaveTask.java
===================================================================
--- trunk/core/src/main/java/org/infinispan/distribution/LeaveTask.java 2009-11-11 17:13:10 UTC (rev 1136)
+++ trunk/core/src/main/java/org/infinispan/distribution/LeaveTask.java 2009-11-11 17:48:58 UTC (rev 1137)
@@ -1,7 +1,10 @@
package org.infinispan.distribution;
import org.infinispan.commands.CommandsFactory;
+import org.infinispan.commands.ReplicableCommand;
import org.infinispan.commands.control.RehashControlCommand;
+import org.infinispan.commands.tx.PrepareCommand;
+import org.infinispan.commands.write.WriteCommand;
import org.infinispan.config.Configuration;
import org.infinispan.container.DataContainer;
import org.infinispan.container.entries.InternalCacheEntry;
@@ -23,6 +26,7 @@
import java.util.List;
import java.util.Map;
import java.util.Set;
+import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
/**
@@ -33,7 +37,7 @@
*/
public class LeaveTask extends RehashTask {
private static final Log log = LogFactory.getLog(LeaveTask.class);
-
+ private static final boolean trace = log.isTraceEnabled();
private final List<Address> leavers;
private final Address self;
private final List<Address> leaversHandled;
@@ -54,8 +58,9 @@
List<Address> leaversHandled = new LinkedList<Address>(leavers);
ConsistentHash oldCH = ConsistentHashHelper.createConsistentHash(configuration, dmi.getConsistentHash().getCaches(), leaversHandled);
int replCount = configuration.getNumOwners();
+
try {
- StateMap statemap = new StateMap(leaversHandled, self, oldCH, dmi.getConsistentHash(), replCount);
+ InMemoryStateMap statemap = new InMemoryStateMap(leaversHandled, self, oldCH, dmi.getConsistentHash(), replCount);
if (log.isTraceEnabled()) log.trace("Examining state in data container");
// need to constantly detect whether we are interrupted. If so, abort accordingly.
for (InternalCacheEntry ice : dataContainer) {
@@ -70,17 +75,19 @@
}
// push state.
- Set<Future<Void>> pushFutures = new HashSet<Future<Void>>();
+ Set<Future<Object>> pushFutures = new HashSet<Future<Object>>();
for (Map.Entry<Address, Map<Object, InternalCacheValue>> entry : statemap.getState().entrySet()) {
if (log.isDebugEnabled()) log.debug("Pushing {0} entries to {1}", entry.getValue().size(), entry.getKey());
RehashControlCommand push = cf.buildRehashControlCommand(RehashControlCommand.Type.PUSH_STATE, self, entry.getValue());
- NotifyingNotifiableFuture f = new NotifyingFutureImpl(null);
+ NotifyingNotifiableFuture<Object> f = new NotifyingFutureImpl(null);
pushFutures.add(f);
rpcManager.invokeRemotelyInFuture(Collections.singleton(entry.getKey()), push, true, f, configuration.getRehashRpcTimeout());
}
for (Future f : pushFutures) f.get();
+ processAndDrainTxLog(oldCH, dmi.getConsistentHash(), replCount);
+
completedSuccessfully = true;
invalidateInvalidHolders(oldCH, dmi.getConsistentHash());
if (log.isInfoEnabled())
@@ -96,6 +103,83 @@
}
}
+ private void processAndDrainTxLog(ConsistentHash oldCH, ConsistentHash newCH, int replCount) {
+ if (trace) log.trace("Processing transaction log iteratively");
+
+ List<WriteCommand> c;
+ int i = 0;
+ while (transactionLogger.shouldDrainWithoutLock()) {
+ if (trace) log.trace("Processing transaction log, iteration {0}", i++);
+ c = transactionLogger.drain();
+ if (trace) log.trace("Found {0} modifications", c.size());
+ apply(oldCH, newCH, replCount, c);
+ }
+
+ if (trace) log.trace("Processing transaction log: final drain and lock");
+ c = transactionLogger.drainAndLock();
+ if (trace) log.trace("Found {0} modifications", c.size());
+ apply(oldCH, newCH, replCount, c);
+
+ if (trace) log.trace("Handling pending prepares");
+ PendingPreparesMap state = new PendingPreparesMap(leavers, oldCH, newCH, replCount);
+ Collection<PrepareCommand> pendingPrepares = transactionLogger.getPendingPrepares();
+ if (trace) log.trace("Found {0} pending prepares", pendingPrepares.size());
+ for (PrepareCommand pc : pendingPrepares) state.addState(pc);
+
+ if (trace) log.trace("State map for pending prepares is {0}", state.getState());
+
+ Set<Future<Object>> pushFutures = new HashSet<Future<Object>>();
+ for (Map.Entry<Address, List<PrepareCommand>> e : state.getState().entrySet()) {
+ if (log.isDebugEnabled())
+ log.debug("Pushing {0} uncommitted prepares to {1}", e.getValue().size(), e.getKey());
+ RehashControlCommand push = cf.buildRehashControlCommandTxLogPendingPrepares(self, e.getValue());
+ NotifyingNotifiableFuture<Object> f = new NotifyingFutureImpl(null);
+ pushFutures.add(f);
+ rpcManager.invokeRemotelyInFuture(Collections.singleton(e.getKey()), push, true, f, configuration.getRehashRpcTimeout());
+ }
+
+ for (Future f : pushFutures) {
+ try {
+ f.get();
+ } catch (InterruptedException e) {
+ Thread.currentThread().interrupt();
+ } catch (ExecutionException e) {
+ log.error("Error pushing tx log", e);
+ }
+ }
+ if (trace) log.trace("Finished pushing pending prepares; unlocking and disabling transaction logging");
+
+ transactionLogger.unlockAndDisable();
+ }
+
+ private void apply(ConsistentHash oldCH, ConsistentHash newCH, int replCount, List<WriteCommand> wc) {
+ // need to create another "state map"
+ TransactionLogMap state = new TransactionLogMap(leavers, oldCH, newCH, replCount);
+ for (WriteCommand c : wc) state.addState(c);
+
+ if (trace) log.trace("State map for modifications is {0}", state.getState());
+
+ Set<Future<Object>> pushFutures = new HashSet<Future<Object>>();
+ for (Map.Entry<Address, List<WriteCommand>> entry : state.getState().entrySet()) {
+ if (log.isDebugEnabled())
+ log.debug("Pushing {0} modifications to {1}", entry.getValue().size(), entry.getKey());
+ RehashControlCommand push = cf.buildRehashControlCommandTxLog(self, entry.getValue());
+ NotifyingNotifiableFuture<Object> f = new NotifyingFutureImpl(null);
+ pushFutures.add(f);
+ rpcManager.invokeRemotelyInFuture(Collections.singleton(entry.getKey()), push, true, f, configuration.getRehashRpcTimeout());
+ }
+
+ for (Future f : pushFutures) {
+ try {
+ f.get();
+ } catch (InterruptedException e) {
+ Thread.currentThread().interrupt();
+ } catch (ExecutionException e) {
+ log.error("Error pushing tx log", e);
+ }
+ }
+ }
+
@Override
protected Collection<Address> getInvalidHolders(Object key, ConsistentHash chOld, ConsistentHash chNew) {
Collection<Address> l = super.getInvalidHolders(key, chOld, chNew);
@@ -104,31 +188,43 @@
}
}
-class StateMap {
+abstract class StateMap<S> {
List<Address> leavers;
- Address self;
ConsistentHash oldCH, newCH;
int replCount;
- Set<Object> keysHandled = new HashSet<Object>();
- Map<Address, Map<Object, InternalCacheValue>> state = new HashMap<Address, Map<Object, InternalCacheValue>>();
+ Map<Address, S> state = new HashMap<Address, S>();
- StateMap(List<Address> leavers, Address self, ConsistentHash oldCH, ConsistentHash newCH, int replCount) {
+ StateMap(List<Address> leavers, ConsistentHash oldCH, ConsistentHash newCH, int replCount) {
this.leavers = leavers;
- this.self = self;
this.oldCH = oldCH;
this.newCH = newCH;
this.replCount = replCount;
}
+ Map<Address, S> getState() {
+ return state;
+ }
+}
+
+class InMemoryStateMap extends StateMap<Map<Object, InternalCacheValue>> {
+ Address self;
+ Set<Object> keysHandled = new HashSet<Object>();
+
+ InMemoryStateMap(List<Address> leavers, Address self, ConsistentHash oldCH, ConsistentHash newCH, int replCount) {
+ super(leavers, oldCH, newCH, replCount);
+ this.self = self;
+ }
+
/**
* Only add state to state map if old_owner_list for key contains a leaver, and the position of the leaver in the old
* owner list
*
- * @param ice
+ * @param payload an InternalCacheEntry to add to the state map
*/
- void addState(InternalCacheEntry ice) {
+ void addState(InternalCacheEntry payload) {
+ Object key = payload.getKey();
for (Address leaver : leavers) {
- List<Address> owners = oldCH.locate(ice.getKey(), replCount);
+ List<Address> owners = oldCH.locate(key, replCount);
int leaverIndex = owners.indexOf(leaver);
if (leaverIndex > -1) {
int numOwners = owners.size();
@@ -137,7 +233,7 @@
if ((isLeaverLast && selfIndex == numOwners - 2) ||
(!isLeaverLast && selfIndex == leaverIndex + 1)) {
// add to state map!
- List<Address> newOwners = newCH.locate(ice.getKey(), replCount);
+ List<Address> newOwners = newCH.locate(key, replCount);
newOwners.removeAll(owners);
if (!newOwners.isEmpty()) {
for (Address no : newOwners) {
@@ -146,20 +242,92 @@
s = new HashMap<Object, InternalCacheValue>();
state.put(no, s);
}
- s.put(ice.getKey(), ice.toInternalCacheValue());
+ s.put(key, payload.toInternalCacheValue());
}
}
}
}
}
- keysHandled.add(ice.getKey());
+ keysHandled.add(key);
}
- Map<Address, Map<Object, InternalCacheValue>> getState() {
- return state;
- }
-
boolean containsKey(Object key) {
return keysHandled.contains(key);
}
}
+
+/**
+ * A state map that aggregates {@link ReplicableCommand}s according to recipient affected.
+ *
+ * @param <T> type of replicable command to aggregate
+ */
+abstract class CommandAggregatingStateMap<T extends ReplicableCommand> extends StateMap<List<T>> {
+ Set<Object> keysHandled = new HashSet<Object>();
+
+ CommandAggregatingStateMap(List<Address> leavers, ConsistentHash oldCH, ConsistentHash newCH, int replCount) {
+ super(leavers, oldCH, newCH, replCount);
+ }
+
+ // if only Java had duck-typing!
+ abstract Set<Object> getAffectedKeys(T payload);
+
+ /**
+ * Only add state to the state map if the old owner list for a key contains a leaver; the payload is then queued for
+ * every new owner of that key that was not already an old owner.
+ *
+ * @param payload payload to consider when adding to the aggregate state
+ */
+ void addState(T payload) {
+ for (Object key : getAffectedKeys(payload)) {
+ for (Address leaver : leavers) {
+ List<Address> owners = oldCH.locate(key, replCount);
+ int leaverIndex = owners.indexOf(leaver);
+ if (leaverIndex > -1) {
+ // add to state map!
+ List<Address> newOwners = newCH.locate(key, replCount);
+ newOwners.removeAll(owners);
+ if (!newOwners.isEmpty()) {
+ for (Address no : newOwners) {
+ List<T> s = state.get(no);
+ if (s == null) {
+ s = new LinkedList<T>();
+ state.put(no, s);
+ }
+ s.add(payload);
+ }
+ }
+ }
+ }
+ }
+ }
+}
+
+/**
+ * Specific version of the CommandAggregatingStateMap that aggregates PrepareCommands, used to flush pending prepares
+ * to nodes during a leave.
+ */
+class PendingPreparesMap extends CommandAggregatingStateMap<PrepareCommand> {
+ PendingPreparesMap(List<Address> leavers, ConsistentHash oldCH, ConsistentHash newCH, int replCount) {
+ super(leavers, oldCH, newCH, replCount);
+ }
+
+ @Override
+ Set<Object> getAffectedKeys(PrepareCommand payload) {
+ return payload.getAffectedKeys();
+ }
+}
+
+/**
+ * Specific version of the CommandAggregatingStateMap that aggregates WriteCommands, used to flush writes
+ * made while state was being transferred to nodes during a leave.
+ */
+class TransactionLogMap extends CommandAggregatingStateMap<WriteCommand> {
+ TransactionLogMap(List<Address> leavers, ConsistentHash oldCH, ConsistentHash newCH, int replCount) {
+ super(leavers, oldCH, newCH, replCount);
+ }
+
+ @Override
+ Set<Object> getAffectedKeys(WriteCommand payload) {
+ return payload.getAffectedKeys();
+ }
+}
\ No newline at end of file
Modified: trunk/core/src/main/java/org/infinispan/distribution/TransactionLogger.java
===================================================================
--- trunk/core/src/main/java/org/infinispan/distribution/TransactionLogger.java 2009-11-11 17:13:10 UTC (rev 1136)
+++ trunk/core/src/main/java/org/infinispan/distribution/TransactionLogger.java 2009-11-11 17:48:58 UTC (rev 1137)
@@ -67,4 +67,19 @@
int size();
boolean isEnabled();
+
+ /**
+ * Tests whether the drain() method can be called without a lock. This is usually true if there is a lot of stuff
+ * to drain. After a certain threshold (once there are relatively few entries in the tx log) this will return false
+ * after which you should call drainAndLock() to clear the final parts of the log.
+ * @return true if drain() should be called, false if drainAndLock() should be called.
+ */
+ boolean shouldDrainWithoutLock();
+
+ /**
+ * Drains pending prepares. Note that this should *only* be done after calling drainAndLock() to prevent race
+ * conditions
+ * @return a list of prepares pending commit or rollback
+ */
+ Collection<PrepareCommand> getPendingPrepares();
}
Modified: trunk/core/src/main/java/org/infinispan/distribution/TransactionLoggerImpl.java
===================================================================
--- trunk/core/src/main/java/org/infinispan/distribution/TransactionLoggerImpl.java 2009-11-11 17:13:10 UTC (rev 1136)
+++ trunk/core/src/main/java/org/infinispan/distribution/TransactionLoggerImpl.java 2009-11-11 17:48:58 UTC (rev 1137)
@@ -9,6 +9,7 @@
import org.infinispan.util.logging.LogFactory;
import java.util.Collection;
+import java.util.HashSet;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
@@ -20,7 +21,7 @@
/**
* A transaction logger to log ongoing transactions in an efficient and thread-safe manner while a rehash is going on.
- *
+ * <p/>
* Transaction logs can then be replayed after the state transferred during a rehash has been written.
*
* @author Manik Surtani
@@ -33,8 +34,11 @@
final Map<GlobalTransaction, PrepareCommand> uncommittedPrepares = new ConcurrentHashMap<GlobalTransaction, PrepareCommand>();
private static final Log log = LogFactory.getLog(TransactionLoggerImpl.class);
private static final boolean trace = log.isTraceEnabled();
+ // while the log size exceeds this threshold it may be drained without a lock; at or below it, lock and drain
+ private static final int DRAIN_LOCK_THRESHOLD = 10;
public void enable() {
+ log.trace("Starting transaction logging");
enabled = true;
}
@@ -53,6 +57,7 @@
public void unlockAndDisable() {
enabled = false;
loggingLock.writeLock().unlock();
+ log.trace("Stopping transaction logging");
}
public boolean logIfNeeded(WriteCommand command) {
@@ -75,21 +80,21 @@
}
public void logIfNeeded(PrepareCommand command) {
- if (enabled) {
- loggingLock.readLock().lock();
- try {
- if (enabled) {
- if (command.isOnePhaseCommit()) {
+ if (command.isOnePhaseCommit()) {
+ if (enabled) {
+ loggingLock.readLock().lock();
+ try {
+ if (enabled) {
if (trace) log.trace("Logging 1PC prepare for tx {0}", command.getGlobalTransaction());
logModificationsInTransaction(command);
- } else {
- if (trace) log.trace("Logging 2PC prepare for tx {0}", command.getGlobalTransaction());
- uncommittedPrepares.put(command.getGlobalTransaction(), command);
}
+ } finally {
+ loggingLock.readLock().unlock();
}
- } finally {
- loggingLock.readLock().unlock();
}
+ } else {
+ if (trace) log.trace("Logging 2PC prepare for tx {0}", command.getGlobalTransaction());
+ uncommittedPrepares.put(command.getGlobalTransaction(), command);
}
}
@@ -104,12 +109,12 @@
}
public void logIfNeeded(CommitCommand command) {
+ PrepareCommand pc = uncommittedPrepares.remove(command.getGlobalTransaction());
if (enabled) {
loggingLock.readLock().lock();
try {
if (enabled) {
if (trace) log.trace("Logging commit for tx {0}", command.getGlobalTransaction());
- PrepareCommand pc = uncommittedPrepares.remove(command.getGlobalTransaction());
logModificationsInTransaction(pc);
}
} finally {
@@ -119,17 +124,8 @@
}
public void logIfNeeded(RollbackCommand command) {
- if (enabled) {
- loggingLock.readLock().lock();
- try {
- if (enabled) {
- if (trace) log.trace("Logging rollback for tx {0}", command.getGlobalTransaction());
- uncommittedPrepares.remove(command.getGlobalTransaction());
- }
- } finally {
- loggingLock.readLock().unlock();
- }
- }
+ if (trace) log.trace("Logging rollback for tx {0}", command.getGlobalTransaction());
+ uncommittedPrepares.remove(command.getGlobalTransaction());
}
public boolean logIfNeeded(Collection<WriteCommand> commands) {
@@ -159,4 +155,14 @@
public boolean isEnabled() {
return enabled;
}
+
+ public boolean shouldDrainWithoutLock() {
+ return size() > DRAIN_LOCK_THRESHOLD;
+ }
+
+ public Collection<PrepareCommand> getPendingPrepares() {
+ Collection<PrepareCommand> commands = new HashSet<PrepareCommand>(uncommittedPrepares.values());
+ uncommittedPrepares.clear();
+ return commands;
+ }
}
Modified: trunk/core/src/main/java/org/infinispan/interceptors/DeadlockDetectingInterceptor.java
===================================================================
--- trunk/core/src/main/java/org/infinispan/interceptors/DeadlockDetectingInterceptor.java 2009-11-11 17:13:10 UTC (rev 1136)
+++ trunk/core/src/main/java/org/infinispan/interceptors/DeadlockDetectingInterceptor.java 2009-11-11 17:48:58 UTC (rev 1137)
@@ -8,6 +8,7 @@
import org.infinispan.commands.write.ReplaceCommand;
import org.infinispan.context.InvocationContext;
import org.infinispan.context.impl.TxInvocationContext;
+import org.infinispan.distribution.DistributionManager;
import org.infinispan.factories.annotations.Inject;
import org.infinispan.factories.annotations.Start;
import org.infinispan.interceptors.base.CommandInterceptor;
@@ -19,7 +20,7 @@
import javax.transaction.Transaction;
import javax.transaction.TransactionManager;
-import java.util.Set;
+import java.util.HashSet;
/**
* This interceptor populates the {@link org.infinispan.transaction.xa.DeadlockDetectingGlobalTransaction} with
@@ -37,12 +38,15 @@
private TransactionTable txTable;
private LockManager lockManager;
private TransactionManager txManager;
+ private DistributionManager distributionManager;
@Inject
- public void init(TransactionTable txTable, LockManager lockManager, TransactionManager txManager) {
+ public void init(TransactionTable txTable, LockManager lockManager, TransactionManager txManager,
+ DistributionManager distributionManager) {
this.txTable = txTable;
this.lockManager = lockManager;
this.txManager = txManager;
+ this.distributionManager = distributionManager;
}
@@ -114,8 +118,7 @@
globalTransaction.setProcessingThread(Thread.currentThread());
if (ctx.isOriginLocal()) {
if (configuration.getCacheMode().isDistributed()) {
- Set<Address> transactionParticipants = ctx.getTransactionParticipants();
- globalTransaction.setReplicatingTo(transactionParticipants);
+ globalTransaction.setReplicatingTo(new HashSet<Address>(distributionManager.getAffectedNodes(ctx.getAffectedKeys())));
} else {
globalTransaction.setReplicatingTo(null);
}
Modified: trunk/core/src/main/java/org/infinispan/interceptors/DistributionInterceptor.java
===================================================================
--- trunk/core/src/main/java/org/infinispan/interceptors/DistributionInterceptor.java 2009-11-11 17:13:10 UTC (rev 1136)
+++ trunk/core/src/main/java/org/infinispan/interceptors/DistributionInterceptor.java 2009-11-11 17:48:58 UTC (rev 1137)
@@ -30,7 +30,6 @@
import org.infinispan.util.concurrent.NotifyingFutureImpl;
import org.infinispan.util.concurrent.NotifyingNotifiableFuture;
-import java.util.ArrayList;
import java.util.Collection;
import java.util.HashSet;
import java.util.LinkedList;
@@ -189,10 +188,7 @@
@Override
public Object visitLockControlCommand(TxInvocationContext ctx, LockControlCommand command) throws Throwable {
- if (ctx.isOriginLocal()) {
- List<Address> recipients = new ArrayList<Address>(ctx.getTransactionParticipants());
- rpcManager.invokeRemotely(recipients, command, true, true);
- }
+ if (ctx.isOriginLocal()) rpcManager.invokeRemotely(dm.getAffectedNodes(ctx.getAffectedKeys()), command, true, true);
return invokeNextInterceptor(ctx, command);
}
@@ -200,7 +196,7 @@
@Override
public Object visitCommitCommand(TxInvocationContext ctx, CommitCommand command) throws Throwable {
if (ctx.isOriginLocal() && ctx.hasModifications()) {
- List<Address> recipients = new ArrayList<Address>(ctx.getTransactionParticipants());
+ List<Address> recipients = dm.getAffectedNodes(ctx.getAffectedKeys());
rpcManager.invokeRemotely(recipients, command, configuration.isSyncCommitPhase(), true);
List<WriteCommand> mods = ctx.getModifications();
flushL1Cache(recipients.size(), getKeys(mods), false, null, configuration.isSyncCommitPhase());
@@ -215,8 +211,7 @@
boolean sync = isSynchronous(ctx);
if (ctx.isOriginLocal() && ctx.hasModifications()) {
- List<Address> recipients = new ArrayList<Address>(ctx.getTransactionParticipants());
- if (trace) log.trace("Multicasting PrepareCommand to recipients : " + recipients);
+ List<Address> recipients = dm.getAffectedNodes(ctx.getAffectedKeys());
// this method will return immediately if we're the only member (because exclude_self=true)
rpcManager.invokeRemotely(recipients, command, sync);
if (command.isOnePhaseCommit())
@@ -227,10 +222,7 @@
@Override
public Object visitRollbackCommand(TxInvocationContext ctx, RollbackCommand command) throws Throwable {
- if (ctx.isOriginLocal()) {
- List<Address> recipients = new ArrayList<Address>(ctx.getTransactionParticipants());
- rpcManager.invokeRemotely(recipients, command, configuration.isSyncRollbackPhase(), true);
- }
+ if (ctx.isOriginLocal()) rpcManager.invokeRemotely(dm.getAffectedNodes(ctx.getAffectedKeys()), command, configuration.isSyncRollbackPhase(), true);
return invokeNextInterceptor(ctx, command);
}
@@ -312,7 +304,7 @@
}
}
} else {
- ((TxInvocationContext) ctx).addTransactionParticipants(recipientGenerator.generateRecipients());
+ ((TxInvocationContext) ctx).addAffectedKeys(recipientGenerator.getKeys());
}
}
return returnValue;
Modified: trunk/core/src/test/java/org/infinispan/distribution/rehash/RehashTestBase.java
===================================================================
--- trunk/core/src/test/java/org/infinispan/distribution/rehash/RehashTestBase.java 2009-11-11 17:13:10 UTC (rev 1136)
+++ trunk/core/src/test/java/org/infinispan/distribution/rehash/RehashTestBase.java 2009-11-11 17:48:58 UTC (rev 1137)
@@ -137,6 +137,10 @@
log.info("Rehash complete");
additionalWait();
+ // let's first see what the value of k1 is on c1 ...
+
+ System.out.println("***** K1 on C1 is " + c1.get(keys.get(0)));
+
assertOnAllCachesAndOwnership(keys.get(0), "transactionally_replaced");
assertOnAllCachesAndOwnership(keys.get(1), "v" + 2);
assertOnAllCachesAndOwnership(keys.get(2), "v" + 3);
More information about the infinispan-commits
mailing list