[infinispan-commits] Infinispan SVN: r612 - in trunk/core/src: main/java/org/infinispan/interceptors and 8 other directories.
infinispan-commits at lists.jboss.org
infinispan-commits at lists.jboss.org
Thu Jul 23 09:28:57 EDT 2009
Author: mircea.markus
Date: 2009-07-23 09:28:56 -0400 (Thu, 23 Jul 2009)
New Revision: 612
Added:
trunk/core/src/test/java/org/infinispan/tx/AsyncDeadlockDetectionTest.java
trunk/core/src/test/java/org/infinispan/util/DeadlockDetectingLockManagerTest.java
Modified:
trunk/core/src/main/java/org/infinispan/factories/LockManagerFactory.java
trunk/core/src/main/java/org/infinispan/interceptors/DeadlockDetectingInterceptor.java
trunk/core/src/main/java/org/infinispan/transaction/tm/DummyTransaction.java
trunk/core/src/main/java/org/infinispan/transaction/xa/GlobalTransaction.java
trunk/core/src/main/java/org/infinispan/util/concurrent/locks/DeadlockDetectingLockManager.java
trunk/core/src/main/java/org/infinispan/util/concurrent/locks/LockManagerImpl.java
trunk/core/src/test/java/org/infinispan/distribution/DeadlockDetectionDistributionTest.java
trunk/core/src/test/java/org/infinispan/profiling/DeadlockDetectionPerformanceTest.java
trunk/core/src/test/java/org/infinispan/test/PerCacheExecutorThread.java
trunk/core/src/test/java/org/infinispan/test/ReplListener.java
trunk/core/src/test/java/org/infinispan/tx/LocalDeadlockDetectionTest.java
trunk/core/src/test/java/org/infinispan/tx/ReplDeadlockDetectionTest.java
Log:
[ISPN-38] (Deadlock detection) - support DLD for async caches
Modified: trunk/core/src/main/java/org/infinispan/factories/LockManagerFactory.java
===================================================================
--- trunk/core/src/main/java/org/infinispan/factories/LockManagerFactory.java 2009-07-23 13:05:47 UTC (rev 611)
+++ trunk/core/src/main/java/org/infinispan/factories/LockManagerFactory.java 2009-07-23 13:28:56 UTC (rev 612)
@@ -22,10 +22,9 @@
package org.infinispan.factories;
import org.infinispan.factories.annotations.DefaultFactoryFor;
+import org.infinispan.util.concurrent.locks.DeadlockDetectingLockManager;
import org.infinispan.util.concurrent.locks.LockManager;
import org.infinispan.util.concurrent.locks.LockManagerImpl;
-import org.infinispan.util.concurrent.locks.DeadlockDetectingLockManager;
-import org.infinispan.config.ConfigurationException;
/**
* // TODO: MANIK: Document this
@@ -37,9 +36,6 @@
public class LockManagerFactory extends AbstractNamedCacheComponentFactory implements AutoInstantiableFactory {
public <T> T construct(Class<T> componentType) {
if (configuration.isEnableDeadlockDetection()) {
- if (!configuration.getCacheMode().isSynchronous()) {
- throw new ConfigurationException("Eager Dead lock detection can only be used for sync caches!");
- }
return (T) new DeadlockDetectingLockManager();
} else {
return (T) new LockManagerImpl();
Modified: trunk/core/src/main/java/org/infinispan/interceptors/DeadlockDetectingInterceptor.java
===================================================================
--- trunk/core/src/main/java/org/infinispan/interceptors/DeadlockDetectingInterceptor.java 2009-07-23 13:05:47 UTC (rev 611)
+++ trunk/core/src/main/java/org/infinispan/interceptors/DeadlockDetectingInterceptor.java 2009-07-23 13:28:56 UTC (rev 612)
@@ -26,7 +26,7 @@
* appropriate information needed in order to accomplish deadlock detection. It MUST process populate data before the
* replication takes place, so it will do all the tasks before calling {@link org.infinispan.interceptors.base.CommandInterceptor#invokeNextInterceptor(org.infinispan.context.InvocationContext,
* org.infinispan.commands.VisitableCommand)}.
- *
+ * <p/>
* Note: for local caches, deadlock detection dos NOT work for aggregate operations (clear, putAll).
*
* @author Mircea.Markus at jboss.com
@@ -65,21 +65,29 @@
try {
return invokeNextInterceptor(ctx, command);
} catch (InterruptedException ie) {
- if (ctx.isOriginLocal() && ctx.isInTxScope()) {
+ if (ctx.isInTxScope()) {
lockManager.releaseLocks(ctx);
- Transaction transaction = txManager.getTransaction();
- if (trace)
- log.trace("Marking the transaction for rollback! : " + transaction);
- if (transaction == null) {
- throw new IllegalStateException("We're running in a local transaction, there MUST be one " +
- "associated witht the local thread but none found! " + transaction);
+ if (ctx.isOriginLocal()) {
+ Transaction transaction = txManager.getTransaction();
+ if (trace)
+ log.trace("Marking the transaction for rollback! : " + transaction);
+ if (transaction == null) {
+ throw new IllegalStateException("We're running in a local transaction, there MUST be one " +
+ "associated witht the local thread but none found! " + transaction);
+ }
+ transaction.setRollbackOnly();
+ txTable.removeLocalTransaction(transaction);
+ throw new DeadlockDetectedException("Deadlock request was detected for locally originated tx " + transaction +
+ "; it was marked for rollback");
+ } else {
+ DeadlockDetectingGlobalTransaction gtx = (DeadlockDetectingGlobalTransaction) ctx.getLockOwner();
+ gtx.setMarkedForRollback(true);
+ throw new DeadlockDetectedException("Deadlock request was detected for remotely originated tx " + gtx +
+ "; it was marked for rollback");
}
- transaction.setRollbackOnly();
- throw new DeadlockDetectedException("Deadlock request was detected, tx " + transaction +
- " was marked for rollback");
} else {
if (trace)
- log.trace("Received an interrupt request, but we're not running within deadlock detection scenario, so passing it up the stack", ie);
+ log.trace("Received an interrupt request, but we're not running within the scope of a transaction, so passing it up the stack", ie);
throw ie;
}
}
Modified: trunk/core/src/main/java/org/infinispan/transaction/tm/DummyTransaction.java
===================================================================
--- trunk/core/src/main/java/org/infinispan/transaction/tm/DummyTransaction.java 2009-07-23 13:05:47 UTC (rev 611)
+++ trunk/core/src/main/java/org/infinispan/transaction/tm/DummyTransaction.java 2009-07-23 13:28:56 UTC (rev 612)
@@ -332,4 +332,12 @@
public void setStatus(int stat) {
this.status = stat;
}
+
+ @Override
+ public String toString() {
+ return "DummyTransaction{" +
+ "xid=" + xid +
+ ", status=" + status +
+ '}';
+ }
}
Modified: trunk/core/src/main/java/org/infinispan/transaction/xa/GlobalTransaction.java
===================================================================
--- trunk/core/src/main/java/org/infinispan/transaction/xa/GlobalTransaction.java 2009-07-23 13:05:47 UTC (rev 611)
+++ trunk/core/src/main/java/org/infinispan/transaction/xa/GlobalTransaction.java 2009-07-23 13:28:56 UTC (rev 612)
@@ -79,6 +79,10 @@
return remote;
}
+ public void setRemote(boolean remote) {
+ this.remote = remote;
+ }
+
@Override
public int hashCode() {
if (hash_code == -1) {
Modified: trunk/core/src/main/java/org/infinispan/util/concurrent/locks/DeadlockDetectingLockManager.java
===================================================================
--- trunk/core/src/main/java/org/infinispan/util/concurrent/locks/DeadlockDetectingLockManager.java 2009-07-23 13:05:47 UTC (rev 611)
+++ trunk/core/src/main/java/org/infinispan/util/concurrent/locks/DeadlockDetectingLockManager.java 2009-07-23 13:28:56 UTC (rev 612)
@@ -24,12 +24,16 @@
private static final Log log = LogFactory.getLog(DeadlockDetectingLockManager.class);
- private volatile long spinDuration;
+ protected volatile long spinDuration;
- private volatile boolean exposeJmxStats;
+ protected volatile boolean exposeJmxStats;
- private AtomicLong detectedDeadlocks = new AtomicLong(0);
+ private volatile boolean isSync;
+ private AtomicLong detectedRemoteDeadlocks = new AtomicLong(0);
+
+ private AtomicLong detectedLocalDeadlocks = new AtomicLong(0);
+
private AtomicLong locallyInterruptedTransactions = new AtomicLong(0);
private AtomicLong overlapWithNotDeadlockAwareLockOwners = new AtomicLong(0);
@@ -38,13 +42,13 @@
public void init() {
spinDuration = configuration.getDeadlockDetectionSpinDuration();
exposeJmxStats = configuration.isExposeJmxStatistics();
+ isSync = configuration.getCacheMode().isSynchronous();
}
public boolean lockAndRecord(Object key, InvocationContext ctx) throws InterruptedException {
long lockTimeout = getLockAcquisitionTimeout(ctx);
if (trace) log.trace("Attempting to lock {0} with acquisition timeout of {1} millis", key, lockTimeout);
-
if (ctx.isInTxScope()) {
if (trace) log.trace("Using early dead lock detection");
final long start = System.currentTimeMillis();
@@ -64,10 +68,10 @@
continue; //try to acquire lock again, for the rest of the time
}
DeadlockDetectingGlobalTransaction lockOwnerTx = (DeadlockDetectingGlobalTransaction) owner;
- if (!ctx.isOriginLocal() && !lockOwnerTx.isRemote()) {
+ if (isSync && !ctx.isOriginLocal() && !lockOwnerTx.isRemote()) {
return remoteVsRemoteDld(key, ctx, lockTimeout, start, now, lockOwnerTx);
}
- if (ctx.isOriginLocal() && !lockOwnerTx.isRemote()) {
+ if ((ctx.isOriginLocal() && !lockOwnerTx.isRemote()) || (!isSync && !ctx.isOriginLocal() && !lockOwnerTx.isRemote())) {
localVsLocalDld(ctx, lockOwnerTx);
}
}
@@ -94,7 +98,7 @@
log.trace("deadlock situation detected. Shall I interrupt?" + iShouldInterrupt );
if (iShouldInterrupt) {
lockOwnerTx.interruptProcessingThread();
- if (exposeJmxStats) detectedDeadlocks.incrementAndGet();
+ if (exposeJmxStats) detectedLocalDeadlocks.incrementAndGet();
}
}
}
@@ -109,7 +113,7 @@
if (thisShouldInterrupt && isDeadLock) {
lockOwnerTx.interruptProcessingThread();
if (exposeJmxStats) {
- detectedDeadlocks.incrementAndGet();
+ detectedRemoteDeadlocks.incrementAndGet();
locallyInterruptedTransactions.incrementAndGet();
}
return lockForTheRemainingTime(key, lockTimeout, start, now);
@@ -119,7 +123,7 @@
if (trace)
log.trace("Not trying to acquire lock anymore, as we're in deadlock and this will be rollback at origin");
if (exposeJmxStats) {
- detectedDeadlocks.incrementAndGet();
+ detectedRemoteDeadlocks.incrementAndGet();
}
remoteGlobalTransaction.setMarkedForRollback(true);
throw new DeadlockDetectedException("Deadlock situation detected on tx: " + remoteTxContext.getLockOwner());
@@ -149,16 +153,27 @@
return locallyInterruptedTransactions.get();
}
- @ManagedAttribute(description = "Total number of deadlocks detected")
- public long getDetectedDeadlocks() {
- return detectedDeadlocks.get();
+ @ManagedAttribute(description = "Number of remote deadlocks detected")
+ public long getDetectedRemoteDeadlocks() {
+ return detectedRemoteDeadlocks.get();
}
+ @ManagedAttribute (description = "Number of local detected deadlocks")
+ public long getDetectedLocalDeadlocks() {
+ return detectedLocalDeadlocks.get();
+ }
+
+ @ManagedAttribute (description = "Total number of local detected deadlocks")
+ public long getTotalNumberOfDetectedDeadlocks() {
+ return detectedRemoteDeadlocks.get() + detectedLocalDeadlocks.get();
+ }
+
@ManagedOperation(description = "Resets statistics gathered by this component")
public void resetStatistics() {
overlapWithNotDeadlockAwareLockOwners.set(0);
locallyInterruptedTransactions.set(0);
- detectedDeadlocks.set(0);
+ detectedRemoteDeadlocks.set(0);
+ detectedLocalDeadlocks.set(0);
}
}
Modified: trunk/core/src/main/java/org/infinispan/util/concurrent/locks/LockManagerImpl.java
===================================================================
--- trunk/core/src/main/java/org/infinispan/util/concurrent/locks/LockManagerImpl.java 2009-07-23 13:05:47 UTC (rev 611)
+++ trunk/core/src/main/java/org/infinispan/util/concurrent/locks/LockManagerImpl.java 2009-07-23 13:28:56 UTC (rev 612)
@@ -54,7 +54,7 @@
@MBean(objectName = "LockManager", description = "Manager that handles MVCC locks for entries")
public class LockManagerImpl implements LockManager {
protected Configuration configuration;
- LockContainer lockContainer;
+ protected LockContainer lockContainer;
private TransactionManager transactionManager;
private InvocationContextContainer invocationContextContainer;
private static final Log log = LogFactory.getLog(LockManagerImpl.class);
Modified: trunk/core/src/test/java/org/infinispan/distribution/DeadlockDetectionDistributionTest.java
===================================================================
--- trunk/core/src/test/java/org/infinispan/distribution/DeadlockDetectionDistributionTest.java 2009-07-23 13:05:47 UTC (rev 611)
+++ trunk/core/src/test/java/org/infinispan/distribution/DeadlockDetectionDistributionTest.java 2009-07-23 13:28:56 UTC (rev 612)
@@ -24,10 +24,6 @@
//following methods are overridden as TestNG will otherwise run them even if I mark the class as enabled = false
- @Override
- public void testDeadlockDetectionAndAsyncCaches() {
- throw new IllegalStateException("TODO - please implement me!!!"); //todo implement!!!
- }
@Override
public void testExpectedInnerStructure() {
Modified: trunk/core/src/test/java/org/infinispan/profiling/DeadlockDetectionPerformanceTest.java
===================================================================
--- trunk/core/src/test/java/org/infinispan/profiling/DeadlockDetectionPerformanceTest.java 2009-07-23 13:05:47 UTC (rev 611)
+++ trunk/core/src/test/java/org/infinispan/profiling/DeadlockDetectionPerformanceTest.java 2009-07-23 13:28:56 UTC (rev 612)
@@ -49,6 +49,8 @@
public static final long BENCHMARK_DURATION = 60000;
+ public static boolean NO_COLISION = false;
+
public static boolean USE_DLD = true;
public static List<String> keyPool;
@@ -75,6 +77,21 @@
}
}
+ @Test(invocationCount = 5, enabled = false)
+ public void testLocalDifferentTxSizeNoCollision() throws Exception {
+ NO_COLISION = true;
+ USE_DLD = false;
+ for (int i = 2; i < KEY_POOL_SIZE; i++) {
+ TX_SIZE = i;
+ runLocalTest();
+ }
+ USE_DLD = true;
+ for (int i = 2; i < KEY_POOL_SIZE; i++) {
+ TX_SIZE = i;
+ runLocalTest();
+ }
+ }
+
@Test(invocationCount = 10, enabled = false)
public void testReplDifferentTxSize() throws Exception {
THREAD_COUNT = 2;
@@ -215,7 +232,9 @@
result.add(key);
}
ArrayList resultList = new ArrayList(result);
- Collections.shuffle(resultList);
+ if (!NO_COLISION) {
+ Collections.shuffle(resultList);
+ }
return resultList;
}
}
Modified: trunk/core/src/test/java/org/infinispan/test/PerCacheExecutorThread.java
===================================================================
--- trunk/core/src/test/java/org/infinispan/test/PerCacheExecutorThread.java 2009-07-23 13:05:47 UTC (rev 611)
+++ trunk/core/src/test/java/org/infinispan/test/PerCacheExecutorThread.java 2009-07-23 13:28:56 UTC (rev 612)
@@ -14,7 +14,7 @@
*
* @author Mircea.Markus at jboss.com
* @see Operations
- * @see PerCacheExecutorThread.OperationsResult
+ * @see OperationsResult
*/
public final class PerCacheExecutorThread extends Thread {
@@ -69,7 +69,7 @@
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
- System.out.println("about to process operation " + operation);
+ log.trace("about to process operation " + operation);
switch (operation) {
case BEGGIN_TX: {
TransactionManager txManager = TestingUtil.getTransactionManager(cache);
@@ -96,7 +96,7 @@
case PUT_KEY_VALUE: {
try {
cache.put(key, value);
- log.trace("Successfully exucuted putKeyValue(" + key + ", " + value + ")");
+ log.trace("Successfully executed putKeyValue(" + key + ", " + value + ")");
setResponse(OperationsResult.PUT_KEY_VALUE_OK);
} catch (Exception e) {
log.trace("Exception while executing putKeyValue(" + key + ", " + value + ")", e);
@@ -107,7 +107,7 @@
case REMOVE_KEY: {
try {
cache.remove(key);
- log.trace("Successfully exucuted remove(" + key + ")");
+ log.trace("Successfully executed remove(" + key + ")");
setResponse(OperationsResult.REMOVE_KEY_OK);
} catch (Exception e) {
log.trace("Exception while executing remove(" + key + ")", e);
@@ -118,7 +118,7 @@
case REPLACE_KEY_VALUE: {
try {
cache.replace(key, value);
- log.trace("Successfully exucuted replace(" + key + "," + value + ")");
+ log.trace("Successfully executed replace(" + key + "," + value + ")");
setResponse(OperationsResult.REPLACE_KEY_VALUE_OK);
} catch (Exception e) {
log.trace("Exception while executing replace(" + key + "," + value + ")", e);
@@ -127,7 +127,7 @@
break;
}
case STOP_THREAD: {
- System.out.println("Exiting...");
+ log.trace("Exiting...");
toExecute = null;
run = false;
break;
@@ -138,7 +138,6 @@
}
if (responseLatch != null) responseLatch.countDown();
}
- setResponse("EXIT");
}
private void setResponse(Object e) {
Modified: trunk/core/src/test/java/org/infinispan/test/ReplListener.java
===================================================================
--- trunk/core/src/test/java/org/infinispan/test/ReplListener.java 2009-07-23 13:05:47 UTC (rev 611)
+++ trunk/core/src/test/java/org/infinispan/test/ReplListener.java 2009-07-23 13:28:56 UTC (rev 612)
@@ -161,10 +161,12 @@
public void waitForRpc(long time, TimeUnit unit) {
assert expectedCommands != null : "there are no replication expectations; please use ReplListener.expect() before calling this method";
try {
- info("Expect Any is " + expectAny + ", saw at least one? " + sawAtLeastOneInvocation + " Expected " + expectedCommands);
boolean successful = (expectAny && sawAtLeastOneInvocation) || (!expectAny && expectedCommands.isEmpty());
+ info("Expect Any is " + expectAny + ", saw at least one? " + sawAtLeastOneInvocation + " Successful? " + successful + " Expected commands " + expectedCommands);
if (!successful && !latch.await(time, unit)) {
assert false : "Waiting for more than " + time + " " + unit + " and following commands did not replicate: " + expectedCommands + " on cache [" + c.getCacheManager().getAddress() + "]";
+ } else {
+ info("Exiting wait for rpc with expected commands " + expectedCommands);
}
}
catch (InterruptedException e) {
Added: trunk/core/src/test/java/org/infinispan/tx/AsyncDeadlockDetectionTest.java
===================================================================
--- trunk/core/src/test/java/org/infinispan/tx/AsyncDeadlockDetectionTest.java (rev 0)
+++ trunk/core/src/test/java/org/infinispan/tx/AsyncDeadlockDetectionTest.java 2009-07-23 13:28:56 UTC (rev 612)
@@ -0,0 +1,180 @@
+package org.infinispan.tx;
+
+import org.infinispan.Cache;
+import org.infinispan.commands.VisitableCommand;
+import org.infinispan.commands.write.PutKeyValueCommand;
+import org.infinispan.config.Configuration;
+import org.infinispan.context.InvocationContext;
+import org.infinispan.interceptors.base.CommandInterceptor;
+import org.infinispan.test.MultipleCacheManagersTest;
+import org.infinispan.test.PerCacheExecutorThread;
+import org.infinispan.test.TestingUtil;
+import org.infinispan.transaction.lookup.DummyTransactionManagerLookup;
+import org.infinispan.transaction.xa.TransactionTable;
+import org.infinispan.util.concurrent.locks.DeadlockDetectedException;
+import org.infinispan.util.concurrent.locks.DeadlockDetectingLockManager;
+import org.infinispan.util.concurrent.locks.LockManager;
+import static org.testng.Assert.assertEquals;
+import static org.testng.Assert.assertFalse;
+import org.testng.annotations.AfterMethod;
+import org.testng.annotations.BeforeMethod;
+import org.testng.annotations.Test;
+
+/**
+ * Tests deadlock detection for async caches.
+ *
+ * @author Mircea.Markus at jboss.com
+ */
+@Test(groups = "functional", testName = "tx.AsyncDeadlockDetectionTest")
+public class AsyncDeadlockDetectionTest extends MultipleCacheManagersTest {
+ private Cache cache0;
+ private Cache cache1;
+ private DeadlockDetectingLockManager ddLm0;
+ private DeadlockDetectingLockManager ddLm1;
+ private PerCacheExecutorThread t0;
+ private PerCacheExecutorThread t1;
+ private RemoteReplicationInterceptor remoteReplicationInterceptor;
+
+
+ protected void createCacheManagers() throws Throwable {
+ System.out.println("AsyncDeadlockDetectionTest.createCacheManagers");
+ Configuration config = getDefaultClusteredConfig(Configuration.CacheMode.REPL_ASYNC);
+ config.setTransactionManagerLookupClass(DummyTransactionManagerLookup.class.getName());
+ config.setEnableDeadlockDetection(true);
+ config.setSyncCommitPhase(true);
+ config.setSyncRollbackPhase(true);
+ config.setUseLockStriping(false);
+ assert config.isEnableDeadlockDetection();
+ createClusteredCaches(2, "test", config);
+ assert config.isEnableDeadlockDetection();
+
+ cache0 = cache(0, "test");
+ cache1 = cache(1, "test");
+ remoteReplicationInterceptor = new RemoteReplicationInterceptor();
+ cache1.getAdvancedCache().addInterceptor(remoteReplicationInterceptor, 0);
+ assert cache0.getConfiguration().isEnableDeadlockDetection();
+ assert cache1.getConfiguration().isEnableDeadlockDetection();
+ assert !cache0.getConfiguration().isExposeJmxStatistics();
+ assert !cache1.getConfiguration().isExposeJmxStatistics();
+
+ ((DeadlockDetectingLockManager) TestingUtil.extractLockManager(cache0)).setExposeJmxStats(true);
+ ((DeadlockDetectingLockManager) TestingUtil.extractLockManager(cache1)).setExposeJmxStats(true);
+
+ ddLm0 = (DeadlockDetectingLockManager) TestingUtil.extractLockManager(cache0);
+ ddLm1 = (DeadlockDetectingLockManager) TestingUtil.extractLockManager(cache1);
+ }
+
+ @BeforeMethod
+ public void beforeMethod() {
+ t0 = new PerCacheExecutorThread(cache0, 0);
+ t1 = new PerCacheExecutorThread(cache1, 1);
+ }
+
+ @AfterMethod
+ public void afterMethod() {
+ t0.stopThread();
+ t1.stopThread();
+ ((DeadlockDetectingLockManager) TestingUtil.extractLockManager(cache0)).resetStatistics();
+ ((DeadlockDetectingLockManager) TestingUtil.extractLockManager(cache1)).resetStatistics();
+ remoteReplicationInterceptor.executionResponse = null;
+ }
+
+ public void testRemoteTxVsLocal() throws Exception {
+ assertEquals(PerCacheExecutorThread.OperationsResult.BEGGIN_TX_OK, t0.execute(PerCacheExecutorThread.Operations.BEGGIN_TX));
+ t0.setKeyValue("k1", "v1_t0");
+ assertEquals(PerCacheExecutorThread.OperationsResult.PUT_KEY_VALUE_OK, t0.execute(PerCacheExecutorThread.Operations.PUT_KEY_VALUE));
+ t0.setKeyValue("k2", "v2_t0");
+ assertEquals(PerCacheExecutorThread.OperationsResult.PUT_KEY_VALUE_OK, t0.execute(PerCacheExecutorThread.Operations.PUT_KEY_VALUE));
+
+ assertEquals(PerCacheExecutorThread.OperationsResult.BEGGIN_TX_OK, t1.execute(PerCacheExecutorThread.Operations.BEGGIN_TX));
+ t1.setKeyValue("k2", "v2_t1");
+ assertEquals(PerCacheExecutorThread.OperationsResult.PUT_KEY_VALUE_OK, t1.execute(PerCacheExecutorThread.Operations.PUT_KEY_VALUE));
+
+ t0.execute(PerCacheExecutorThread.Operations.COMMIT_TX);
+
+ LockManager lockManager = TestingUtil.extractLockManager(cache1);
+ while (!lockManager.isLocked("k1")) {
+ Thread.sleep(50);
+ }
+ System.out.println("successful replication !");
+
+
+ t1.setKeyValue("k1", "v1_t1");
+ t1.executeNoResponse(PerCacheExecutorThread.Operations.PUT_KEY_VALUE);
+
+
+ Object t1Response = t1.waitForResponse();
+ Object t0Response = remoteReplicationInterceptor.getResponse();
+
+ System.out.println("t0Response = " + t0Response);
+ System.out.println("t1Response = " + t1Response);
+
+ assert xor(t1Response instanceof DeadlockDetectedException, t0Response instanceof DeadlockDetectedException);
+
+ if (t0Response instanceof DeadlockDetectedException) {
+ replListener(cache0).expectWithTx(PutKeyValueCommand.class, PutKeyValueCommand.class);
+ assertEquals(t1.execute(PerCacheExecutorThread.Operations.COMMIT_TX), PerCacheExecutorThread.OperationsResult.COMMIT_TX_OK);
+ replListener(cache0).waitForRpc();
+ }
+
+ assertFalse(ddLm0.isLocked("k1"));
+ assertFalse(ddLm1.isLocked("k1"));
+ assertFalse(ddLm0.isLocked("k2"));
+ assertFalse(ddLm1.isLocked("k2"));
+ TransactionTable transactionTable0 = TestingUtil.extractComponent(cache0, TransactionTable.class);
+ assertEquals(transactionTable0.getLocalTxCount(), 0);
+ for (int i = 0; i < 20; i++) {
+ if (! (transactionTable0.getRemoteTxCount() == 0)) Thread.sleep(50);
+ }
+
+ assertEquals(transactionTable0.getRemoteTxCount(), 0);
+
+ TransactionTable transactionTable1 = TestingUtil.extractComponent(cache1, TransactionTable.class);
+ assertEquals(transactionTable1.getLocalTxCount(), 0);
+ for (int i = 0; i < 20; i++) {
+ if (! (transactionTable1.getRemoteTxCount() == 0)) Thread.sleep(50);
+ }
+ assertEquals(transactionTable1.getRemoteTxCount(), 0);
+
+ if (t1Response instanceof DeadlockDetectedException) {
+ assertEquals(cache0.get("k1"), "v1_t0");
+ assertEquals(cache0.get("k2"), "v2_t0");
+ assertEquals(cache1.get("k1"),"v1_t0");
+ assertEquals(cache1.get("k2"),"v2_t0");
+ } else {
+ assertEquals(cache0.get("k1"), "v1_t1");
+ assertEquals(cache0.get("k2"), "v2_t1");
+ assertEquals(cache1.get("k1"),"v1_t1");
+ assertEquals(cache1.get("k2"),"v2_t1");
+ }
+ }
+
+
+ public static class RemoteReplicationInterceptor extends CommandInterceptor {
+
+ public volatile Object executionResponse;
+
+ @Override
+ protected Object handleDefault(InvocationContext ctx, VisitableCommand command) throws Throwable {
+ try {
+ return invokeNextInterceptor(ctx, command);
+ } catch (Throwable throwable) {
+ if (!ctx.isOriginLocal()) {
+ log.trace("Setting thrownExceptionForRemoteTx to " + throwable);
+ executionResponse = throwable;
+ } else {
+ log.trace("Ignoring throwable " + throwable);
+ executionResponse = "NONE";
+ }
+ throw throwable;
+ }
+ }
+
+ public Object getResponse() throws Exception {
+ while (executionResponse == null) {
+ Thread.sleep(50);
+ }
+ return executionResponse;
+ }
+ }
+}
Property changes on: trunk/core/src/test/java/org/infinispan/tx/AsyncDeadlockDetectionTest.java
___________________________________________________________________
Name: svn:keywords
+ Id Revision
Name: svn:eol-style
+ LF
Modified: trunk/core/src/test/java/org/infinispan/tx/LocalDeadlockDetectionTest.java
===================================================================
--- trunk/core/src/test/java/org/infinispan/tx/LocalDeadlockDetectionTest.java 2009-07-23 13:05:47 UTC (rev 611)
+++ trunk/core/src/test/java/org/infinispan/tx/LocalDeadlockDetectionTest.java 2009-07-23 13:28:56 UTC (rev 612)
@@ -187,7 +187,7 @@
assert t2.execute(PerCacheExecutorThread.Operations.COMMIT_TX) instanceof RollbackException;
}
assert lockManager.getNumberOfLocksHeld() == 0;
- assertEquals(lockManager.getDetectedDeadlocks(), 1);
+ assertEquals(lockManager.getDetectedLocalDeadlocks(), 1);
}
}
Modified: trunk/core/src/test/java/org/infinispan/tx/ReplDeadlockDetectionTest.java
===================================================================
--- trunk/core/src/test/java/org/infinispan/tx/ReplDeadlockDetectionTest.java 2009-07-23 13:05:47 UTC (rev 611)
+++ trunk/core/src/test/java/org/infinispan/tx/ReplDeadlockDetectionTest.java 2009-07-23 13:28:56 UTC (rev 612)
@@ -3,11 +3,9 @@
import org.infinispan.api.mvcc.LockAssert;
import org.infinispan.commands.ReplicableCommand;
import org.infinispan.config.Configuration;
-import org.infinispan.config.ConfigurationException;
import org.infinispan.context.impl.NonTxInvocationContext;
import org.infinispan.interceptors.DeadlockDetectingInterceptor;
import org.infinispan.interceptors.InterceptorChain;
-import org.infinispan.manager.CacheManager;
import org.infinispan.remoting.ReplicationException;
import org.infinispan.remoting.responses.Response;
import org.infinispan.remoting.rpc.ResponseFilter;
@@ -17,9 +15,8 @@
import org.infinispan.remoting.transport.Transport;
import org.infinispan.statetransfer.StateTransferException;
import org.infinispan.test.MultipleCacheManagersTest;
-import org.infinispan.test.TestingUtil;
import org.infinispan.test.PerCacheExecutorThread;
-import org.infinispan.test.fwk.TestCacheManagerFactory;
+import org.infinispan.test.TestingUtil;
import org.infinispan.transaction.lookup.DummyTransactionManagerLookup;
import org.infinispan.util.concurrent.NotifyingNotifiableFuture;
import org.infinispan.util.concurrent.locks.DeadlockDetectingLockManager;
@@ -103,22 +100,6 @@
((DeadlockDetectingLockManager) TestingUtil.extractLockManager(cache(1, "test"))).resetStatistics();
}
- public void testDeadlockDetectionAndAsyncCaches() {
- Configuration config = getDefaultClusteredConfig(Configuration.CacheMode.REPL_ASYNC);
- config.setEnableDeadlockDetection(true);
- config.setUseLockStriping(false);
- CacheManager cm = TestCacheManagerFactory.createClusteredCacheManager();
- cm.defineCache("test", config);
- try {
- cm.getCache("test");
- assert false : "Exception expected";
- } catch (ConfigurationException e) {
- //expected
- System.out.println("Error message is " + e.getMessage());
- }
- cm.stop();
- }
-
public void testExpectedInnerStructure() {
LockManager lockManager = TestingUtil.extractComponent(cache(0, "test"), LockManager.class);
assert lockManager instanceof DeadlockDetectingLockManager;
@@ -171,7 +152,7 @@
assert o.equals("value1");
}
- assert ddLm1.getDetectedDeadlocks() + ddLm2.getDetectedDeadlocks() >= 1;
+ assert ddLm1.getDetectedRemoteDeadlocks() + ddLm2.getDetectedRemoteDeadlocks() >= 1;
LockManager lm1 = TestingUtil.extractComponent(cache(0, "test"), LockManager.class);
assert !lm1.isLocked("key") : "It is locked by " + lm1.getOwner("key");
Added: trunk/core/src/test/java/org/infinispan/util/DeadlockDetectingLockManagerTest.java
===================================================================
--- trunk/core/src/test/java/org/infinispan/util/DeadlockDetectingLockManagerTest.java (rev 0)
+++ trunk/core/src/test/java/org/infinispan/util/DeadlockDetectingLockManagerTest.java 2009-07-23 13:28:56 UTC (rev 612)
@@ -0,0 +1,123 @@
+package org.infinispan.util;
+
+import static org.easymock.EasyMock.createMock;
+import static org.easymock.EasyMock.expect;
+import static org.easymock.classextension.EasyMock.replay;
+import static org.easymock.classextension.EasyMock.verify;
+import org.infinispan.config.Configuration;
+import org.infinispan.context.InvocationContext;
+import org.infinispan.context.impl.NonTxInvocationContext;
+import org.infinispan.context.impl.LocalTxInvocationContext;
+import org.infinispan.transaction.xa.GlobalTransactionFactory;
+import org.infinispan.transaction.xa.DeadlockDetectingGlobalTransaction;
+import org.infinispan.util.concurrent.locks.DeadlockDetectingLockManager;
+import org.infinispan.util.concurrent.locks.containers.LockContainer;
+import org.testng.annotations.BeforeMethod;
+import org.testng.annotations.Test;
+
+import java.util.concurrent.TimeUnit;
+
+/**
+ * Tests functionality in {@link org.infinispan.util.concurrent.locks.DeadlockDetectingLockManager}.
+ *
+ * @author Mircea.Markus at jboss.com
+ */
+@Test(groups = "unit", testName = "util.DeadlockDetectingLockManagerTest")
+public class DeadlockDetectingLockManagerTest {
+
+ DeadlockDetectingLockManagerMock lockManager;
+ GlobalTransactionFactory gtf = new GlobalTransactionFactory(true);
+ Configuration config = new Configuration();
+ private LockContainer lc;
+ private static final int SPIN_DURATION = 1000;
+ private DeadlockDetectingGlobalTransaction lockOwner;
+
+ @BeforeMethod
+ public void setUp() {
+ lc = createMock(LockContainer.class);
+ lockManager = new DeadlockDetectingLockManagerMock(SPIN_DURATION, true, lc, config);
+ lockOwner = (DeadlockDetectingGlobalTransaction) gtf.instantiateGlobalTransaction();
+ }
+
+
+ public void testNoTransaction() throws Exception {
+ InvocationContext nonTx = new NonTxInvocationContext();
+
+ expect(lc.acquireLock("k",config.getLockAcquisitionTimeout(), TimeUnit.MILLISECONDS)).andReturn(Boolean.TRUE);
+ expect(lc.acquireLock("k",config.getLockAcquisitionTimeout(), TimeUnit.MILLISECONDS)).andReturn(false);
+ replay(lc);
+ assert lockManager.lockAndRecord("k",nonTx);
+ assert !lockManager.lockAndRecord("k",nonTx);
+ verify();
+ }
+
+ public void testLockHeldByThread() throws Exception {
+ InvocationContext localTxContext = new LocalTxInvocationContext();
+
+ //this makes sure that we cannot acquire lock from the first try
+ expect(lc.acquireLock("k", SPIN_DURATION, TimeUnit.MILLISECONDS)).andReturn(Boolean.FALSE);
+ lockManager.setOwner(Thread.currentThread() );
+ //next lock acquisition will succeed
+ expect(lc.acquireLock("k", SPIN_DURATION, TimeUnit.MILLISECONDS)).andReturn(Boolean.TRUE);
+ replay(lc);
+
+ assert lockManager.lockAndRecord("k", localTxContext);
+ assert lockManager.getOverlapWithNotDeadlockAwareLockOwners() == 1;
+ }
+
+ public void testLocalDeadlock() throws Exception {
+ final DeadlockDetectingGlobalTransaction ddgt = (DeadlockDetectingGlobalTransaction) gtf.instantiateGlobalTransaction();
+
+ InvocationContext localTxContext = new LocalTxInvocationContext() {
+ @Override
+ public Object getLockOwner() {
+ return ddgt;
+ }
+ };
+
+ ddgt.setCoinToss(0);
+ lockOwner.setCoinToss(-1);
+ assert ddgt.thisWillInterrupt(lockOwner);
+
+ //this makes sure that we cannot acquire lock from the first try
+ expect(lc.acquireLock("k", SPIN_DURATION, TimeUnit.MILLISECONDS)).andReturn(Boolean.FALSE);
+ expect(lc.acquireLock("k", SPIN_DURATION, TimeUnit.MILLISECONDS)).andReturn(Boolean.TRUE);
+ lockOwner.setRemote(false);
+ lockManager.setOwner(lockOwner);
+ lockManager.setOwnsLock(true);
+ replay(lc);
+ assert lockManager.lockAndRecord("k", localTxContext);
+ assert lockManager.getDetectedLocalDeadlocks() == 1;
+ }
+
+ public static class DeadlockDetectingLockManagerMock extends DeadlockDetectingLockManager {
+
+ private Object owner;
+ private boolean ownsLock;
+
+ public DeadlockDetectingLockManagerMock(long spinDuration, boolean exposeJmxStats, LockContainer lockContainer, Configuration configuration) {
+ this.spinDuration = spinDuration;
+ this.exposeJmxStats = exposeJmxStats;
+ super.lockContainer = lockContainer;
+ this.configuration = configuration;
+ }
+
+ public void setOwner(Object owner) {
+ this.owner = owner;
+ }
+
+ public void setOwnsLock(boolean ownsLock) {
+ this.ownsLock = ownsLock;
+ }
+
+ @Override
+ public Object getOwner(Object key) {
+ return owner;
+ }
+
+ @Override
+ public boolean ownsLock(Object key, Object owner) {
+ return ownsLock;
+ }
+ }
+}
Property changes on: trunk/core/src/test/java/org/infinispan/util/DeadlockDetectingLockManagerTest.java
___________________________________________________________________
Name: svn:keywords
+ Id Revision
Name: svn:eol-style
+ LF
More information about the infinispan-commits
mailing list