[infinispan-commits] Infinispan SVN: r2326 - in branches/4.2.x/core/src: main/java/org/infinispan/commands and 15 other directories.
infinispan-commits at lists.jboss.org
infinispan-commits at lists.jboss.org
Mon Sep 6 09:17:02 EDT 2010
Author: mircea.markus
Date: 2010-09-06 09:17:00 -0400 (Mon, 06 Sep 2010)
New Revision: 2326
Added:
branches/4.2.x/core/src/main/java/org/infinispan/transaction/xa/DistDldGlobalTransaction.java
branches/4.2.x/core/src/main/java/org/infinispan/transaction/xa/DldGlobalTransaction.java
branches/4.2.x/core/src/test/java/org/infinispan/tx/EagerTxDeadlockDetectionTest.java
Removed:
branches/4.2.x/core/src/main/java/org/infinispan/transaction/xa/DeadlockDetectingGlobalTransaction.java
Modified:
branches/4.2.x/core/src/main/java/org/infinispan/CacheDelegate.java
branches/4.2.x/core/src/main/java/org/infinispan/commands/CommandsFactory.java
branches/4.2.x/core/src/main/java/org/infinispan/commands/CommandsFactoryImpl.java
branches/4.2.x/core/src/main/java/org/infinispan/commands/RemoteCommandsFactory.java
branches/4.2.x/core/src/main/java/org/infinispan/commands/control/LockControlCommand.java
branches/4.2.x/core/src/main/java/org/infinispan/commands/control/RehashControlCommand.java
branches/4.2.x/core/src/main/java/org/infinispan/commands/tx/AbstractTransactionBoundaryCommand.java
branches/4.2.x/core/src/main/java/org/infinispan/commands/tx/TransactionBoundaryCommand.java
branches/4.2.x/core/src/main/java/org/infinispan/config/ConfigurationValidatingVisitor.java
branches/4.2.x/core/src/main/java/org/infinispan/interceptors/DeadlockDetectingInterceptor.java
branches/4.2.x/core/src/main/java/org/infinispan/interceptors/ImplicitEagerLockingInterceptor.java
branches/4.2.x/core/src/main/java/org/infinispan/interceptors/LockingInterceptor.java
branches/4.2.x/core/src/main/java/org/infinispan/interceptors/base/BaseRpcInterceptor.java
branches/4.2.x/core/src/main/java/org/infinispan/marshall/Ids.java
branches/4.2.x/core/src/main/java/org/infinispan/marshall/jboss/ConstantObjectTable.java
branches/4.2.x/core/src/main/java/org/infinispan/remoting/InboundInvocationHandlerImpl.java
branches/4.2.x/core/src/main/java/org/infinispan/statetransfer/StateTransferManagerImpl.java
branches/4.2.x/core/src/main/java/org/infinispan/transaction/xa/GlobalTransaction.java
branches/4.2.x/core/src/main/java/org/infinispan/transaction/xa/GlobalTransactionFactory.java
branches/4.2.x/core/src/main/java/org/infinispan/util/concurrent/locks/DeadlockDetectedException.java
branches/4.2.x/core/src/main/java/org/infinispan/util/concurrent/locks/DeadlockDetectingLockManager.java
branches/4.2.x/core/src/test/java/org/infinispan/distribution/DeadlockDetectionDistributionTest.java
branches/4.2.x/core/src/test/java/org/infinispan/test/AbstractInfinispanTest.java
branches/4.2.x/core/src/test/java/org/infinispan/test/PerCacheExecutorThread.java
branches/4.2.x/core/src/test/java/org/infinispan/tx/AsyncDeadlockDetectionTest.java
branches/4.2.x/core/src/test/java/org/infinispan/tx/LocalDeadlockDetectionTest.java
branches/4.2.x/core/src/test/java/org/infinispan/tx/ReplDeadlockDetectionTest.java
branches/4.2.x/core/src/test/java/org/infinispan/util/DeadlockDetectingLockManagerTest.java
Log:
[ISPN-518] - Deadlock detection should be able to be used with eager locking
Modified: branches/4.2.x/core/src/main/java/org/infinispan/CacheDelegate.java
===================================================================
--- branches/4.2.x/core/src/main/java/org/infinispan/CacheDelegate.java 2010-09-05 13:12:25 UTC (rev 2325)
+++ branches/4.2.x/core/src/main/java/org/infinispan/CacheDelegate.java 2010-09-06 13:17:00 UTC (rev 2326)
@@ -287,8 +287,6 @@
public void lock(K key) {
assertKeyNotNull(key);
- //this will be removed with https://jira.jboss.org/browse/ISPN-598
- ConfigurationValidatingVisitor.checkEagerLockingAndDld(config, true);
lock(Collections.singletonList(key));
}
Modified: branches/4.2.x/core/src/main/java/org/infinispan/commands/CommandsFactory.java
===================================================================
--- branches/4.2.x/core/src/main/java/org/infinispan/commands/CommandsFactory.java 2010-09-05 13:12:25 UTC (rev 2325)
+++ branches/4.2.x/core/src/main/java/org/infinispan/commands/CommandsFactory.java 2010-09-06 13:17:00 UTC (rev 2326)
@@ -196,8 +196,9 @@
* <p/>
*
* @param command command to initialize. Cannot be null.
+ * @param isRemote
*/
- void initializeReplicableCommand(ReplicableCommand command);
+ void initializeReplicableCommand(ReplicableCommand command, boolean isRemote);
/**
* Builds an RpcCommand "envelope" containing multiple ReplicableCommands
@@ -283,14 +284,6 @@
/**
* Builds a RehashControlCommand for coordinating a rehash event. This particular variation of RehashControlCommand
* coordinates rehashing of nodes when a node join or leaves
- *
- * @param subtype
- * @param sender
- * @param state
- * @param oldCH
- * @param leaversHandled
- * @param newCH
- * @return
*/
RehashControlCommand buildRehashControlCommand(RehashControlCommand.Type subtype,
Address sender, Map<Object, InternalCacheValue> state, ConsistentHash oldCH,
Modified: branches/4.2.x/core/src/main/java/org/infinispan/commands/CommandsFactoryImpl.java
===================================================================
--- branches/4.2.x/core/src/main/java/org/infinispan/commands/CommandsFactoryImpl.java 2010-09-05 13:12:25 UTC (rev 2325)
+++ branches/4.2.x/core/src/main/java/org/infinispan/commands/CommandsFactoryImpl.java 2010-09-06 13:17:00 UTC (rev 2326)
@@ -213,7 +213,10 @@
return new ClusteredGetCommand(key, cacheName);
}
- public void initializeReplicableCommand(ReplicableCommand c) {
+ /**
+ * @param isRemote true if the command is deserialized and is executed remote.
+ */
+ public void initializeReplicableCommand(ReplicableCommand c, boolean isRemote) {
if (c == null) return;
switch (c.getCommandId()) {
case PutKeyValueCommand.COMMAND_ID:
@@ -230,14 +233,14 @@
rc.init(interceptorChain, icc);
if (rc.getCommands() != null)
for (ReplicableCommand nested : rc.getCommands()) {
- initializeReplicableCommand(nested);
+ initializeReplicableCommand(nested, false);
}
break;
case SingleRpcCommand.COMMAND_ID:
SingleRpcCommand src = (SingleRpcCommand) c;
src.init(interceptorChain, icc);
if (src.getCommand() != null)
- initializeReplicableCommand(src.getCommand());
+ initializeReplicableCommand(src.getCommand(), false);
break;
case InvalidateCommand.COMMAND_ID:
@@ -253,15 +256,18 @@
pc.init(interceptorChain, icc, txTable);
pc.initialize(notifier);
if (pc.getModifications() != null)
- for (ReplicableCommand nested : pc.getModifications()) initializeReplicableCommand(nested);
+ for (ReplicableCommand nested : pc.getModifications()) initializeReplicableCommand(nested, false);
+ pc.markTransactionAsRemote(isRemote);
break;
case CommitCommand.COMMAND_ID:
CommitCommand commitCommand = (CommitCommand) c;
commitCommand.init(interceptorChain, icc, txTable);
+ commitCommand.markTransactionAsRemote(isRemote);
break;
case RollbackCommand.COMMAND_ID:
RollbackCommand rollbackCommand = (RollbackCommand) c;
rollbackCommand.init(interceptorChain, icc, txTable);
+ rollbackCommand.markTransactionAsRemote(isRemote);
break;
case ClearCommand.COMMAND_ID:
ClearCommand cc = (ClearCommand) c;
@@ -274,6 +280,7 @@
case LockControlCommand.COMMAND_ID:
LockControlCommand lcc = (LockControlCommand) c;
lcc.init(interceptorChain, icc, txTable);
+ lcc.markTransactionAsRemote(isRemote);
break;
case RehashControlCommand.COMMAND_ID:
RehashControlCommand rcc = (RehashControlCommand) c;
Modified: branches/4.2.x/core/src/main/java/org/infinispan/commands/RemoteCommandsFactory.java
===================================================================
--- branches/4.2.x/core/src/main/java/org/infinispan/commands/RemoteCommandsFactory.java 2010-09-05 13:12:25 UTC (rev 2325)
+++ branches/4.2.x/core/src/main/java/org/infinispan/commands/RemoteCommandsFactory.java 2010-09-06 13:17:00 UTC (rev 2326)
@@ -6,6 +6,7 @@
import org.infinispan.commands.control.StateTransferControlCommand;
import org.infinispan.commands.read.GetKeyValueCommand;
import org.infinispan.commands.remote.ClusteredGetCommand;
+import org.infinispan.commands.remote.LockIntentionNotificationCommand;
import org.infinispan.commands.remote.MultipleRpcCommand;
import org.infinispan.commands.remote.SingleRpcCommand;
import org.infinispan.commands.tx.CommitCommand;
@@ -29,10 +30,11 @@
* cache-specific components into it.
* <p />
* Usually a second step to unmarshalling a command from a byte stream (after
- * creating an un-initialized version using this factory) is to pass the command though {@link org.infinispan.commands.CommandsFactory#initializeReplicableCommand(ReplicableCommand)}.
+ * creating an un-initialized version using this factory) is to pass the command though {@link CommandsFactory#initializeReplicableCommand(ReplicableCommand,boolean)}.
*
- * @see org.infinispan.commands.CommandsFactory#initializeReplicableCommand(ReplicableCommand)
+ * @see CommandsFactory#initializeReplicableCommand(ReplicableCommand,boolean)
* @author Manik Surtani
+ * @author Mircea.Markus at jboss.com
* @since 4.0
*/
@Scope(Scopes.GLOBAL)
@@ -48,7 +50,7 @@
* Creates an un-initialized command. Un-initialized in the sense that parameters will be set, but any components
* specific to the cache in question will not be set.
* <p/>
- * You would typically set these parameters using {@link org.infinispan.commands.CommandsFactory#initializeReplicableCommand(ReplicableCommand)}
+ * You would typically set these parameters using {@link CommandsFactory#initializeReplicableCommand(ReplicableCommand,boolean)}
* <p/>
*
* @param id id of the command
Modified: branches/4.2.x/core/src/main/java/org/infinispan/commands/control/LockControlCommand.java
===================================================================
--- branches/4.2.x/core/src/main/java/org/infinispan/commands/control/LockControlCommand.java 2010-09-05 13:12:25 UTC (rev 2325)
+++ branches/4.2.x/core/src/main/java/org/infinispan/commands/control/LockControlCommand.java 2010-09-06 13:17:00 UTC (rev 2326)
@@ -32,6 +32,8 @@
import org.infinispan.transaction.xa.GlobalTransaction;
import org.infinispan.transaction.xa.RemoteTransaction;
import org.infinispan.util.Util;
+import org.infinispan.util.logging.Log;
+import org.infinispan.util.logging.LogFactory;
import java.util.Collection;
import java.util.Collections;
@@ -45,14 +47,20 @@
* For more details refer to: https://jira.jboss.org/jira/browse/ISPN-70 https://jira.jboss.org/jira/browse/ISPN-48
*
* @author Vladimir Blagojevic (<a href="mailto:vblagoje at redhat.com">vblagoje at redhat.com</a>)
+ * @author Mircea.Markus at jboss.com
+ * @param
* @since 4.0
*/
@Marshallable(externalizer = ReplicableCommandExternalizer.class, id = Ids.LOCK_CONTROL_COMMAND)
public class LockControlCommand extends AbstractTransactionBoundaryCommand {
+
+ private static Log log = LogFactory.getLog(LockControlCommand.class);
+
public static final int COMMAND_ID = 3;
private Set<Object> keys;
private Object singleKey;
private boolean implicit = false;
+ private boolean unlock = false;
public LockControlCommand() {
}
@@ -143,8 +151,13 @@
RemoteTxInvocationContext ctxt = icc.createRemoteTxInvocationContext();
RemoteTransaction transaction = txTable.getRemoteTransaction(globalTx);
- boolean remoteTxinitiated = transaction != null;
- if (!remoteTxinitiated) {
+ if (transaction == null) {
+ if (unlock) {
+ if (log.isTraceEnabled()) {
+ log.trace("Unlock for in-existing transaction: " + globalTx + ". Not doing anything.");
+ }
+ return null;
+ }
//create a remote tx without any modifications (we do not know modifications ahead of time)
transaction = txTable.createRemoteTransaction(globalTx);
}
@@ -159,11 +172,11 @@
public Object[] getParameters() {
if (keys == null || keys.isEmpty()) {
if (singleKey == null)
- return new Object[]{globalTx, cacheName, (byte) 1};
+ return new Object[]{globalTx, cacheName, unlock, (byte) 1};
else
- return new Object[]{globalTx, cacheName, (byte) 2, singleKey};
+ return new Object[]{globalTx, cacheName, unlock, (byte) 2, singleKey};
}
- return new Object[]{globalTx, cacheName, (byte) 3, keys};
+ return new Object[]{globalTx, cacheName, unlock, (byte) 3, keys};
}
@SuppressWarnings("unchecked")
@@ -172,22 +185,31 @@
throw new IllegalStateException("Unusupported command id:" + commandId);
globalTx = (GlobalTransaction) args[0];
cacheName = (String) args[1];
+ unlock = (Boolean)args[2];
keys = null;
singleKey = null;
- byte mode = (Byte) args[2];
+ byte mode = (Byte) args[3];
switch (mode) {
case 1:
break; // do nothing
case 2:
- singleKey = args[3];
+ singleKey = args[4];
break;
case 3:
- keys = (Set<Object>) args[3];
+ keys = (Set<Object>) args[4];
break;
}
}
+ public boolean isUnlock() {
+ return unlock;
+ }
+
+ public void setUnlock(boolean unlock) {
+ this.unlock = unlock;
+ }
+
public boolean equals(Object o) {
if (this == o)
return true;
@@ -197,7 +219,7 @@
LockControlCommand that = (LockControlCommand) o;
if (!super.equals(that))
return false;
- return keys.equals(that.keys) && Util.safeEquals(singleKey, that.singleKey);
+ return keys.equals(that.keys) && Util.safeEquals(singleKey, that.singleKey) && (unlock == that.unlock);
}
public int hashCode() {
@@ -213,6 +235,7 @@
", cacheName='" + cacheName +
", implicit='" + implicit +
", keys=" + keys +
+ ", unlock=" + unlock +
", singleKey=" + singleKey + '}';
}
}
Modified: branches/4.2.x/core/src/main/java/org/infinispan/commands/control/RehashControlCommand.java
===================================================================
--- branches/4.2.x/core/src/main/java/org/infinispan/commands/control/RehashControlCommand.java 2010-09-05 13:12:25 UTC (rev 2325)
+++ branches/4.2.x/core/src/main/java/org/infinispan/commands/control/RehashControlCommand.java 2010-09-06 13:17:00 UTC (rev 2326)
@@ -105,7 +105,7 @@
// we need to "fix" these command lists - essentially propagate the init. TODO think of a nicer way to do this!!
for (List<? extends ReplicableCommand> commandList : Arrays.asList(txLogCommands, pendingPrepares)) {
if (commandList != null) {
- for (ReplicableCommand cmd : commandList) commandsFactory.initializeReplicableCommand(cmd);
+ for (ReplicableCommand cmd : commandList) commandsFactory.initializeReplicableCommand(cmd, false);
}
}
}
Modified: branches/4.2.x/core/src/main/java/org/infinispan/commands/tx/AbstractTransactionBoundaryCommand.java
===================================================================
--- branches/4.2.x/core/src/main/java/org/infinispan/commands/tx/AbstractTransactionBoundaryCommand.java 2010-09-05 13:12:25 UTC (rev 2325)
+++ branches/4.2.x/core/src/main/java/org/infinispan/commands/tx/AbstractTransactionBoundaryCommand.java 2010-09-06 13:17:00 UTC (rev 2326)
@@ -68,8 +68,13 @@
return globalTx;
}
+ public void markTransactionAsRemote(boolean isRemote) {
+ globalTx.setRemote(isRemote);
+ }
+
public Object perform(InvocationContext ctx) throws Throwable {
if (ctx != null) throw new IllegalStateException("Expected null context!");
+ markGtxAsRemote();
RemoteTransaction transaction = txTable.getRemoteTransaction(globalTx);
if (transaction == null) {
if (trace) log.info("Not found RemoteTransaction for tx id: " + globalTx);
@@ -117,4 +122,8 @@
", cacheName='" + cacheName + '\'' +
'}';
}
+
+ private void markGtxAsRemote() {
+ globalTx.setRemote(true);
+ }
}
Modified: branches/4.2.x/core/src/main/java/org/infinispan/commands/tx/TransactionBoundaryCommand.java
===================================================================
--- branches/4.2.x/core/src/main/java/org/infinispan/commands/tx/TransactionBoundaryCommand.java 2010-09-05 13:12:25 UTC (rev 2325)
+++ branches/4.2.x/core/src/main/java/org/infinispan/commands/tx/TransactionBoundaryCommand.java 2010-09-06 13:17:00 UTC (rev 2326)
@@ -30,9 +30,12 @@
* {@link org.infinispan.transaction.xa.GlobalTransaction}
*
* @author Manik Surtani (<a href="mailto:manik at jboss.org">manik at jboss.org</a>)
+ * @author Mircea.Markus at jboss.com
* @since 4.0
*/
public interface TransactionBoundaryCommand extends VisitableCommand, CacheRpcCommand {
GlobalTransaction getGlobalTransaction();
+
+ void markTransactionAsRemote(boolean remote);
}
Modified: branches/4.2.x/core/src/main/java/org/infinispan/config/ConfigurationValidatingVisitor.java
===================================================================
--- branches/4.2.x/core/src/main/java/org/infinispan/config/ConfigurationValidatingVisitor.java 2010-09-05 13:12:25 UTC (rev 2325)
+++ branches/4.2.x/core/src/main/java/org/infinispan/config/ConfigurationValidatingVisitor.java 2010-09-06 13:17:00 UTC (rev 2326)
@@ -48,18 +48,5 @@
@Override
public void visitConfiguration(Configuration bean) {
- checkEagerLockingAndDld(bean);
}
-
- private void checkEagerLockingAndDld(Configuration bean) {
- boolean isEagerLocking = bean.isUseEagerLocking();
- checkEagerLockingAndDld(bean, isEagerLocking);
- }
-
- public static void checkEagerLockingAndDld(Configuration bean, boolean eagerLocking) {
- boolean isDealLockDetection = bean.isEnableDeadlockDetection();
- if (isDealLockDetection && eagerLocking) {
- throw new ConfigurationException("Deadlock detection cannot be used with eager locking until ISPN-596 is fixed. See https://jira.jboss.org/browse/ISPN-596");
- }
- }
}
Modified: branches/4.2.x/core/src/main/java/org/infinispan/interceptors/DeadlockDetectingInterceptor.java
===================================================================
--- branches/4.2.x/core/src/main/java/org/infinispan/interceptors/DeadlockDetectingInterceptor.java 2010-09-05 13:12:25 UTC (rev 2325)
+++ branches/4.2.x/core/src/main/java/org/infinispan/interceptors/DeadlockDetectingInterceptor.java 2010-09-06 13:17:00 UTC (rev 2326)
@@ -1,29 +1,26 @@
package org.infinispan.interceptors;
-import org.infinispan.commands.DataCommand;
+import org.infinispan.commands.VisitableCommand;
+import org.infinispan.commands.control.LockControlCommand;
import org.infinispan.commands.tx.PrepareCommand;
-import org.infinispan.commands.tx.RollbackCommand;
import org.infinispan.commands.write.PutKeyValueCommand;
import org.infinispan.commands.write.RemoveCommand;
import org.infinispan.commands.write.ReplaceCommand;
import org.infinispan.context.InvocationContext;
import org.infinispan.context.impl.TxInvocationContext;
-import org.infinispan.distribution.DistributionManager;
import org.infinispan.factories.annotations.Inject;
import org.infinispan.factories.annotations.Start;
import org.infinispan.interceptors.base.CommandInterceptor;
-import org.infinispan.remoting.transport.Address;
-import org.infinispan.transaction.xa.DeadlockDetectingGlobalTransaction;
-import org.infinispan.transaction.xa.TransactionTable;
+import org.infinispan.transaction.xa.DldGlobalTransaction;
import org.infinispan.util.concurrent.locks.DeadlockDetectedException;
import org.infinispan.util.concurrent.locks.LockManager;
import javax.transaction.Transaction;
import javax.transaction.TransactionManager;
-import java.util.HashSet;
+import java.util.Collections;
/**
- * This interceptor populates the {@link org.infinispan.transaction.xa.DeadlockDetectingGlobalTransaction} with
+ * This interceptor populates the {@link org.infinispan.transaction.xa.DldGlobalTransaction} with
* appropriate information needed in order to accomplish deadlock detection. It MUST process populate data before the
* replication takes place, so it will do all the tasks before calling {@link org.infinispan.interceptors.base.CommandInterceptor#invokeNextInterceptor(org.infinispan.context.InvocationContext,
* org.infinispan.commands.VisitableCommand)}.
@@ -35,21 +32,15 @@
*/
public class DeadlockDetectingInterceptor extends CommandInterceptor {
- private TransactionTable txTable;
private LockManager lockManager;
private TransactionManager txManager;
- private DistributionManager distributionManager;
@Inject
- public void init(TransactionTable txTable, LockManager lockManager, TransactionManager txManager,
- DistributionManager distributionManager) {
- this.txTable = txTable;
+ public void init(LockManager lockManager, TransactionManager txManager) {
this.lockManager = lockManager;
this.txManager = txManager;
- this.distributionManager = distributionManager;
}
-
/**
* Only does a sanity check.
*/
@@ -60,43 +51,6 @@
}
}
- private Object handleDataCommand(InvocationContext ctx, DataCommand command) throws Throwable {
- if (ctx.isInTxScope()) {
- DeadlockDetectingGlobalTransaction gtx = (DeadlockDetectingGlobalTransaction) ctx.getLockOwner();
- gtx.setLockIntention(command.getKey());
- gtx.setProcessingThread(Thread.currentThread());
- }
- try {
- return invokeNextInterceptor(ctx, command);
- } catch (InterruptedException ie) {
- if (ctx.isInTxScope()) {
- lockManager.releaseLocks(ctx);
- if (ctx.isOriginLocal()) {
- Transaction transaction = txManager.getTransaction();
- if (trace)
- log.trace("Marking the transaction for rollback! : " + transaction);
- if (transaction == null) {
- throw new IllegalStateException("We're running in a local transaction, there MUST be one " +
- "associated witht the local thread but none found! (null)");
- }
- transaction.setRollbackOnly();
- txTable.removeLocalTransaction(transaction);
- throw new DeadlockDetectedException("Deadlock request was detected for locally originated tx " + transaction +
- "; it was marked for rollback");
- } else {
- DeadlockDetectingGlobalTransaction gtx = (DeadlockDetectingGlobalTransaction) ctx.getLockOwner();
- gtx.setMarkedForRollback(true);
- throw new DeadlockDetectedException("Deadlock request was detected for remotely originated tx " + gtx +
- "; it was marked for rollback");
- }
- } else {
- if (trace)
- log.trace("Received an interrupt request, but we're not running within the scope of a transaction, so passing it up the stack", ie);
- throw ie;
- }
- }
- }
-
@Override
public Object visitPutKeyValueCommand(InvocationContext ctx, PutKeyValueCommand command) throws Throwable {
return handleDataCommand(ctx, command);
@@ -113,46 +67,29 @@
}
@Override
+ public Object visitLockControlCommand(TxInvocationContext ctx, LockControlCommand command) throws Throwable {
+ DldGlobalTransaction globalTransaction = (DldGlobalTransaction) ctx.getGlobalTransaction();
+ if (ctx.isOriginLocal()) {
+ globalTransaction.setRemoteLockIntention(command.getKeys());
+ }
+ return handleDataCommand(ctx, command);
+ }
+
+ @Override
public Object visitPrepareCommand(TxInvocationContext ctx, PrepareCommand command) throws Throwable {
- DeadlockDetectingGlobalTransaction globalTransaction = (DeadlockDetectingGlobalTransaction) ctx.getGlobalTransaction();
- globalTransaction.setProcessingThread(Thread.currentThread());
+ DldGlobalTransaction globalTransaction = (DldGlobalTransaction) ctx.getGlobalTransaction();
if (ctx.isOriginLocal()) {
- if (configuration.getCacheMode().isDistributed()) {
- globalTransaction.setReplicatingTo(new HashSet<Address>(distributionManager.getAffectedNodes(ctx.getAffectedKeys())));
- } else {
- globalTransaction.setReplicatingTo(null);
- }
- if (trace) log.trace("Deadlock detection information was added to " + globalTransaction);
+ globalTransaction.setRemoteLockIntention(command.getAffectedKeys());
}
- try {
- return invokeNextInterceptor(ctx, command);
- } catch (Throwable dde) {
- if (ctx.isOriginLocal()) {
- globalTransaction.setMarkedForRollback(true);
- boolean wasInterrupted = Thread.interrupted();
- if (trace)
- log.trace("Deadlock was detected on the remote side, marking the tx for rollback. Was this thread interrupted? " + wasInterrupted);
- }
- throw dde;
- } finally {
- if (!ctx.isOriginLocal()) {
- if (!txTable.containRemoteTx(ctx.getGlobalTransaction())) {
- if (trace) {
- log.trace("While returning from prepare we determined that remote tx is no longer in the txTable. " +
- "This means that a rollback was executed in between; releasing locks");
- }
- lockManager.releaseLocks(ctx);
- }
- }
+ Object result = invokeNextInterceptor(ctx, command);
+ if (ctx.isOriginLocal()) {
+ globalTransaction.setRemoteLockIntention(Collections.EMPTY_SET);
}
+ return result;
}
- @Override
- public Object visitRollbackCommand(TxInvocationContext ctx, RollbackCommand command) throws Throwable {
- if (!ctx.isOriginLocal()) {
- DeadlockDetectingGlobalTransaction globalTransaction = (DeadlockDetectingGlobalTransaction) ctx.getGlobalTransaction();
- globalTransaction.interruptProcessingThread();
- }
+
+ private Object handleDataCommand(InvocationContext ctx, VisitableCommand command) throws Throwable {
return invokeNextInterceptor(ctx, command);
}
}
Modified: branches/4.2.x/core/src/main/java/org/infinispan/interceptors/ImplicitEagerLockingInterceptor.java
===================================================================
--- branches/4.2.x/core/src/main/java/org/infinispan/interceptors/ImplicitEagerLockingInterceptor.java 2010-09-05 13:12:25 UTC (rev 2325)
+++ branches/4.2.x/core/src/main/java/org/infinispan/interceptors/ImplicitEagerLockingInterceptor.java 2010-09-06 13:17:00 UTC (rev 2326)
@@ -8,6 +8,7 @@
import org.infinispan.commands.write.PutMapCommand;
import org.infinispan.commands.write.RemoveCommand;
import org.infinispan.commands.write.ReplaceCommand;
+import org.infinispan.context.Flag;
import org.infinispan.context.InvocationContext;
import org.infinispan.factories.annotations.Inject;
import org.infinispan.interceptors.base.CommandInterceptor;
@@ -23,6 +24,7 @@
* For more details refer to: https://jira.jboss.org/jira/browse/ISPN-70 https://jira.jboss.org/jira/browse/ISPN-48
*
* @author <a href="mailto:vblagoje at redhat.com">Vladimir Blagojevic (vblagoje at redhat.com)</a>
+ * @author Mircea.Markus at jboss.com
* @since 4.0
*/
public class ImplicitEagerLockingInterceptor extends CommandInterceptor {
@@ -35,10 +37,8 @@
}
@Override
- public Object visitPutKeyValueCommand(InvocationContext ctx, PutKeyValueCommand command)
- throws Throwable {
- boolean localTxScope = ctx.isInTxScope() & ctx.isOriginLocal();
- if (localTxScope) {
+ public Object visitPutKeyValueCommand(InvocationContext ctx, PutKeyValueCommand command) throws Throwable {
+ if (shouldAcquireRemoteLock(ctx)) {
lockEagerly(ctx, Collections.singleton(command.getKey()));
}
return invokeNextInterceptor(ctx, command);
@@ -46,8 +46,7 @@
@Override
public Object visitRemoveCommand(InvocationContext ctx, RemoveCommand command) throws Throwable {
- boolean localTxScope = ctx.isInTxScope() & ctx.isOriginLocal();
- if (localTxScope) {
+ if (shouldAcquireRemoteLock(ctx)) {
lockEagerly(ctx, Collections.singleton(command.getKey()));
}
return invokeNextInterceptor(ctx, command);
@@ -56,8 +55,7 @@
@Override
public Object visitReplaceCommand(InvocationContext ctx, ReplaceCommand command)
throws Throwable {
- boolean localTxScope = ctx.isInTxScope() & ctx.isOriginLocal();
- if (localTxScope) {
+ if (shouldAcquireRemoteLock(ctx)) {
lockEagerly(ctx, Collections.singleton(command.getKey()));
}
return invokeNextInterceptor(ctx, command);
@@ -65,8 +63,7 @@
@Override
public Object visitPutMapCommand(InvocationContext ctx, PutMapCommand command) throws Throwable {
- boolean localTxScope = ctx.isInTxScope() & ctx.isOriginLocal();
- if (localTxScope) {
+ if (shouldAcquireRemoteLock(ctx)) {
lockEagerly(ctx, new HashSet<Object>(command.getMap().keySet()));
}
return invokeNextInterceptor(ctx, command);
@@ -74,8 +71,7 @@
@Override
public Object visitEvictCommand(InvocationContext ctx, EvictCommand command) throws Throwable {
- boolean localTxScope = ctx.isInTxScope() & ctx.isOriginLocal();
- if (localTxScope) {
+ if (shouldAcquireRemoteLock(ctx)) {
lockEagerly(ctx, Collections.singleton(command.getKey()));
}
return invokeNextInterceptor(ctx, command);
@@ -84,8 +80,7 @@
@Override
public Object visitInvalidateCommand(InvocationContext ctx, InvalidateCommand command)
throws Throwable {
- boolean localTxScope = ctx.isInTxScope() & ctx.isOriginLocal();
- if (localTxScope) {
+ if (shouldAcquireRemoteLock(ctx)) {
lockEagerly(ctx, Collections.singleton(command.getKey()));
}
return invokeNextInterceptor(ctx, command);
@@ -95,4 +90,8 @@
LockControlCommand lcc = cf.buildLockControlCommand(keys, true);
return invokeNextInterceptor(ctx, lcc);
}
+
+ private boolean shouldAcquireRemoteLock(InvocationContext ctx) {
+ return ctx.isInTxScope() & ctx.isOriginLocal() && !ctx.hasFlag(Flag.CACHE_MODE_LOCAL);
+ }
}
Modified: branches/4.2.x/core/src/main/java/org/infinispan/interceptors/LockingInterceptor.java
===================================================================
--- branches/4.2.x/core/src/main/java/org/infinispan/interceptors/LockingInterceptor.java 2010-09-05 13:12:25 UTC (rev 2325)
+++ branches/4.2.x/core/src/main/java/org/infinispan/interceptors/LockingInterceptor.java 2010-09-06 13:17:00 UTC (rev 2326)
@@ -58,7 +58,6 @@
* Interceptor to implement <a href="http://wiki.jboss.org/wiki/JBossCacheMVCC">MVCC</a> functionality.
*
* @author Manik Surtani (<a href="mailto:manik at jboss.org">manik at jboss.org</a>)
- * @author Mircea.Markus at jboss.com
* @see <a href="http://wiki.jboss.org/wiki/JBossCacheMVCC">MVCC designs</a>
* @since 4.0
*/
@@ -139,21 +138,45 @@
if (localTxScope) {
c.attachGlobalTransaction((GlobalTransaction) ctx.getLockOwner());
}
- try {
+
+ if (c.isUnlock()) {
+ lockManager.releaseLocks(ctx);
+ if (log.isTraceEnabled()) log.trace("Lock released for: " + ctx.getLockOwner());
+ return null;
+ }
+
+ for (Object key : c.getKeys()) {
+ if (c.isImplicit() && localTxScope && !lockManager.ownsLock(key, ctx.getLockOwner())) {
+ //if even one key is unlocked we need to invoke this lock command cluster wide...
+ shouldInvokeOnCluster = true;
+ break;
+ }
+ }
+ boolean goRemoteFirst = configuration.isEnableDeadlockDetection() && localTxScope;
+ if (goRemoteFirst) {
+ Object result = invokeNextInterceptor(ctx, c);
+ try {
+ for (Object key : c.getKeys()) {
+ entryFactory.wrapEntryForWriting(ctx, key, true, false, false, false, false);
+ }
+ } catch (Throwable e) {
+ //if anything happen during locking then unlock remote
+ c.setUnlock(true);
+ invokeNextInterceptor(ctx, c);
+ throw e;
+ }
+ return result;
+ } else {
for (Object key : c.getKeys()) {
- if (c.isImplicit() && localTxScope && !lockManager.ownsLock(key, ctx.getLockOwner())) {
- //if even one key is unlocked we need to invoke this lock command cluster wide...
- shouldInvokeOnCluster = true;
- }
entryFactory.wrapEntryForWriting(ctx, key, true, false, false, false, false);
}
if (shouldInvokeOnCluster || c.isExplicit())
return invokeNextInterceptor(ctx, c);
else
return null;
- } catch (Throwable te) {
+ }
+ } catch (Throwable te) {
return cleanLocksAndRethrow(ctx, te);
- }
} finally {
if (ctx.isInTxScope()) {
doAfterCall(ctx);
Modified: branches/4.2.x/core/src/main/java/org/infinispan/interceptors/base/BaseRpcInterceptor.java
===================================================================
--- branches/4.2.x/core/src/main/java/org/infinispan/interceptors/base/BaseRpcInterceptor.java 2010-09-05 13:12:25 UTC (rev 2325)
+++ branches/4.2.x/core/src/main/java/org/infinispan/interceptors/base/BaseRpcInterceptor.java 2010-09-06 13:17:00 UTC (rev 2326)
@@ -56,12 +56,13 @@
public Object visitLockControlCommand(TxInvocationContext ctx, LockControlCommand command) throws Throwable {
Object retVal = invokeNextInterceptor(ctx, command);
if (ctx.isOriginLocal()) {
- rpcManager.broadcastRpcCommand(command, true, false);
+ //unlock will happen async as it is a best effort
+ boolean sync = !command.isUnlock();
+ rpcManager.broadcastRpcCommand(command, sync, false);
}
return retVal;
}
-
protected final boolean isSynchronous(InvocationContext ctx) {
if (ctx.hasFlag(Flag.FORCE_SYNCHRONOUS))
return true;
Modified: branches/4.2.x/core/src/main/java/org/infinispan/marshall/Ids.java
===================================================================
--- branches/4.2.x/core/src/main/java/org/infinispan/marshall/Ids.java 2010-09-05 13:12:25 UTC (rev 2325)
+++ branches/4.2.x/core/src/main/java/org/infinispan/marshall/Ids.java 2010-09-06 13:17:00 UTC (rev 2326)
@@ -83,39 +83,41 @@
static final byte INVALIDATE_L1_COMMAND = 35;
static final byte LOCK_CONTROL_COMMAND = 36;
static final byte EVICT_COMMAND = 37;
+ // others
- // others
static final byte GLOBAL_TRANSACTION = 38;
static final byte JGROUPS_ADDRESS = 39;
static final byte MARSHALLED_VALUE = 40;
static final byte TRANSACTION_LOG_ENTRY = 41;
static final byte BUCKET = 42;
static final byte DEADLOCK_DETECTING_GLOBAL_TRANSACTION = 43;
-
/**
* ids for infinispan tree classes *
*/
static final byte NODE_KEY = 44;
+
static final byte FQN = 45;
+ static final byte ATOMIC_HASH_MAP_DELTA = 46;
- static final byte ATOMIC_HASH_MAP_DELTA = 46;
static final byte ATOMIC_PUT_OPERATION = 47;
static final byte ATOMIC_REMOVE_OPERATION = 48;
static final byte ATOMIC_CLEAR_OPERATION = 49;
+ static final byte REHASH_CONTROL_COMMAND = 50;
- static final byte REHASH_CONTROL_COMMAND = 50;
static final byte DEFAULT_CONSISTENT_HASH = 51;
static final byte UNION_CONSISTENT_HASH = 52;
static final byte JOIN_COMPLETE_COMMAND = 53;
-
/*
* ids for server modules
*/
+
static final byte SERVER_CACHE_VALUE = 55;
static final byte MEMCACHED_CACHE_VALUE = 56;
static final byte BYTE_ARRAY_KEY = 57;
static final byte TOPOLOGY_ADDRESS = 58;
static final byte TOPOLOGY_VIEW = 59;
+ //added in 4.2
+ static final byte LOCK_INTENTION_NOTIFICATION_COMMAND = 60;
}
Modified: branches/4.2.x/core/src/main/java/org/infinispan/marshall/jboss/ConstantObjectTable.java
===================================================================
--- branches/4.2.x/core/src/main/java/org/infinispan/marshall/jboss/ConstantObjectTable.java 2010-09-05 13:12:25 UTC (rev 2325)
+++ branches/4.2.x/core/src/main/java/org/infinispan/marshall/jboss/ConstantObjectTable.java 2010-09-06 13:17:00 UTC (rev 2326)
@@ -33,6 +33,7 @@
import org.infinispan.commands.control.StateTransferControlCommand;
import org.infinispan.commands.read.GetKeyValueCommand;
import org.infinispan.commands.remote.ClusteredGetCommand;
+import org.infinispan.commands.remote.LockIntentionNotificationCommand;
import org.infinispan.commands.remote.MultipleRpcCommand;
import org.infinispan.commands.remote.SingleRpcCommand;
import org.infinispan.commands.tx.CommitCommand;
@@ -73,7 +74,7 @@
import org.infinispan.remoting.responses.UnsuccessfulResponse;
import org.infinispan.remoting.responses.UnsureResponse;
import org.infinispan.remoting.transport.jgroups.JGroupsAddress;
-import org.infinispan.transaction.xa.DeadlockDetectingGlobalTransaction;
+import org.infinispan.transaction.xa.DldGlobalTransaction;
import org.infinispan.transaction.xa.GlobalTransaction;
import org.infinispan.util.ByteArrayKey;
import org.infinispan.util.FastCopyHashMap;
@@ -119,7 +120,7 @@
JDK_EXTERNALIZERS.put("java.util.Collections$SingletonList", SingletonListExternalizer.class.getName());
MARSHALLABLES.add(GlobalTransaction.class.getName());
- MARSHALLABLES.add(DeadlockDetectingGlobalTransaction.class.getName());
+ MARSHALLABLES.add(DldGlobalTransaction.class.getName());
MARSHALLABLES.add(JGroupsAddress.class.getName());
MARSHALLABLES.add("org.infinispan.util.Immutables$ImmutableMapWrapper");
MARSHALLABLES.add(MarshalledValue.class.getName());
Modified: branches/4.2.x/core/src/main/java/org/infinispan/remoting/InboundInvocationHandlerImpl.java
===================================================================
--- branches/4.2.x/core/src/main/java/org/infinispan/remoting/InboundInvocationHandlerImpl.java 2010-09-05 13:12:25 UTC (rev 2325)
+++ branches/4.2.x/core/src/main/java/org/infinispan/remoting/InboundInvocationHandlerImpl.java 2010-09-06 13:17:00 UTC (rev 2326)
@@ -69,7 +69,7 @@
CommandsFactory commandsFactory = cr.getLocalComponent(CommandsFactory.class);
// initialize this command with components specific to the intended cache instance
- commandsFactory.initializeReplicableCommand(cmd);
+ commandsFactory.initializeReplicableCommand(cmd, true);
try {
log.trace("Calling perform() on {0}", cmd);
Modified: branches/4.2.x/core/src/main/java/org/infinispan/statetransfer/StateTransferManagerImpl.java
===================================================================
--- branches/4.2.x/core/src/main/java/org/infinispan/statetransfer/StateTransferManagerImpl.java 2010-09-05 13:12:25 UTC (rev 2325)
+++ branches/4.2.x/core/src/main/java/org/infinispan/statetransfer/StateTransferManagerImpl.java 2010-09-06 13:17:00 UTC (rev 2326)
@@ -232,7 +232,7 @@
WriteCommand[] mods = logEntry.getModifications();
if (trace) log.trace("Mods = {0}", Arrays.toString(mods));
for (WriteCommand mod : mods) {
- commandsFactory.initializeReplicableCommand(mod);
+ commandsFactory.initializeReplicableCommand(mod, false);
ctx.setFlags(CACHE_MODE_LOCAL, Flag.SKIP_CACHE_STATUS_CHECK);
interceptorChain.invoke(ctx, mod);
}
@@ -265,7 +265,7 @@
if (!transactionLog.hasPendingPrepare(command)) {
if (trace) log.trace("Applying pending prepare {0}", command);
- commandsFactory.initializeReplicableCommand(command);
+ commandsFactory.initializeReplicableCommand(command, false);
RemoteTxInvocationContext ctx = invocationContextContainer.createRemoteTxInvocationContext();
RemoteTransaction transaction = txTable.createRemoteTransaction(command.getGlobalTransaction(), command.getModifications());
ctx.setRemoteTransaction(transaction);
Deleted: branches/4.2.x/core/src/main/java/org/infinispan/transaction/xa/DeadlockDetectingGlobalTransaction.java
===================================================================
--- branches/4.2.x/core/src/main/java/org/infinispan/transaction/xa/DeadlockDetectingGlobalTransaction.java 2010-09-05 13:12:25 UTC (rev 2325)
+++ branches/4.2.x/core/src/main/java/org/infinispan/transaction/xa/DeadlockDetectingGlobalTransaction.java 2010-09-06 13:17:00 UTC (rev 2326)
@@ -1,191 +0,0 @@
-package org.infinispan.transaction.xa;
-
-import org.infinispan.marshall.Ids;
-import org.infinispan.marshall.Marshallable;
-import org.infinispan.remoting.transport.Address;
-import org.infinispan.util.logging.Log;
-import org.infinispan.util.logging.LogFactory;
-
-import java.io.IOException;
-import java.io.ObjectInput;
-import java.io.ObjectOutput;
-import java.util.Collections;
-import java.util.Set;
-
-/**
- * This class is used when deadlock detection is enabled.
- *
- * @author Mircea.Markus at jboss.com
- */
-@Marshallable(externalizer = DeadlockDetectingGlobalTransaction.Externalizer.class, id = Ids.DEADLOCK_DETECTING_GLOBAL_TRANSACTION)
-public class DeadlockDetectingGlobalTransaction extends GlobalTransaction {
-
- private static final Log log = LogFactory.getLog(DeadlockDetectingGlobalTransaction.class);
-
- public static final boolean trace = log.isTraceEnabled();
-
- private Set<Address> replicatingTo = Collections.EMPTY_SET;
-
- private volatile transient Thread processingThread;
-
- private volatile long coinToss;
-
- private volatile boolean isMarkedForRollback;
-
- private transient volatile Object lockIntention;
-
-
- public DeadlockDetectingGlobalTransaction() {
- }
-
- DeadlockDetectingGlobalTransaction(Address addr, boolean remote) {
- super(addr, remote);
- }
-
- DeadlockDetectingGlobalTransaction(boolean remote) {
- super(null, remote);
- }
-
- /**
- * Is this global transaction replicating to the given address?
- */
- public boolean isReplicatingTo(Address address) {
- if (this.replicatingTo == null) {
- return true;
- } else {
- return this.replicatingTo.contains(address);
- }
- }
-
- /**
- * On a node, this will set the thread that handles replicatin on the given node.
- */
- public void setProcessingThread(Thread replicationThread) {
- if (trace) log.trace("Setting thread " + Thread.currentThread() + "on tx [" + this + "]");
- this.processingThread = replicationThread;
- }
-
- /**
- * Tries to interrupt the processing thread.
- */
- public synchronized void interruptProcessingThread() {
- if (isMarkedForRollback) {
- if (trace) log.trace("Not interrupting as tx is marked for rollback");
- return;
- }
- if (processingThread == null) {
- if(trace) log.trace("Processing thread is null, nothing to interrupt");
- return;
- }
- if (trace) {
- StackTraceElement[] stackTraceElements = processingThread.getStackTrace();
- StringBuilder builder = new StringBuilder();
- for (StackTraceElement stackTraceElement : stackTraceElements) {
- builder.append(" ").append(stackTraceElement).append('\n');
- }
- log.trace("About to interrupt thread: " + processingThread + ". Thread's stack trace is: \n" + builder.toString());
- }
- this.processingThread.interrupt();
- }
-
- /**
- * Sets the set og <b>Address</b> objects this node is replicating to.
- */
- public void setReplicatingTo(Set<Address> targets) {
- this.replicatingTo = targets;
- }
-
- /**
- * Based on the coin toss, determine whether this tx will continue working or this thread will be stopped.
- */
- public boolean thisWillInterrupt(DeadlockDetectingGlobalTransaction globalTransaction) {
- return this.coinToss > globalTransaction.coinToss;
- }
-
- /**
- * Sets the reandom number that defines the coin toss.
- */
- public void setCoinToss(long coinToss) {
- this.coinToss = coinToss;
- }
-
- public long getCoinToss() {
- return coinToss;
- }
-
- @Override
- public boolean equals(Object o) {
- if (this == o) return true;
- if (!(o instanceof DeadlockDetectingGlobalTransaction)) return false;
- if (!super.equals(o)) return false;
-
- DeadlockDetectingGlobalTransaction that = (DeadlockDetectingGlobalTransaction) o;
-
- if (coinToss != that.coinToss) return false;
- if (replicatingTo != null ? !replicatingTo.equals(that.replicatingTo) : that.replicatingTo != null) return false;
-
- return true;
- }
-
- @Override
- public int hashCode() {
- int result = super.hashCode();
- result = 31 * result + (replicatingTo != null ? replicatingTo.hashCode() : 0);
- result = 31 * result + (int) (coinToss ^ (coinToss >>> 32));
- return result;
- }
-
- @Override
- public String toString() {
- return "DeadlockDetectingGlobalTransaction{" +
- "replicatingTo=" + replicatingTo +
- ", replicationThread=" + processingThread +
- ", coinToss=" + coinToss +
- "} " + super.toString();
- }
-
- /**
- * Once marked for rollback, the call to {@link #interruptProcessingThread()} will be ignored.
- */
- public synchronized boolean isMarkedForRollback() {
- return isMarkedForRollback;
- }
-
- public synchronized void setMarkedForRollback(boolean markedForRollback) {
- isMarkedForRollback = markedForRollback;
- }
-
- /**
- * Returns the key this transaction intends to lock.
- */
- public Object getLockIntention() {
- return lockIntention;
- }
-
- /**
- * Sets the lock this transaction intends to lock.
- */
- public void setLockIntention(Object lockIntention) {
- this.lockIntention = lockIntention;
- }
-
- public static class Externalizer extends GlobalTransaction.Externalizer {
- public Externalizer() {
- gtxFactory = new GlobalTransactionFactory(true);
- }
-
- @Override
- public void writeObject(ObjectOutput output, Object subject) throws IOException {
- super.writeObject(output, subject);
- DeadlockDetectingGlobalTransaction ddGt = (DeadlockDetectingGlobalTransaction) subject;
- output.writeLong(ddGt.getCoinToss());
- }
-
- @Override
- public Object readObject(ObjectInput input) throws IOException, ClassNotFoundException {
- DeadlockDetectingGlobalTransaction ddGt = (DeadlockDetectingGlobalTransaction) super.readObject(input);
- ddGt.setCoinToss(input.readLong());
- return ddGt;
- }
- }
-}
Added: branches/4.2.x/core/src/main/java/org/infinispan/transaction/xa/DistDldGlobalTransaction.java
===================================================================
--- branches/4.2.x/core/src/main/java/org/infinispan/transaction/xa/DistDldGlobalTransaction.java (rev 0)
+++ branches/4.2.x/core/src/main/java/org/infinispan/transaction/xa/DistDldGlobalTransaction.java 2010-09-06 13:17:00 UTC (rev 2326)
@@ -0,0 +1,35 @@
+package org.infinispan.transaction.xa;
+
+import org.infinispan.distribution.DistributionManager;
+import org.infinispan.remoting.transport.Address;
+
+/**
+ * @author Mircea.Markus at jboss.com
+ * @since 4.1
+ */
+public class DistDldGlobalTransaction extends DldGlobalTransaction {
+
+ private volatile DistributionManager distManager;
+ private volatile int numOwners;
+
+ public DistDldGlobalTransaction(DistributionManager distManager, int numOwners) {
+ this.distManager = distManager;
+ this.numOwners = numOwners;
+ }
+
+ public DistDldGlobalTransaction(Address addr, boolean remote, DistributionManager distManager, int numOwners) {
+ super(addr, remote);
+ this.distManager = distManager;
+ this.numOwners = numOwners;
+ }
+
+ @Override
+ public boolean isAcquiringRemoteLock(Object key, Address address) {
+ boolean affectsKey = remoteLockIntention.contains(key);
+ if (affectsKey) {
+ return distManager.getConsistentHash().isKeyLocalToAddress(address, key, numOwners);
+ } else {
+ return false;
+ }
+ }
+}
Copied: branches/4.2.x/core/src/main/java/org/infinispan/transaction/xa/DldGlobalTransaction.java (from rev 2288, branches/4.2.x/core/src/main/java/org/infinispan/transaction/xa/DeadlockDetectingGlobalTransaction.java)
===================================================================
--- branches/4.2.x/core/src/main/java/org/infinispan/transaction/xa/DldGlobalTransaction.java (rev 0)
+++ branches/4.2.x/core/src/main/java/org/infinispan/transaction/xa/DldGlobalTransaction.java 2010-09-06 13:17:00 UTC (rev 2326)
@@ -0,0 +1,143 @@
+package org.infinispan.transaction.xa;
+
+import org.infinispan.marshall.Ids;
+import org.infinispan.marshall.Marshallable;
+import org.infinispan.remoting.transport.Address;
+import org.infinispan.util.logging.Log;
+import org.infinispan.util.logging.LogFactory;
+
+import java.io.IOException;
+import java.io.ObjectInput;
+import java.io.ObjectOutput;
+import java.util.Collections;
+import java.util.Set;
+
+/**
+ * This class is used when deadlock detection is enabled.
+ *
+ * @author Mircea.Markus at jboss.com
+ */
+@Marshallable(externalizer = DldGlobalTransaction.Externalizer.class, id = Ids.DEADLOCK_DETECTING_GLOBAL_TRANSACTION)
+public class DldGlobalTransaction extends GlobalTransaction {
+
+ private static Log log = LogFactory.getLog(DldGlobalTransaction.class);
+
+ public static final boolean trace = log.isTraceEnabled();
+
+ private volatile long coinToss;
+
+ private volatile boolean isMarkedForRollback;
+
+ private transient volatile Object lockLocalLockIntention;
+
+ protected volatile Set<Object> remoteLockIntention = Collections.EMPTY_SET;
+
+
+ public DldGlobalTransaction() {
+ }
+
+ DldGlobalTransaction(Address addr, boolean remote) {
+ super(addr, remote);
+ }
+
+
+ /**
+ * Sets the random number that defines the coin toss.
+ */
+ public void setCoinToss(long coinToss) {
+ this.coinToss = coinToss;
+ }
+
+ public long getCoinToss() {
+ return coinToss;
+ }
+
+ @Override
+ public boolean equals(Object o) {
+ if (this == o) return true;
+ if (!(o instanceof DldGlobalTransaction)) return false;
+ if (!super.equals(o)) return false;
+
+ DldGlobalTransaction that = (DldGlobalTransaction) o;
+
+ if (coinToss != that.coinToss) return false;
+ return true;
+ }
+
+ @Override
+ public int hashCode() {
+ int result = super.hashCode();
+ result = 31 * result + (int) (coinToss ^ (coinToss >>> 32));
+ return result;
+ }
+
+ @Override
+ public String toString() {
+ return "DldGlobalTransaction{" +
+ "coinToss=" + coinToss +
+ ", isMarkedForRollback=" + isMarkedForRollback +
+ ", lockIntention=" + lockLocalLockIntention +
+ ", affectedKeys=" + remoteLockIntention +
+ "} " + super.toString();
+ }
+
+ public synchronized boolean isMarkedForRollback() {
+ return isMarkedForRollback;
+ }
+
+ public synchronized void setMarkedForRollback(boolean markedForRollback) {
+ isMarkedForRollback = markedForRollback;
+ }
+
+ /**
+ * Returns the key this transaction intends to lock.
+ */
+ public Object getLockIntention() {
+ return lockLocalLockIntention;
+ }
+
+ public void setLockLocalLockIntention(Object lockIntention) {
+ this.lockLocalLockIntention = lockIntention;
+ }
+
+ public boolean wouldLose(DldGlobalTransaction other) {
+ return this.coinToss < other.coinToss;
+ }
+
+ public boolean isAcquiringRemoteLock(Object key, Address address) {
+ boolean contains = remoteLockIntention.contains(key);
+ if (trace) log.trace("Intention check: does " + remoteLockIntention + " contain " + key + "? " + contains);
+ return contains; //this works for replication
+ }
+
+ public void setRemoteLockIntention(Set<Object> remoteLockIntention) {
+ if (trace) {
+ log.trace("Setting the affected keys set to: " + remoteLockIntention);
+ }
+ this.remoteLockIntention = remoteLockIntention;
+ }
+
+ public Set<Object> getRemoteLockIntention() {
+ return remoteLockIntention;
+ }
+
+ public static class Externalizer extends GlobalTransaction.Externalizer {
+ public Externalizer() {
+ gtxFactory = new GlobalTransactionFactory(true);
+ }
+
+ @Override
+ public void writeObject(ObjectOutput output, Object subject) throws IOException {
+ super.writeObject(output, subject);
+ DldGlobalTransaction ddGt = (DldGlobalTransaction) subject;
+ output.writeLong(ddGt.getCoinToss());
+ }
+
+ @Override
+ public Object readObject(ObjectInput input) throws IOException, ClassNotFoundException {
+ DldGlobalTransaction ddGt = (DldGlobalTransaction) super.readObject(input);
+ ddGt.setCoinToss(input.readLong());
+ return ddGt;
+ }
+ }
+}
Property changes on: branches/4.2.x/core/src/main/java/org/infinispan/transaction/xa/DldGlobalTransaction.java
___________________________________________________________________
Name: svn:keywords
+ Id Revision
Name: svn:eol-style
+ LF
Modified: branches/4.2.x/core/src/main/java/org/infinispan/transaction/xa/GlobalTransaction.java
===================================================================
--- branches/4.2.x/core/src/main/java/org/infinispan/transaction/xa/GlobalTransaction.java 2010-09-05 13:12:25 UTC (rev 2325)
+++ branches/4.2.x/core/src/main/java/org/infinispan/transaction/xa/GlobalTransaction.java 2010-09-06 13:17:00 UTC (rev 2326)
@@ -106,7 +106,7 @@
@Override
public String toString() {
StringBuilder sb = new StringBuilder();
- sb.append("GlobalTransaction:<").append(addr).append(">:").append(id);
+ sb.append("GlobalTransaction:<").append(addr).append(">:").append(id).append(isRemote() ? ":remote" : ":local");
return sb.toString();
}
Modified: branches/4.2.x/core/src/main/java/org/infinispan/transaction/xa/GlobalTransactionFactory.java
===================================================================
--- branches/4.2.x/core/src/main/java/org/infinispan/transaction/xa/GlobalTransactionFactory.java 2010-09-05 13:12:25 UTC (rev 2325)
+++ branches/4.2.x/core/src/main/java/org/infinispan/transaction/xa/GlobalTransactionFactory.java 2010-09-06 13:17:00 UTC (rev 2326)
@@ -1,6 +1,7 @@
package org.infinispan.transaction.xa;
import org.infinispan.config.Configuration;
+import org.infinispan.distribution.DistributionManager;
import org.infinispan.factories.annotations.Inject;
import org.infinispan.factories.annotations.Start;
import org.infinispan.remoting.transport.Address;
@@ -8,7 +9,7 @@
import java.util.Random;
/**
- * Factory for GlobalTransaction/DadlockDetectingGlobalTransaction.
+ * Factory for GlobalTransaction/DeadlockDetectingGlobalTransaction.
*
* @author Mircea.Markus at jboss.com
*/
@@ -16,6 +17,16 @@
private boolean isEddEnabled = false;
+ private DistributionManager distributionManager;
+
+ private Configuration configuration;
+
+ @Inject
+ public void init(DistributionManager distributionManager, Configuration configuration) {
+ this.distributionManager = distributionManager;
+ this.configuration = configuration;
+ }
+
/** this class is internally synchronized, so it can be shared between instances */
private final Random rnd = new Random();
@@ -43,7 +54,7 @@
public GlobalTransaction instantiateGlobalTransaction() {
if (isEddEnabled) {
- return new DeadlockDetectingGlobalTransaction();
+ return new DldGlobalTransaction();
} else {
return new GlobalTransaction();
}
@@ -52,7 +63,12 @@
public GlobalTransaction newGlobalTransaction(Address addr, boolean remote) {
GlobalTransaction gtx;
if (isEddEnabled) {
- DeadlockDetectingGlobalTransaction globalTransaction = new DeadlockDetectingGlobalTransaction(addr, remote);
+ DldGlobalTransaction globalTransaction;
+ if (configuration.getCacheMode().isDistributed()) {
+ globalTransaction = new DistDldGlobalTransaction(addr, remote, distributionManager, configuration.getNumOwners());
+ } else {
+ globalTransaction = new DldGlobalTransaction(addr, remote);
+ }
globalTransaction.setCoinToss(generateRandomId());
gtx = globalTransaction;
} else {
Modified: branches/4.2.x/core/src/main/java/org/infinispan/util/concurrent/locks/DeadlockDetectedException.java
===================================================================
--- branches/4.2.x/core/src/main/java/org/infinispan/util/concurrent/locks/DeadlockDetectedException.java 2010-09-05 13:12:25 UTC (rev 2325)
+++ branches/4.2.x/core/src/main/java/org/infinispan/util/concurrent/locks/DeadlockDetectedException.java 2010-09-06 13:17:00 UTC (rev 2326)
@@ -1,6 +1,7 @@
package org.infinispan.util.concurrent.locks;
import org.infinispan.CacheException;
+import org.infinispan.util.concurrent.TimeoutException;
/**
* Exception signaling detected deadlocks.
Modified: branches/4.2.x/core/src/main/java/org/infinispan/util/concurrent/locks/DeadlockDetectingLockManager.java
===================================================================
--- branches/4.2.x/core/src/main/java/org/infinispan/util/concurrent/locks/DeadlockDetectingLockManager.java 2010-09-05 13:12:25 UTC (rev 2325)
+++ branches/4.2.x/core/src/main/java/org/infinispan/util/concurrent/locks/DeadlockDetectingLockManager.java 2010-09-06 13:17:00 UTC (rev 2326)
@@ -1,13 +1,12 @@
package org.infinispan.util.concurrent.locks;
import org.infinispan.context.InvocationContext;
-import org.infinispan.context.impl.TxInvocationContext;
import org.infinispan.factories.annotations.Start;
import org.infinispan.jmx.annotations.MBean;
import org.infinispan.jmx.annotations.ManagedAttribute;
import org.infinispan.jmx.annotations.ManagedOperation;
-import org.infinispan.remoting.transport.Address;
-import org.infinispan.transaction.xa.DeadlockDetectingGlobalTransaction;
+import org.infinispan.transaction.xa.DldGlobalTransaction;
+import org.infinispan.transaction.xa.TransactionTable;
import org.infinispan.util.logging.Log;
import org.infinispan.util.logging.LogFactory;
import org.rhq.helpers.pluginAnnotations.agent.MeasurementType;
@@ -15,10 +14,21 @@
import org.rhq.helpers.pluginAnnotations.agent.Operation;
import static java.util.concurrent.TimeUnit.MILLISECONDS;
+
import java.util.concurrent.atomic.AtomicLong;
/**
* Lock manager in charge with processing deadlock detections.
+ * Implementation notes: if a deadlock is detected, then one of the transactions has to roll back. The transaction that
+ * rolls back is determined by comparing the coin toss from {@link org.infinispan.transaction.xa.DldGlobalTransaction}.
+ * A thread calling {@link DeadlockDetectingLockManager#lockAndRecord(Object, org.infinispan.context.InvocationContext)}
+ * would run the deadlock detection algorithm only if all of the following take place:
+ * - the call is made in the scope of a transaction (either locally originated or remotely originated)
+ * - it cannot acquire lock on the given key and the lock owner is another transaction
+ * - when comparing coin toss, this thread would lose against the other one - so it's always the potential loser that runs DLD.
+ * If deadlock is detected then {@link #lockAndRecord(Object, org.infinispan.context.InvocationContext)} would throw an
+ * {@link org.infinispan.util.concurrent.locks.DeadlockDetectedException}. This is subsequently handled in
+ * the interceptor chain - locks owned by this tx are released.
*
* @author Mircea.Markus at jboss.com
*/
@@ -31,52 +41,49 @@
protected volatile boolean exposeJmxStats;
- private volatile boolean isSync;
+ private AtomicLong localTxStopped = new AtomicLong(0);
- private AtomicLong detectedRemoteDeadlocks = new AtomicLong(0);
+ private AtomicLong remoteTxStopped = new AtomicLong(0);
- private AtomicLong detectedLocalDeadlocks = new AtomicLong(0);
+ private AtomicLong cannotRunDld = new AtomicLong(0);
- private AtomicLong locallyInterruptedTransactions = new AtomicLong(0);
-
- private AtomicLong overlapWithNotDeadlockAwareLockOwners = new AtomicLong(0);
-
@Start
public void init() {
spinDuration = configuration.getDeadlockDetectionSpinDuration();
exposeJmxStats = configuration.isExposeJmxStatistics();
- isSync = configuration.getCacheMode().isSynchronous();
}
public boolean lockAndRecord(Object key, InvocationContext ctx) throws InterruptedException {
long lockTimeout = getLockAcquisitionTimeout(ctx);
if (trace) log.trace("Attempting to lock {0} with acquisition timeout of {1} millis", key, lockTimeout);
+
if (ctx.isInTxScope()) {
if (trace) log.trace("Using early dead lock detection");
final long start = System.currentTimeMillis();
- long now;
- while ((now = System.currentTimeMillis()) < (start + lockTimeout)) {
+ DldGlobalTransaction thisTx = (DldGlobalTransaction) ctx.getLockOwner();
+ thisTx.setLockLocalLockIntention(key);
+ if (trace) log.trace("Setting lock intention to: " + key);
+
+ while (System.currentTimeMillis() < (start + lockTimeout)) {
if (lockContainer.acquireLock(key, spinDuration, MILLISECONDS) != null) {
+ thisTx.setLockLocalLockIntention(null); //clear lock intention
if (trace) log.trace("successfully acquired lock on " + key + ", returning ...");
return true;
} else {
- if (trace)
- log.trace("Could not acquire lock on '" + key + "' as it is locked by '" + getOwner(key) + "', check for dead locks");
Object owner = getOwner(key);
- if (!(owner instanceof DeadlockDetectingGlobalTransaction)) {
- if (trace)
- log.trace("Owner is not instance of DeadlockDetectingGlobalTransaction: " + owner + ", continuing ...");
- if (exposeJmxStats) overlapWithNotDeadlockAwareLockOwners.incrementAndGet();
- continue; //try to acquire lock again, for the rest of the time
+ if (!(owner instanceof DldGlobalTransaction)) {
+ if (trace) log.trace("Not running DLD as lock owner( " + owner + ") is not a transaction");
+ cannotRunDld.incrementAndGet();
+ continue;
}
- DeadlockDetectingGlobalTransaction lockOwnerTx = (DeadlockDetectingGlobalTransaction) owner;
- if (isSync && !ctx.isOriginLocal() && !lockOwnerTx.isRemote()) {
- return remoteVsRemoteDld(key, ctx, lockTimeout, start, now, lockOwnerTx);
+ DldGlobalTransaction lockOwnerTx = (DldGlobalTransaction) owner;
+ if (isDeadlockAndIAmLoosing(lockOwnerTx, thisTx, key)) {
+ updateStats(thisTx);
+ String message = "Deadlock found and we " + thisTx + " shall not continue. Other tx is " + lockOwnerTx;
+ if (trace) log.trace(message);
+ throw new DeadlockDetectedException(message);
}
- if ((ctx.isOriginLocal() && !lockOwnerTx.isRemote()) || (!isSync && !ctx.isOriginLocal() && !lockOwnerTx.isRemote())) {
- localVsLocalDld(ctx, lockOwnerTx);
- }
}
}
} else {
@@ -88,100 +95,100 @@
return false;
}
- private void localVsLocalDld(InvocationContext ctx, DeadlockDetectingGlobalTransaction lockOwnerTx) {
- if (trace) log.trace("Looking for local vs local deadlocks");
- DeadlockDetectingGlobalTransaction thisThreadsTx = (DeadlockDetectingGlobalTransaction) ctx.getLockOwner();
- boolean weOwnLock = ownsLock(lockOwnerTx.getLockIntention(), thisThreadsTx);
- if (trace) {
- log.trace("Other owner's intention is " + lockOwnerTx.getLockIntention() + ". Do we(" + thisThreadsTx + ") own lock for it? " + weOwnLock + ". Lock owner is " + getOwner(lockOwnerTx.getLockIntention()));
+ private boolean isDeadlockAndIAmLoosing(DldGlobalTransaction lockOwnerTx, DldGlobalTransaction thisTx, Object key) {
+ //run the lose check first as it is cheaper
+ boolean wouldWeLoose = thisTx.wouldLose(lockOwnerTx);
+ if (!wouldWeLoose) {
+ if (trace) log.trace("We (" + thisTx + ") wouldn't lose against the other(" + lockOwnerTx + ") transaction, so no point running rest of DLD");
+ return false;
}
- if (weOwnLock) {
- boolean iShouldInterrupt = thisThreadsTx.thisWillInterrupt(lockOwnerTx);
- if (trace)
- log.trace("deadlock situation detected. Shall I interrupt?" + iShouldInterrupt );
- if (iShouldInterrupt) {
- lockOwnerTx.interruptProcessingThread();
- if (exposeJmxStats) detectedLocalDeadlocks.incrementAndGet();
- }
- }
+ //do we have lock on what other tx intends to acquire?
+ return ownsLocalIntention(thisTx, lockOwnerTx.getLockIntention()) || ownsRemoteIntention(lockOwnerTx, thisTx, key);
}
- private boolean remoteVsRemoteDld(Object key, InvocationContext ctx, long lockTimeout, long start, long now, DeadlockDetectingGlobalTransaction lockOwnerTx) throws InterruptedException {
- TxInvocationContext remoteTxContext = (TxInvocationContext) ctx;
- Address origin = remoteTxContext.getGlobalTransaction().getAddress();
- DeadlockDetectingGlobalTransaction remoteGlobalTransaction = (DeadlockDetectingGlobalTransaction) ctx.getLockOwner();
- boolean thisShouldInterrupt = remoteGlobalTransaction.thisWillInterrupt(lockOwnerTx);
- if (trace) log.trace("Should I interrupt other transaction ? " + thisShouldInterrupt);
- boolean isDeadLock = (configuration.getCacheMode().isReplicated() || lockOwnerTx.isReplicatingTo(origin)) && !lockOwnerTx.isRemote();
- if (thisShouldInterrupt && isDeadLock) {
- lockOwnerTx.interruptProcessingThread();
- if (exposeJmxStats) {
- detectedRemoteDeadlocks.incrementAndGet();
- locallyInterruptedTransactions.incrementAndGet();
+ /**
+ * This happens with two nodes replicating same tx at the same time.
+ */
+ private boolean ownsRemoteIntention(DldGlobalTransaction lockOwnerTx, DldGlobalTransaction thisTx, Object key) {
+ if (!lockOwnerTx.isRemote()) {
+ // I've already acquired lock on this key before replicating here, so this means we are in deadlock. This assumes the fact that
+ // if trying to acquire a remote lock, a tx first acquires a local lock. This stands true in all situations but
+ // when DLD + eager locking is used (in this scenario remote locks are acquired first).
+ if (lockOwnerTx.isAcquiringRemoteLock(key, thisTx.getAddress())) {
+ if (trace)
+ log.trace("Same key deadlock detected: lock owner tries to acquire lock remotely on " + key + " but we have it!");
+ return true;
}
- return lockForTheRemainingTime(key, lockTimeout, start, now);
- } else if (!isDeadLock) {
- return lockForTheRemainingTime(key, lockTimeout, start, now);
- } else {
- if (trace)
- log.trace("Not trying to acquire lock anymore, as we're in deadlock and this will be rollback at origin");
- if (exposeJmxStats) {
- detectedRemoteDeadlocks.incrementAndGet();
+ for (Object remoteIntention : lockOwnerTx.getRemoteLockIntention()) {
+ if (ownsLock(remoteIntention, thisTx)) {
+ if (trace) log.trace("We own lock on a key ('" + remoteIntention + "') on which other tx wants to acquire remote lock");
+ return true;
+ }
}
- remoteGlobalTransaction.setMarkedForRollback(true);
- throw new DeadlockDetectedException("Deadlock situation detected on tx: " + remoteTxContext.getLockOwner());
}
+ return false;
}
- private boolean lockForTheRemainingTime(Object key, long lockTimeout, long start, long now) throws InterruptedException {
- long remainingLockingTime = (start + lockTimeout) - now;
- if (remainingLockingTime < 0)
- throw new IllegalStateException("No remaining time!!! The outer while condition MUST make sure this always stands true!");
- if (trace) log.trace("trying to lock for the remaining time: " + remainingLockingTime + " millis ");
- return lockContainer.acquireLock(key, remainingLockingTime, MILLISECONDS) != null;
+ private boolean ownsLocalIntention(DldGlobalTransaction tx, Object intention) {
+ boolean result = intention != null && ownsLock(intention, tx);
+ if (trace) log.trace("Intention is '" + intention + "'. Do we own lock for it? " + result + " We == " + tx);
+ return result;
}
public void setExposeJmxStats(boolean exposeJmxStats) {
this.exposeJmxStats = exposeJmxStats;
}
- @ManagedAttribute(description = "Number of situtations when we try to determine a deadlock and the other lock owner is e.g. a local tx. In this scenario we cannot run the deadlock detection mechanism")
- @Metric(displayName = "Number of unsolvable deadlock situations", measurementType = MeasurementType.TRENDSUP)
- public long getOverlapWithNotDeadlockAwareLockOwners() {
- return overlapWithNotDeadlockAwareLockOwners.get();
+
+ @ManagedAttribute (description = "Total number of local detected deadlocks")
+ @Metric(displayName = "Number of total detected deadlocks", measurementType = MeasurementType.TRENDSUP)
+ public long getTotalNumberOfDetectedDeadlocks() {
+ return localTxStopped.get() + remoteTxStopped.get();
}
- @ManagedAttribute(description = "Number of locally originated transactions that were interrupted as a deadlock situation was detected")
- @Metric(displayName = "Number of interrupted local transactions", measurementType = MeasurementType.TRENDSUP)
- public long getLocallyInterruptedTransactions() {
- return locallyInterruptedTransactions.get();
+ @ManagedOperation(description = "Resets statistics gathered by this component")
+ @Operation(displayName = "Reset statistics")
+ public void resetStatistics() {
+ localTxStopped.set(0);
+ remoteTxStopped.set(0);
+ cannotRunDld.set(0);
}
- @ManagedAttribute(description = "Number of remote deadlocks detected")
- @Metric(displayName = "Number of detected remote deadlocks", measurementType = MeasurementType.TRENDSUP)
+ @ManagedAttribute(description = "Number of remote transaction that were roll backed due to deadlocks")
+ @Metric(displayName = "Number of remote transaction that were roll backed due to deadlocks", measurementType = MeasurementType.TRENDSUP)
public long getDetectedRemoteDeadlocks() {
- return detectedRemoteDeadlocks.get();
+ return remoteTxStopped.get();
}
- @ManagedAttribute (description = "Number of local detected deadlocks")
- @Metric(displayName = "Number of detected local deadlocks", measurementType = MeasurementType.TRENDSUP)
+ @ManagedAttribute (description = "Number of local transaction that were roll backed due to deadlocks")
+ @Metric(displayName = "Number of local transaction that were roll backed due to deadlocks", measurementType = MeasurementType.TRENDSUP)
public long getDetectedLocalDeadlocks() {
- return detectedLocalDeadlocks.get();
+ return localTxStopped.get();
}
- @ManagedAttribute (description = "Total number of local detected deadlocks")
- @Metric(displayName = "Number of total detected deadlocks", measurementType = MeasurementType.TRENDSUP)
- public long getTotalNumberOfDetectedDeadlocks() {
- return detectedRemoteDeadlocks.get() + detectedLocalDeadlocks.get();
+ @ManagedAttribute(description = "Number of situtations when we try to determine a deadlock and the other lock owner is NOT a transaction. In this scenario we cannot run the deadlock detection mechanism")
+ @Metric(displayName = "Number of unsolvable deadlock situations", measurementType = MeasurementType.TRENDSUP)
+ public long getOverlapWithNotDeadlockAwareLockOwners() {
+ return cannotRunDld.get();
}
- @ManagedOperation(description = "Resets statistics gathered by this component")
- @Operation(displayName = "Reset statistics")
- public void resetStatistics() {
- overlapWithNotDeadlockAwareLockOwners.set(0);
- locallyInterruptedTransactions.set(0);
- detectedRemoteDeadlocks.set(0);
- detectedLocalDeadlocks.set(0);
+
+
+ private void updateStats(DldGlobalTransaction tx) {
+ if (exposeJmxStats) {
+ if (tx.isRemote())
+ remoteTxStopped.incrementAndGet();
+ else
+ localTxStopped.incrementAndGet();
+ }
}
+
+ @ManagedAttribute(description = "Number of locally originated transactions that were interrupted as a deadlock situation was detected")
+ @Metric(displayName = "Number of interrupted local transactions", measurementType = MeasurementType.TRENDSUP)
+ @Deprecated
+ public long getLocallyInterruptedTransactions() {
+ return -1;
+ }
+
}
Modified: branches/4.2.x/core/src/test/java/org/infinispan/distribution/DeadlockDetectionDistributionTest.java
===================================================================
--- branches/4.2.x/core/src/test/java/org/infinispan/distribution/DeadlockDetectionDistributionTest.java 2010-09-05 13:12:25 UTC (rev 2325)
+++ branches/4.2.x/core/src/test/java/org/infinispan/distribution/DeadlockDetectionDistributionTest.java 2010-09-06 13:17:00 UTC (rev 2326)
@@ -10,33 +10,33 @@
*
* @author Mircea.Markus at jboss.com
*/
- at Test(groups = "functional", enabled = false, testName = "distribution.DeadlockDetectionDistributionTest")
+ at Test(groups = "functional", testName = "distribution.DeadlockDetectionDistributionTest")
public class DeadlockDetectionDistributionTest extends ReplDeadlockDetectionTest {
- public DeadlockDetectionDistributionTest() {
- cacheMode = Configuration.CacheMode.DIST_SYNC;
- }
-
-
- public void testDeadlockDetectedTwoTransactions() throws Exception {
- fail("This test should be updated to make sure tx replicate on opposite nodes");
- }
-
-
- //following methods are overridden as TestNG will otherwise run them even if I mark the class as enabled = false
-
- @Override
- public void testExpectedInnerStructure() {
- throw new IllegalStateException("TODO - please implement me!!!"); //todo implement!!!
- }
-
- @Override
- public void testDeadlockDetectedOneTx() throws Exception {
- throw new IllegalStateException("TODO - please implement me!!!"); //todo implement!!!
- }
-
- @Override
- public void testLockReleasedWhileTryingToAcquire() throws Exception {
- throw new IllegalStateException("TODO - please implement me!!!"); //todo implement!!!
- }
+// public DeadlockDetectionDistributionTest() {
+// cacheMode = Configuration.CacheMode.DIST_SYNC;
+// }
+//
+//
+// public void testDeadlockDetectedTwoTransactions() throws Exception {
+// fail("This test should be updated to make sure tx replicate on opposite nodes");
+// }
+//
+//
+// //following methods are overridden as TestNG will otherwise run them even if I mark the class as enabled = false
+//
+// @Override
+// public void testExpectedInnerStructure() {
+// throw new IllegalStateException("TODO - please implement me!!!"); //todo implement!!!
+// }
+//
+// @Override
+// public void testDeadlockDetectedOneTx() throws Exception {
+// throw new IllegalStateException("TODO - please implement me!!!"); //todo implement!!!
+// }
+//
+// @Override
+// public void testLockReleasedWhileTryingToAcquire() throws Exception {
+// throw new IllegalStateException("TODO - please implement me!!!"); //todo implement!!!
+// }
}
Modified: branches/4.2.x/core/src/test/java/org/infinispan/test/AbstractInfinispanTest.java
===================================================================
--- branches/4.2.x/core/src/test/java/org/infinispan/test/AbstractInfinispanTest.java 2010-09-05 13:12:25 UTC (rev 2325)
+++ branches/4.2.x/core/src/test/java/org/infinispan/test/AbstractInfinispanTest.java 2010-09-06 13:17:00 UTC (rev 2326)
@@ -21,31 +21,58 @@
*/
package org.infinispan.test;
+import org.testng.AssertJUnit;
+import org.testng.annotations.AfterTest;
+
import java.lang.reflect.Field;
import java.lang.reflect.Modifier;
-import org.testng.annotations.AfterTest;
+import static org.testng.Assert.assertEquals;
/**
- * AbstractInfinispanTest is a superclass of all Infinispan tests.
- *
+ * AbstractInfinispanTest is a superclass of all Infinispan tests.
+ *
* @author Vladimir Blagojevic
+ * @author Mircea.Markus at jboss.com
* @since 4.0
*/
public class AbstractInfinispanTest {
-
- @AfterTest(alwaysRun=true)
+
+ @AfterTest(alwaysRun = true)
protected void nullifyInstanceFields() {
- for(Class<?> current = this.getClass();current.getSuperclass() != null; current = current.getSuperclass()) {
+ for (Class<?> current = this.getClass(); current.getSuperclass() != null; current = current.getSuperclass()) {
Field[] fields = current.getDeclaredFields();
- for(Field f:fields) {
- try {
- if(!Modifier.isStatic(f.getModifiers()) && !f.getDeclaringClass().isPrimitive()) {
+ for (Field f : fields) {
+ try {
+ if (!Modifier.isStatic(f.getModifiers()) && !f.getDeclaringClass().isPrimitive()) {
f.setAccessible(true);
f.set(this, null);
}
- } catch (Exception e) {}
- }
- }
+ } catch (Exception e) {}
+ }
+ }
}
+
+ public void eventually(Condition ec, long timeout) {
+ int loops = 10;
+ long sleepDuration = timeout / loops;
+ try {
+ for (int i = 0; i < loops; i++) {
+
+ if (ec.isSatisfied()) break;
+ Thread.sleep(sleepDuration);
+ }
+ assertEquals(true, ec.isSatisfied());
+ } catch (Exception e) {
+ throw new RuntimeException("Unexpected!", e);
+ }
+ }
+
+ public void eventually(Condition ec) {
+ eventually(ec, 10000);
+ }
+
+ public interface Condition {
+ public boolean isSatisfied() throws Exception;
+ }
}
Modified: branches/4.2.x/core/src/test/java/org/infinispan/test/PerCacheExecutorThread.java
===================================================================
--- branches/4.2.x/core/src/test/java/org/infinispan/test/PerCacheExecutorThread.java 2010-09-05 13:12:25 UTC (rev 2325)
+++ branches/4.2.x/core/src/test/java/org/infinispan/test/PerCacheExecutorThread.java 2010-09-06 13:17:00 UTC (rev 2326)
@@ -4,6 +4,7 @@
import org.infinispan.util.logging.Log;
import org.infinispan.util.logging.LogFactory;
+import javax.transaction.Transaction;
import javax.transaction.TransactionManager;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
@@ -24,6 +25,7 @@
private BlockingQueue<Object> toExecute = new ArrayBlockingQueue<Object>(1);
private volatile Object response;
private CountDownLatch responseLatch = new CountDownLatch(1);
+ private volatile Transaction ongoingTransaction;
private volatile Object key, value;
@@ -75,6 +77,7 @@
TransactionManager txManager = TestingUtil.getTransactionManager(cache);
try {
txManager.begin();
+ ongoingTransaction = txManager.getTransaction();
setResponse(OperationsResult.BEGGIN_TX_OK);
} catch (Exception e) {
log.trace("Failure on beggining tx", e);
@@ -86,6 +89,7 @@
TransactionManager txManager = TestingUtil.getTransactionManager(cache);
try {
txManager.commit();
+ ongoingTransaction = null;
setResponse(OperationsResult.COMMIT_TX_OK);
} catch (Exception e) {
log.trace("Exception while committing tx", e);
@@ -210,6 +214,9 @@
*/
public static enum OperationsResult {
BEGGIN_TX_OK, COMMIT_TX_OK, PUT_KEY_VALUE_OK, REMOVE_KEY_OK, REPLACE_KEY_VALUE_OK, STOP_THREAD_OK
+ }
+ public Transaction getOngoingTransaction() {
+ return ongoingTransaction;
}
}
Modified: branches/4.2.x/core/src/test/java/org/infinispan/tx/AsyncDeadlockDetectionTest.java
===================================================================
--- branches/4.2.x/core/src/test/java/org/infinispan/tx/AsyncDeadlockDetectionTest.java 2010-09-05 13:12:25 UTC (rev 2325)
+++ branches/4.2.x/core/src/test/java/org/infinispan/tx/AsyncDeadlockDetectionTest.java 2010-09-06 13:17:00 UTC (rev 2326)
@@ -92,13 +92,15 @@
t0.execute(PerCacheExecutorThread.Operations.COMMIT_TX);
- LockManager lockManager = TestingUtil.extractLockManager(cache1);
- while (!lockManager.isLocked("k1")) {
- Thread.sleep(50);
- }
- System.out.println("successful replication !");
+ final LockManager lm1 = TestingUtil.extractLockManager(cache1);
+ eventually(new Condition() {
+ public boolean isSatisfied() throws Exception {
+ return lm1.isLocked("k1");
+ }
+ }); //now t0 replicated, acquired lock on k1 and it tries to acquire lock on k2
+
t1.setKeyValue("k1", "v1_t1");
t1.executeNoResponse(PerCacheExecutorThread.Operations.PUT_KEY_VALUE);
@@ -106,15 +108,18 @@
Object t1Response = t1.waitForResponse();
Object t0Response = remoteReplicationInterceptor.getResponse();
- System.out.println("t0Response = " + t0Response);
- System.out.println("t1Response = " + t1Response);
+ log.trace("t0Response = " + t0Response);
+ log.trace("t1Response = " + t1Response);
assert xor(t1Response instanceof DeadlockDetectedException, t0Response instanceof DeadlockDetectedException);
+ TransactionTable transactionTable1 = TestingUtil.extractComponent(cache1, TransactionTable.class);
+
if (t0Response instanceof DeadlockDetectedException) {
replListener(cache0).expectWithTx(PutKeyValueCommand.class, PutKeyValueCommand.class);
assertEquals(t1.execute(PerCacheExecutorThread.Operations.COMMIT_TX), PerCacheExecutorThread.OperationsResult.COMMIT_TX_OK);
replListener(cache0).waitForRpc();
+ assertEquals(transactionTable1.getLocalTxCount(), 0);
}
DeadlockDetectingLockManager ddLm0 = (DeadlockDetectingLockManager) TestingUtil.extractLockManager(cache0);
@@ -132,8 +137,6 @@
assertEquals(transactionTable0.getRemoteTxCount(), 0);
- TransactionTable transactionTable1 = TestingUtil.extractComponent(cache1, TransactionTable.class);
- assertEquals(transactionTable1.getLocalTxCount(), 0);
for (int i = 0; i < 20; i++) {
if (!(transactionTable1.getRemoteTxCount() == 0)) Thread.sleep(50);
}
@@ -163,7 +166,7 @@
return invokeNextInterceptor(ctx, command);
} catch (Throwable throwable) {
if (!ctx.isOriginLocal()) {
- log.trace("Setting thrownExceptionForRemoteTx to " + throwable);
+ log.trace("Setting executionResponse to " + throwable);
executionResponse = throwable;
} else {
log.trace("Ignoring throwable " + throwable);
Added: branches/4.2.x/core/src/test/java/org/infinispan/tx/EagerTxDeadlockDetectionTest.java
===================================================================
--- branches/4.2.x/core/src/test/java/org/infinispan/tx/EagerTxDeadlockDetectionTest.java (rev 0)
+++ branches/4.2.x/core/src/test/java/org/infinispan/tx/EagerTxDeadlockDetectionTest.java 2010-09-06 13:17:00 UTC (rev 2326)
@@ -0,0 +1,128 @@
+package org.infinispan.tx;
+
+import org.infinispan.config.Configuration;
+import org.infinispan.context.Flag;
+import org.infinispan.test.MultipleCacheManagersTest;
+import org.infinispan.test.PerCacheExecutorThread;
+import org.infinispan.test.TestingUtil;
+import org.infinispan.util.concurrent.TimeoutException;
+import org.infinispan.util.concurrent.locks.DeadlockDetectingLockManager;
+import org.infinispan.util.concurrent.locks.containers.LockContainer;
+import org.testng.annotations.Test;
+
+import javax.transaction.Status;
+import javax.transaction.TransactionManager;
+import java.util.concurrent.TimeUnit;
+
+/**
+ * @author Mircea.Markus at jboss.com
+ * @since 4.1
+ */
+ at Test(groups = "functional", testName = "tx.EagerTxDeadlockDetectionTest")
+public class EagerTxDeadlockDetectionTest extends MultipleCacheManagersTest {
+ private PerCacheExecutorThread ex1;
+ private PerCacheExecutorThread ex2;
+ private DeadlockDetectingLockManager lm0;
+ private DeadlockDetectingLockManager lm1;
+
+ @Override
+ protected void createCacheManagers() throws Throwable {
+ Configuration configuration = getConfiguration();
+ createClusteredCaches(2, configuration);
+ ex1 = new PerCacheExecutorThread(cache(0), 1);
+ ex2 = new PerCacheExecutorThread(cache(1), 2);
+ lm0 = (DeadlockDetectingLockManager) TestingUtil.extractLockManager(cache(0));
+ lm0.setExposeJmxStats(true);
+ lm1 = (DeadlockDetectingLockManager) TestingUtil.extractLockManager(cache(1));
+ lm1.setExposeJmxStats(true);
+ }
+
+ protected Configuration getConfiguration() throws Exception {
+ Configuration configuration = getDefaultClusteredConfig(Configuration.CacheMode.REPL_SYNC, true);
+ configuration.setUseEagerLocking(true);
+ configuration.setEnableDeadlockDetection(true);
+ configuration.setUseLockStriping(false);
+ return configuration;
+ }
+
+ public void testDeadlock() throws Exception {
+ long start = System.currentTimeMillis();
+ ex1.execute(PerCacheExecutorThread.Operations.BEGGIN_TX);
+ ex2.execute(PerCacheExecutorThread.Operations.BEGGIN_TX);
+
+ ex1.setKeyValue("k1", "v1_1");
+ ex1.execute(PerCacheExecutorThread.Operations.PUT_KEY_VALUE);
+ ex2.setKeyValue("k2", "v2_2");
+ ex2.execute(PerCacheExecutorThread.Operations.PUT_KEY_VALUE);
+ assert lm0.isLocked("k1");
+ assert lm0.isLocked("k2");
+ assert lm1.isLocked("k1");
+ assert lm1.isLocked("k2");
+
+ log.trace("After first set of puts");
+
+ ex1.clearResponse();
+ ex2.clearResponse();
+
+ log.info("Here is where DLD happens");
+
+ ex2.setKeyValue("k1", "v1_2");
+ ex2.executeNoResponse(PerCacheExecutorThread.Operations.PUT_KEY_VALUE);
+ ex1.setKeyValue("k2", "v2_1");
+ ex1.executeNoResponse(PerCacheExecutorThread.Operations.PUT_KEY_VALUE);
+ ex1.waitForResponse();
+ ex2.waitForResponse();
+
+
+ boolean b1 = ex1.lastResponse() instanceof Exception;
+ boolean b2 = ex2.lastResponse() instanceof Exception;
+ log.info("b1:", b1);
+ log.info("b2:", b2);
+ assert xor(b1, b2) : "Both are " + (b1 || b2);
+
+ assert xor(ex1.getOngoingTransaction().getStatus() == Status.STATUS_MARKED_ROLLBACK,
+ ex2.getOngoingTransaction().getStatus() == Status.STATUS_MARKED_ROLLBACK);
+
+
+ ex1.execute(PerCacheExecutorThread.Operations.COMMIT_TX);
+ ex2.execute(PerCacheExecutorThread.Operations.COMMIT_TX);
+
+
+ assert cache(0).get("k1") != null;
+ assert cache(0).get("k2") != null;
+ assert cache(1).get("k1") != null;
+ assert cache(1).get("k2") != null;
+
+ long totalDeadlocks = lm0.getTotalNumberOfDetectedDeadlocks() + lm1.getTotalNumberOfDetectedDeadlocks();
+ assert totalDeadlocks == 1 : "Expected 1 but received " + totalDeadlocks;
+
+ System.out.println("Test took " + (System.currentTimeMillis() - start) + " millis.");
+ }
+
+ /**
+ * On eager locking, remote locks are being acquired at first, and then local locks. This is for specifying the
+ * behavior when remote acquisition succeeds and local fails.
+ */
+ public void testDeadlockFailedToAcquireLocalLocks() throws Exception {
+ //first acquire a local lock on k1
+ TransactionManager tm = TestingUtil.getTransactionManager(cache(0));
+ tm.begin();
+ cache(0).getAdvancedCache().withFlags(Flag.CACHE_MODE_LOCAL).put("k1","v1");
+ assert lm0.isLocked("k1");
+ assert !lm1.isLocked("k1");
+
+ try {
+ ex1.execute(PerCacheExecutorThread.Operations.BEGGIN_TX);
+ ex1.setKeyValue("k1", "v1_1");
+ ex1.execute(PerCacheExecutorThread.Operations.PUT_KEY_VALUE);
+ assert ex1.lastResponse() instanceof TimeoutException;
+ eventually(new Condition() {
+ public boolean isSatisfied() throws Exception {
+ return !lm1.isLocked("k1");
+ }
+ });
+ } finally {
+ tm.rollback();
+ }
+ }
+}
Modified: branches/4.2.x/core/src/test/java/org/infinispan/tx/LocalDeadlockDetectionTest.java
===================================================================
--- branches/4.2.x/core/src/test/java/org/infinispan/tx/LocalDeadlockDetectionTest.java 2010-09-05 13:12:25 UTC (rev 2325)
+++ branches/4.2.x/core/src/test/java/org/infinispan/tx/LocalDeadlockDetectionTest.java 2010-09-06 13:17:00 UTC (rev 2326)
@@ -55,7 +55,7 @@
t1.stopThread();
t2.stopThread();
}
-
+
public void testDldPutAndPut() {
testLocalVsLocalTxDeadlock(PerCacheExecutorThread.Operations.PUT_KEY_VALUE,
PerCacheExecutorThread.Operations.PUT_KEY_VALUE);
@@ -149,15 +149,14 @@
assert PerCacheExecutorThread.OperationsResult.BEGGIN_TX_OK == t1.execute(PerCacheExecutorThread.Operations.BEGGIN_TX);
assert PerCacheExecutorThread.OperationsResult.BEGGIN_TX_OK == t2.execute(PerCacheExecutorThread.Operations.BEGGIN_TX);
- System.out.println("After beggin");
+ System.out.println("After begin");
t1.setKeyValue("k1", "value_1_t1");
t2.setKeyValue("k2", "value_2_t2");
- assert firstOperation.getCorrespondingOkResult() == t1.execute(firstOperation);
- assert firstOperation.getCorrespondingOkResult() == t2.execute(firstOperation);
+ assertEquals(t1.execute(firstOperation), firstOperation.getCorrespondingOkResult());
+ assertEquals(t2.execute(firstOperation), firstOperation.getCorrespondingOkResult());
- System.out.println("After first PUT");
assert lockManager.isLocked("k1");
assert lockManager.isLocked("k2");
Modified: branches/4.2.x/core/src/test/java/org/infinispan/tx/ReplDeadlockDetectionTest.java
===================================================================
--- branches/4.2.x/core/src/test/java/org/infinispan/tx/ReplDeadlockDetectionTest.java 2010-09-05 13:12:25 UTC (rev 2325)
+++ branches/4.2.x/core/src/test/java/org/infinispan/tx/ReplDeadlockDetectionTest.java 2010-09-06 13:17:00 UTC (rev 2326)
@@ -111,7 +111,7 @@
t2.setKeyValue("key", "value2");
assert PerCacheExecutorThread.OperationsResult.BEGGIN_TX_OK == t1.execute(PerCacheExecutorThread.Operations.BEGGIN_TX);
assert PerCacheExecutorThread.OperationsResult.BEGGIN_TX_OK == t2.execute(PerCacheExecutorThread.Operations.BEGGIN_TX);
- System.out.println("After beggin");
+ System.out.println("After begin");
t1.execute(PerCacheExecutorThread.Operations.PUT_KEY_VALUE);
t2.execute(PerCacheExecutorThread.Operations.PUT_KEY_VALUE);
@@ -209,7 +209,7 @@
replicationLatch.countDown();
- Thread.sleep(3000); //just to make sure the remote tx thread managed to spin around for some times.
+ Thread.sleep(3000); //just to make sure the remote tx thread managed to spin around for some times.
lm2.unlock("key");
t1.waitForResponse();
Modified: branches/4.2.x/core/src/test/java/org/infinispan/util/DeadlockDetectingLockManagerTest.java
===================================================================
--- branches/4.2.x/core/src/test/java/org/infinispan/util/DeadlockDetectingLockManagerTest.java 2010-09-05 13:12:25 UTC (rev 2325)
+++ branches/4.2.x/core/src/test/java/org/infinispan/util/DeadlockDetectingLockManagerTest.java 2010-09-06 13:17:00 UTC (rev 2326)
@@ -5,14 +5,16 @@
import static org.easymock.EasyMock.expect;
import static org.easymock.classextension.EasyMock.replay;
import static org.easymock.classextension.EasyMock.verify;
+import static org.testng.Assert.assertEquals;
import org.infinispan.config.Configuration;
import org.infinispan.context.InvocationContext;
import org.infinispan.context.impl.NonTxInvocationContext;
import org.infinispan.context.impl.LocalTxInvocationContext;
import org.infinispan.test.AbstractInfinispanTest;
+import org.infinispan.transaction.xa.DldGlobalTransaction;
import org.infinispan.transaction.xa.GlobalTransactionFactory;
-import org.infinispan.transaction.xa.DeadlockDetectingGlobalTransaction;
+import org.infinispan.util.concurrent.locks.DeadlockDetectedException;
import org.infinispan.util.concurrent.locks.DeadlockDetectingLockManager;
import org.infinispan.util.concurrent.locks.containers.LockContainer;
import org.testng.annotations.BeforeMethod;
@@ -34,13 +36,13 @@
Configuration config = new Configuration();
private LockContainer lc;
private static final int SPIN_DURATION = 1000;
- private DeadlockDetectingGlobalTransaction lockOwner;
+ private DldGlobalTransaction lockOwner;
@BeforeMethod
public void setUp() {
lc = createMock(LockContainer.class);
lockManager = new DeadlockDetectingLockManagerMock(SPIN_DURATION, true, lc, config);
- lockOwner = (DeadlockDetectingGlobalTransaction) gtf.instantiateGlobalTransaction();
+ lockOwner = (DldGlobalTransaction) gtf.instantiateGlobalTransaction();
}
@@ -58,7 +60,7 @@
}
public void testLockHeldByThread() throws Exception {
- InvocationContext localTxContext = new LocalTxInvocationContext();
+ InvocationContext localTxContext = buildLocalTxIc(new DldGlobalTransaction());
//this makes sure that we cannot acquire lock from the first try
expect(lc.acquireLock("k", SPIN_DURATION, TimeUnit.MILLISECONDS)).andReturn(null);
@@ -73,31 +75,42 @@
}
public void testLocalDeadlock() throws Exception {
- final DeadlockDetectingGlobalTransaction ddgt = (DeadlockDetectingGlobalTransaction) gtf.instantiateGlobalTransaction();
+ final DldGlobalTransaction ddgt = (DldGlobalTransaction) gtf.instantiateGlobalTransaction();
- InvocationContext localTxContext = new LocalTxInvocationContext() {
- @Override
- public Object getLockOwner() {
- return ddgt;
- }
- };
+ InvocationContext localTxContext = buildLocalTxIc(ddgt);
ddgt.setCoinToss(0);
- lockOwner.setCoinToss(-1);
- assert ddgt.thisWillInterrupt(lockOwner);
+ lockOwner.setCoinToss(1);
+ assert ddgt.wouldLose(lockOwner);
//this makes sure that we cannot acquire lock from the first try
expect(lc.acquireLock("k", SPIN_DURATION, TimeUnit.MILLISECONDS)).andReturn(null);
Lock mockLock = createNiceMock(Lock.class);
expect(lc.acquireLock("k", SPIN_DURATION, TimeUnit.MILLISECONDS)).andReturn(mockLock);
lockOwner.setRemote(false);
+ lockOwner.setLockLocalLockIntention("k");
lockManager.setOwner(lockOwner);
lockManager.setOwnsLock(true);
replay(lc);
- assert lockManager.lockAndRecord("k", localTxContext);
- assert lockManager.getDetectedLocalDeadlocks() == 1;
+ try {
+ lockManager.lockAndRecord("k", localTxContext);
+ assert false;
+ } catch (DeadlockDetectedException e) {
+ //expected
+ }
+ assertEquals(1l,lockManager.getDetectedLocalDeadlocks());
}
+ private InvocationContext buildLocalTxIc(final DldGlobalTransaction ddgt) {
+ InvocationContext localTxContext = new LocalTxInvocationContext() {
+ @Override
+ public Object getLockOwner() {
+ return ddgt;
+ }
+ };
+ return localTxContext;
+ }
+
public static class DeadlockDetectingLockManagerMock extends DeadlockDetectingLockManager {
private Object owner;
More information about the infinispan-commits
mailing list