[infinispan-commits] Infinispan SVN: r612 - in trunk/core/src: main/java/org/infinispan/interceptors and 8 other directories.

infinispan-commits at lists.jboss.org infinispan-commits at lists.jboss.org
Thu Jul 23 09:28:57 EDT 2009


Author: mircea.markus
Date: 2009-07-23 09:28:56 -0400 (Thu, 23 Jul 2009)
New Revision: 612

Added:
   trunk/core/src/test/java/org/infinispan/tx/AsyncDeadlockDetectionTest.java
   trunk/core/src/test/java/org/infinispan/util/DeadlockDetectingLockManagerTest.java
Modified:
   trunk/core/src/main/java/org/infinispan/factories/LockManagerFactory.java
   trunk/core/src/main/java/org/infinispan/interceptors/DeadlockDetectingInterceptor.java
   trunk/core/src/main/java/org/infinispan/transaction/tm/DummyTransaction.java
   trunk/core/src/main/java/org/infinispan/transaction/xa/GlobalTransaction.java
   trunk/core/src/main/java/org/infinispan/util/concurrent/locks/DeadlockDetectingLockManager.java
   trunk/core/src/main/java/org/infinispan/util/concurrent/locks/LockManagerImpl.java
   trunk/core/src/test/java/org/infinispan/distribution/DeadlockDetectionDistributionTest.java
   trunk/core/src/test/java/org/infinispan/profiling/DeadlockDetectionPerformanceTest.java
   trunk/core/src/test/java/org/infinispan/test/PerCacheExecutorThread.java
   trunk/core/src/test/java/org/infinispan/test/ReplListener.java
   trunk/core/src/test/java/org/infinispan/tx/LocalDeadlockDetectionTest.java
   trunk/core/src/test/java/org/infinispan/tx/ReplDeadlockDetectionTest.java
Log:
[ISPN-38] (Deadlock detection) - support DLD for async caches

Modified: trunk/core/src/main/java/org/infinispan/factories/LockManagerFactory.java
===================================================================
--- trunk/core/src/main/java/org/infinispan/factories/LockManagerFactory.java	2009-07-23 13:05:47 UTC (rev 611)
+++ trunk/core/src/main/java/org/infinispan/factories/LockManagerFactory.java	2009-07-23 13:28:56 UTC (rev 612)
@@ -22,10 +22,9 @@
 package org.infinispan.factories;
 
 import org.infinispan.factories.annotations.DefaultFactoryFor;
+import org.infinispan.util.concurrent.locks.DeadlockDetectingLockManager;
 import org.infinispan.util.concurrent.locks.LockManager;
 import org.infinispan.util.concurrent.locks.LockManagerImpl;
-import org.infinispan.util.concurrent.locks.DeadlockDetectingLockManager;
-import org.infinispan.config.ConfigurationException;
 
 /**
  * // TODO: MANIK: Document this
@@ -37,9 +36,6 @@
 public class LockManagerFactory extends AbstractNamedCacheComponentFactory implements AutoInstantiableFactory {
    public <T> T construct(Class<T> componentType) {
       if (configuration.isEnableDeadlockDetection()) {
-         if (!configuration.getCacheMode().isSynchronous()) {
-            throw new ConfigurationException("Eager Dead lock detection can only be used for sync caches!");
-         }
          return (T) new DeadlockDetectingLockManager();
       } else {
          return (T) new LockManagerImpl();

Modified: trunk/core/src/main/java/org/infinispan/interceptors/DeadlockDetectingInterceptor.java
===================================================================
--- trunk/core/src/main/java/org/infinispan/interceptors/DeadlockDetectingInterceptor.java	2009-07-23 13:05:47 UTC (rev 611)
+++ trunk/core/src/main/java/org/infinispan/interceptors/DeadlockDetectingInterceptor.java	2009-07-23 13:28:56 UTC (rev 612)
@@ -26,7 +26,7 @@
  * appropriate information needed in order to accomplish deadlock detection. It MUST process populate data before the
  * replication takes place, so it will do all the tasks before calling {@link org.infinispan.interceptors.base.CommandInterceptor#invokeNextInterceptor(org.infinispan.context.InvocationContext,
  * org.infinispan.commands.VisitableCommand)}.
- *
+ * <p/>
  * Note: for local caches, deadlock detection dos NOT work for aggregate operations (clear, putAll).
  *
  * @author Mircea.Markus at jboss.com
@@ -65,21 +65,29 @@
       try {
          return invokeNextInterceptor(ctx, command);
       } catch (InterruptedException ie) {
-         if (ctx.isOriginLocal() && ctx.isInTxScope()) {
+         if (ctx.isInTxScope()) {
             lockManager.releaseLocks(ctx);
-            Transaction transaction = txManager.getTransaction();
-            if (trace)
-               log.trace("Marking the transaction for rollback! : " + transaction);
-            if (transaction == null) {
-               throw new IllegalStateException("We're running in a local transaction, there MUST be one " +
-                     "associated witht the local thread but none found! " + transaction);
+            if (ctx.isOriginLocal()) {
+               Transaction transaction = txManager.getTransaction();
+               if (trace)
+                  log.trace("Marking the transaction for rollback! : " + transaction);
+               if (transaction == null) {
+                  throw new IllegalStateException("We're running in a local transaction, there MUST be one " +
+                        "associated witht the local thread but none found! " + transaction);
+               }
+               transaction.setRollbackOnly();
+               txTable.removeLocalTransaction(transaction);
+               throw new DeadlockDetectedException("Deadlock request was detected for locally originated tx " + transaction +
+                     "; it was marked for rollback");
+            } else {
+               DeadlockDetectingGlobalTransaction gtx = (DeadlockDetectingGlobalTransaction) ctx.getLockOwner();
+               gtx.setMarkedForRollback(true);
+               throw new DeadlockDetectedException("Deadlock request was detected for remotely originated tx " + gtx +
+                     "; it was marked for rollback");
             }
-            transaction.setRollbackOnly();
-            throw new DeadlockDetectedException("Deadlock request was detected, tx " + transaction +
-                  " was marked for rollback");
          } else {
             if (trace)
-               log.trace("Received an interrupt request, but we're not running within deadlock detection scenario, so passing it up the stack", ie);
+               log.trace("Received an interrupt request, but we're not running within the scope of a transaction, so passing it up the stack", ie);
             throw ie;
          }
       }

Modified: trunk/core/src/main/java/org/infinispan/transaction/tm/DummyTransaction.java
===================================================================
--- trunk/core/src/main/java/org/infinispan/transaction/tm/DummyTransaction.java	2009-07-23 13:05:47 UTC (rev 611)
+++ trunk/core/src/main/java/org/infinispan/transaction/tm/DummyTransaction.java	2009-07-23 13:28:56 UTC (rev 612)
@@ -332,4 +332,12 @@
    public void setStatus(int stat) {
       this.status = stat;
    }
+
+   @Override
+   public String toString() {
+      return "DummyTransaction{" +
+            "xid=" + xid +
+            ", status=" + status +
+            '}';
+   }
 }

Modified: trunk/core/src/main/java/org/infinispan/transaction/xa/GlobalTransaction.java
===================================================================
--- trunk/core/src/main/java/org/infinispan/transaction/xa/GlobalTransaction.java	2009-07-23 13:05:47 UTC (rev 611)
+++ trunk/core/src/main/java/org/infinispan/transaction/xa/GlobalTransaction.java	2009-07-23 13:28:56 UTC (rev 612)
@@ -79,6 +79,10 @@
       return remote;
    }
 
+   public void setRemote(boolean remote) {
+      this.remote = remote;
+   }
+
    @Override
    public int hashCode() {
       if (hash_code == -1) {

Modified: trunk/core/src/main/java/org/infinispan/util/concurrent/locks/DeadlockDetectingLockManager.java
===================================================================
--- trunk/core/src/main/java/org/infinispan/util/concurrent/locks/DeadlockDetectingLockManager.java	2009-07-23 13:05:47 UTC (rev 611)
+++ trunk/core/src/main/java/org/infinispan/util/concurrent/locks/DeadlockDetectingLockManager.java	2009-07-23 13:28:56 UTC (rev 612)
@@ -24,12 +24,16 @@
 
    private static final Log log = LogFactory.getLog(DeadlockDetectingLockManager.class);
 
-   private volatile long spinDuration;
+   protected volatile long spinDuration;
 
-   private volatile boolean exposeJmxStats;
+   protected volatile boolean exposeJmxStats;
 
-   private AtomicLong detectedDeadlocks = new AtomicLong(0);
+   private volatile boolean isSync;
 
+   private AtomicLong detectedRemoteDeadlocks = new AtomicLong(0);
+
+   private AtomicLong detectedLocalDeadlocks = new AtomicLong(0);
+
    private AtomicLong locallyInterruptedTransactions = new AtomicLong(0);
 
    private AtomicLong overlapWithNotDeadlockAwareLockOwners = new AtomicLong(0);
@@ -38,13 +42,13 @@
    public void init() {
       spinDuration = configuration.getDeadlockDetectionSpinDuration();
       exposeJmxStats = configuration.isExposeJmxStatistics();
+      isSync = configuration.getCacheMode().isSynchronous();
    }
 
    public boolean lockAndRecord(Object key, InvocationContext ctx) throws InterruptedException {
       long lockTimeout = getLockAcquisitionTimeout(ctx);
       if (trace) log.trace("Attempting to lock {0} with acquisition timeout of {1} millis", key, lockTimeout);
 
-
       if (ctx.isInTxScope()) {
          if (trace) log.trace("Using early dead lock detection");
          final long start = System.currentTimeMillis();
@@ -64,10 +68,10 @@
                   continue; //try to acquire lock again, for the rest of the time
                }
                DeadlockDetectingGlobalTransaction lockOwnerTx = (DeadlockDetectingGlobalTransaction) owner;
-               if (!ctx.isOriginLocal() && !lockOwnerTx.isRemote()) {
+               if (isSync && !ctx.isOriginLocal() && !lockOwnerTx.isRemote()) {
                   return remoteVsRemoteDld(key, ctx, lockTimeout, start, now, lockOwnerTx);
                }
-               if (ctx.isOriginLocal() && !lockOwnerTx.isRemote()) {
+               if ((ctx.isOriginLocal() && !lockOwnerTx.isRemote()) || (!isSync && !ctx.isOriginLocal() && !lockOwnerTx.isRemote())) {
                   localVsLocalDld(ctx, lockOwnerTx);
                }
             }
@@ -94,7 +98,7 @@
             log.trace("deadlock situation detected. Shall I interrupt?" + iShouldInterrupt );
          if (iShouldInterrupt) {
             lockOwnerTx.interruptProcessingThread();
-            if (exposeJmxStats) detectedDeadlocks.incrementAndGet();
+            if (exposeJmxStats) detectedLocalDeadlocks.incrementAndGet();
          }
       }
    }
@@ -109,7 +113,7 @@
       if (thisShouldInterrupt && isDeadLock) {
          lockOwnerTx.interruptProcessingThread();
          if (exposeJmxStats) {
-            detectedDeadlocks.incrementAndGet();
+            detectedRemoteDeadlocks.incrementAndGet();
             locallyInterruptedTransactions.incrementAndGet();
          }
          return lockForTheRemainingTime(key, lockTimeout, start, now);
@@ -119,7 +123,7 @@
          if (trace)
             log.trace("Not trying to acquire lock anymore, as we're in deadlock and this will be rollback at origin");
          if (exposeJmxStats) {
-            detectedDeadlocks.incrementAndGet();
+            detectedRemoteDeadlocks.incrementAndGet();
          }
          remoteGlobalTransaction.setMarkedForRollback(true);
          throw new DeadlockDetectedException("Deadlock situation detected on tx: " + remoteTxContext.getLockOwner());
@@ -149,16 +153,27 @@
       return locallyInterruptedTransactions.get();
    }
 
-   @ManagedAttribute(description = "Total number of deadlocks detected")
-   public long getDetectedDeadlocks() {
-      return detectedDeadlocks.get();
+   @ManagedAttribute(description = "Number of remote deadlocks detected")
+   public long getDetectedRemoteDeadlocks() {
+      return detectedRemoteDeadlocks.get();
    }
 
+   @ManagedAttribute (description = "Number of local detected deadlocks")
+   public long getDetectedLocalDeadlocks() {
+      return detectedLocalDeadlocks.get();
+   }
+
+   @ManagedAttribute (description = "Total number of local detected deadlocks")
+   public long getTotalNumberOfDetectedDeadlocks() {
+      return detectedRemoteDeadlocks.get() + detectedLocalDeadlocks.get();
+   }
+
    @ManagedOperation(description = "Resets statistics gathered by this component")
    public void resetStatistics() {
       overlapWithNotDeadlockAwareLockOwners.set(0);
       locallyInterruptedTransactions.set(0);
-      detectedDeadlocks.set(0);
+      detectedRemoteDeadlocks.set(0);
+      detectedLocalDeadlocks.set(0);
    }
 
 }

Modified: trunk/core/src/main/java/org/infinispan/util/concurrent/locks/LockManagerImpl.java
===================================================================
--- trunk/core/src/main/java/org/infinispan/util/concurrent/locks/LockManagerImpl.java	2009-07-23 13:05:47 UTC (rev 611)
+++ trunk/core/src/main/java/org/infinispan/util/concurrent/locks/LockManagerImpl.java	2009-07-23 13:28:56 UTC (rev 612)
@@ -54,7 +54,7 @@
 @MBean(objectName = "LockManager", description = "Manager that handles MVCC locks for entries")
 public class LockManagerImpl implements LockManager {
    protected Configuration configuration;
-   LockContainer lockContainer;
+   protected LockContainer lockContainer;
    private TransactionManager transactionManager;
    private InvocationContextContainer invocationContextContainer;
    private static final Log log = LogFactory.getLog(LockManagerImpl.class);

Modified: trunk/core/src/test/java/org/infinispan/distribution/DeadlockDetectionDistributionTest.java
===================================================================
--- trunk/core/src/test/java/org/infinispan/distribution/DeadlockDetectionDistributionTest.java	2009-07-23 13:05:47 UTC (rev 611)
+++ trunk/core/src/test/java/org/infinispan/distribution/DeadlockDetectionDistributionTest.java	2009-07-23 13:28:56 UTC (rev 612)
@@ -24,10 +24,6 @@
 
 
    //following methods are overridden as TestNG will otherwise run them even if I mark the class as enabled = false
-   @Override
-   public void testDeadlockDetectionAndAsyncCaches() {
-      throw new IllegalStateException("TODO - please implement me!!!"); //todo implement!!!
-   }
 
    @Override
    public void testExpectedInnerStructure() {

Modified: trunk/core/src/test/java/org/infinispan/profiling/DeadlockDetectionPerformanceTest.java
===================================================================
--- trunk/core/src/test/java/org/infinispan/profiling/DeadlockDetectionPerformanceTest.java	2009-07-23 13:05:47 UTC (rev 611)
+++ trunk/core/src/test/java/org/infinispan/profiling/DeadlockDetectionPerformanceTest.java	2009-07-23 13:28:56 UTC (rev 612)
@@ -49,6 +49,8 @@
 
    public static final long BENCHMARK_DURATION = 60000;
 
+   public static boolean NO_COLISION = false;
+
    public static boolean USE_DLD = true;
 
    public static List<String> keyPool;
@@ -75,6 +77,21 @@
       }
    }
 
+   @Test(invocationCount = 5, enabled = false)
+   public void testLocalDifferentTxSizeNoCollision() throws Exception {
+      NO_COLISION = true;
+      USE_DLD = false;
+      for (int i = 2; i < KEY_POOL_SIZE; i++) {
+         TX_SIZE = i;
+         runLocalTest();
+      }
+      USE_DLD = true;
+      for (int i = 2; i < KEY_POOL_SIZE; i++) {
+         TX_SIZE = i;
+         runLocalTest();
+      }
+   }
+
    @Test(invocationCount = 10, enabled = false)
    public void testReplDifferentTxSize() throws Exception {
       THREAD_COUNT = 2;
@@ -215,7 +232,9 @@
          result.add(key);
       }
       ArrayList resultList = new ArrayList(result);
-      Collections.shuffle(resultList);
+      if (!NO_COLISION) {
+         Collections.shuffle(resultList);
+      }
       return resultList;
    }
 }

Modified: trunk/core/src/test/java/org/infinispan/test/PerCacheExecutorThread.java
===================================================================
--- trunk/core/src/test/java/org/infinispan/test/PerCacheExecutorThread.java	2009-07-23 13:05:47 UTC (rev 611)
+++ trunk/core/src/test/java/org/infinispan/test/PerCacheExecutorThread.java	2009-07-23 13:28:56 UTC (rev 612)
@@ -14,7 +14,7 @@
  *
  * @author Mircea.Markus at jboss.com
  * @see Operations
- * @see PerCacheExecutorThread.OperationsResult
+ * @see OperationsResult
  */
 public final class PerCacheExecutorThread extends Thread {
 
@@ -69,7 +69,7 @@
          } catch (InterruptedException e) {
             throw new RuntimeException(e);
          }
-         System.out.println("about to process operation " + operation);
+         log.trace("about to process operation " + operation);
          switch (operation) {
             case BEGGIN_TX: {
                TransactionManager txManager = TestingUtil.getTransactionManager(cache);
@@ -96,7 +96,7 @@
             case PUT_KEY_VALUE: {
                try {
                   cache.put(key, value);
-                  log.trace("Successfully exucuted putKeyValue(" + key + ", " + value + ")");
+                  log.trace("Successfully executed putKeyValue(" + key + ", " + value + ")");
                   setResponse(OperationsResult.PUT_KEY_VALUE_OK);
                } catch (Exception e) {
                   log.trace("Exception while executing putKeyValue(" + key + ", " + value + ")", e);
@@ -107,7 +107,7 @@
             case REMOVE_KEY: {
                try {
                   cache.remove(key);
-                  log.trace("Successfully exucuted remove(" + key + ")");
+                  log.trace("Successfully executed remove(" + key + ")");
                   setResponse(OperationsResult.REMOVE_KEY_OK);
                } catch (Exception e) {
                   log.trace("Exception while executing remove(" + key + ")", e);
@@ -118,7 +118,7 @@
             case REPLACE_KEY_VALUE: {
                try {
                   cache.replace(key, value);
-                  log.trace("Successfully exucuted replace(" + key + "," + value + ")");
+                  log.trace("Successfully executed replace(" + key + "," + value + ")");
                   setResponse(OperationsResult.REPLACE_KEY_VALUE_OK);
                } catch (Exception e) {
                   log.trace("Exception while executing replace(" + key + "," + value + ")", e);
@@ -127,7 +127,7 @@
                break;
             }
             case STOP_THREAD: {
-               System.out.println("Exiting...");
+               log.trace("Exiting...");
                toExecute = null;
                run = false;
                break;
@@ -138,7 +138,6 @@
          }
          if (responseLatch != null) responseLatch.countDown();
       }
-      setResponse("EXIT");
    }
 
    private void setResponse(Object e) {

Modified: trunk/core/src/test/java/org/infinispan/test/ReplListener.java
===================================================================
--- trunk/core/src/test/java/org/infinispan/test/ReplListener.java	2009-07-23 13:05:47 UTC (rev 611)
+++ trunk/core/src/test/java/org/infinispan/test/ReplListener.java	2009-07-23 13:28:56 UTC (rev 612)
@@ -161,10 +161,12 @@
    public void waitForRpc(long time, TimeUnit unit) {
       assert expectedCommands != null : "there are no replication expectations; please use ReplListener.expect() before calling this method";
       try {
-         info("Expect Any is " + expectAny + ", saw at least one? " + sawAtLeastOneInvocation + " Expected " + expectedCommands);
          boolean successful = (expectAny && sawAtLeastOneInvocation) || (!expectAny && expectedCommands.isEmpty());
+         info("Expect Any is " + expectAny + ", saw at least one? " + sawAtLeastOneInvocation + " Successful? " + successful + " Expected commands " + expectedCommands);
          if (!successful && !latch.await(time, unit)) {
             assert false : "Waiting for more than " + time + " " + unit + " and following commands did not replicate: " + expectedCommands + " on cache [" + c.getCacheManager().getAddress() + "]";
+         } else {
+            info("Exiting wait for rpc with expected commands " + expectedCommands);
          }
       }
       catch (InterruptedException e) {

Added: trunk/core/src/test/java/org/infinispan/tx/AsyncDeadlockDetectionTest.java
===================================================================
--- trunk/core/src/test/java/org/infinispan/tx/AsyncDeadlockDetectionTest.java	                        (rev 0)
+++ trunk/core/src/test/java/org/infinispan/tx/AsyncDeadlockDetectionTest.java	2009-07-23 13:28:56 UTC (rev 612)
@@ -0,0 +1,180 @@
+package org.infinispan.tx;
+
+import org.infinispan.Cache;
+import org.infinispan.commands.VisitableCommand;
+import org.infinispan.commands.write.PutKeyValueCommand;
+import org.infinispan.config.Configuration;
+import org.infinispan.context.InvocationContext;
+import org.infinispan.interceptors.base.CommandInterceptor;
+import org.infinispan.test.MultipleCacheManagersTest;
+import org.infinispan.test.PerCacheExecutorThread;
+import org.infinispan.test.TestingUtil;
+import org.infinispan.transaction.lookup.DummyTransactionManagerLookup;
+import org.infinispan.transaction.xa.TransactionTable;
+import org.infinispan.util.concurrent.locks.DeadlockDetectedException;
+import org.infinispan.util.concurrent.locks.DeadlockDetectingLockManager;
+import org.infinispan.util.concurrent.locks.LockManager;
+import static org.testng.Assert.assertEquals;
+import static org.testng.Assert.assertFalse;
+import org.testng.annotations.AfterMethod;
+import org.testng.annotations.BeforeMethod;
+import org.testng.annotations.Test;
+
+/**
+ * Tests deadlock detection for async caches.
+ *
+ * @author Mircea.Markus at jboss.com
+ */
+@Test(groups = "functional", testName = "tx.AsyncDeadlockDetectionTest")
+public class AsyncDeadlockDetectionTest extends MultipleCacheManagersTest {
+   private Cache cache0;
+   private Cache cache1;
+   private DeadlockDetectingLockManager ddLm0;
+   private DeadlockDetectingLockManager ddLm1;
+   private PerCacheExecutorThread t0;
+   private PerCacheExecutorThread t1;
+   private RemoteReplicationInterceptor remoteReplicationInterceptor;
+
+
+   protected void createCacheManagers() throws Throwable {
+      System.out.println("AsyncDeadlockDetectionTest.createCacheManagers");
+      Configuration config = getDefaultClusteredConfig(Configuration.CacheMode.REPL_ASYNC);
+      config.setTransactionManagerLookupClass(DummyTransactionManagerLookup.class.getName());
+      config.setEnableDeadlockDetection(true);
+      config.setSyncCommitPhase(true);
+      config.setSyncRollbackPhase(true);
+      config.setUseLockStriping(false);
+      assert config.isEnableDeadlockDetection();
+      createClusteredCaches(2, "test", config);
+      assert config.isEnableDeadlockDetection();
+
+      cache0 = cache(0, "test");
+      cache1 = cache(1, "test");
+      remoteReplicationInterceptor = new RemoteReplicationInterceptor();
+      cache1.getAdvancedCache().addInterceptor(remoteReplicationInterceptor, 0);
+      assert cache0.getConfiguration().isEnableDeadlockDetection();
+      assert cache1.getConfiguration().isEnableDeadlockDetection();
+      assert !cache0.getConfiguration().isExposeJmxStatistics();
+      assert !cache1.getConfiguration().isExposeJmxStatistics();
+
+      ((DeadlockDetectingLockManager) TestingUtil.extractLockManager(cache0)).setExposeJmxStats(true);
+      ((DeadlockDetectingLockManager) TestingUtil.extractLockManager(cache1)).setExposeJmxStats(true);
+
+      ddLm0 = (DeadlockDetectingLockManager) TestingUtil.extractLockManager(cache0);
+      ddLm1 = (DeadlockDetectingLockManager) TestingUtil.extractLockManager(cache1);
+   }
+
+   @BeforeMethod
+   public void beforeMethod() {
+      t0 = new PerCacheExecutorThread(cache0, 0);
+      t1 = new PerCacheExecutorThread(cache1, 1);
+   }
+
+   @AfterMethod
+   public void afterMethod() {
+      t0.stopThread();
+      t1.stopThread();
+      ((DeadlockDetectingLockManager) TestingUtil.extractLockManager(cache0)).resetStatistics();
+      ((DeadlockDetectingLockManager) TestingUtil.extractLockManager(cache1)).resetStatistics();
+      remoteReplicationInterceptor.executionResponse = null; 
+   }
+
+   public void testRemoteTxVsLocal() throws Exception {
+      assertEquals(PerCacheExecutorThread.OperationsResult.BEGGIN_TX_OK, t0.execute(PerCacheExecutorThread.Operations.BEGGIN_TX));
+      t0.setKeyValue("k1", "v1_t0");
+      assertEquals(PerCacheExecutorThread.OperationsResult.PUT_KEY_VALUE_OK, t0.execute(PerCacheExecutorThread.Operations.PUT_KEY_VALUE));
+      t0.setKeyValue("k2", "v2_t0");
+      assertEquals(PerCacheExecutorThread.OperationsResult.PUT_KEY_VALUE_OK, t0.execute(PerCacheExecutorThread.Operations.PUT_KEY_VALUE));
+
+      assertEquals(PerCacheExecutorThread.OperationsResult.BEGGIN_TX_OK, t1.execute(PerCacheExecutorThread.Operations.BEGGIN_TX));
+      t1.setKeyValue("k2", "v2_t1");
+      assertEquals(PerCacheExecutorThread.OperationsResult.PUT_KEY_VALUE_OK, t1.execute(PerCacheExecutorThread.Operations.PUT_KEY_VALUE));
+
+      t0.execute(PerCacheExecutorThread.Operations.COMMIT_TX);
+
+      LockManager lockManager = TestingUtil.extractLockManager(cache1);
+      while (!lockManager.isLocked("k1")) {
+         Thread.sleep(50);
+      }
+      System.out.println("successful replication !");
+
+
+      t1.setKeyValue("k1", "v1_t1");
+      t1.executeNoResponse(PerCacheExecutorThread.Operations.PUT_KEY_VALUE);
+
+
+      Object t1Response = t1.waitForResponse();
+      Object t0Response = remoteReplicationInterceptor.getResponse();
+
+      System.out.println("t0Response = " + t0Response);
+      System.out.println("t1Response = " + t1Response);
+
+      assert xor(t1Response instanceof DeadlockDetectedException, t0Response instanceof DeadlockDetectedException);
+
+      if (t0Response instanceof DeadlockDetectedException) {
+         replListener(cache0).expectWithTx(PutKeyValueCommand.class, PutKeyValueCommand.class);
+         assertEquals(t1.execute(PerCacheExecutorThread.Operations.COMMIT_TX), PerCacheExecutorThread.OperationsResult.COMMIT_TX_OK);
+         replListener(cache0).waitForRpc();
+      }
+
+      assertFalse(ddLm0.isLocked("k1"));
+      assertFalse(ddLm1.isLocked("k1"));
+      assertFalse(ddLm0.isLocked("k2"));
+      assertFalse(ddLm1.isLocked("k2"));
+      TransactionTable transactionTable0 = TestingUtil.extractComponent(cache0, TransactionTable.class);
+      assertEquals(transactionTable0.getLocalTxCount(), 0);
+      for (int i = 0; i < 20; i++) {
+         if (! (transactionTable0.getRemoteTxCount() == 0)) Thread.sleep(50);
+      }
+
+      assertEquals(transactionTable0.getRemoteTxCount(), 0);
+
+      TransactionTable transactionTable1 = TestingUtil.extractComponent(cache1, TransactionTable.class);
+      assertEquals(transactionTable1.getLocalTxCount(), 0);
+      for (int i = 0; i < 20; i++) {
+         if (! (transactionTable1.getRemoteTxCount() == 0)) Thread.sleep(50);
+      }
+      assertEquals(transactionTable1.getRemoteTxCount(), 0);
+
+      if (t1Response instanceof DeadlockDetectedException) {
+         assertEquals(cache0.get("k1"), "v1_t0");
+         assertEquals(cache0.get("k2"), "v2_t0");
+         assertEquals(cache1.get("k1"),"v1_t0");
+         assertEquals(cache1.get("k2"),"v2_t0");
+      } else {
+         assertEquals(cache0.get("k1"), "v1_t1");
+         assertEquals(cache0.get("k2"), "v2_t1");
+         assertEquals(cache1.get("k1"),"v1_t1");
+         assertEquals(cache1.get("k2"),"v2_t1");
+      }
+   }
+
+
+   public static class RemoteReplicationInterceptor extends CommandInterceptor {
+
+      public volatile Object executionResponse;
+
+      @Override
+      protected Object handleDefault(InvocationContext ctx, VisitableCommand command) throws Throwable {
+         try {
+            return invokeNextInterceptor(ctx, command);
+         } catch (Throwable throwable) {
+            if (!ctx.isOriginLocal()) {
+               log.trace("Setting thrownExceptionForRemoteTx to " + throwable);
+               executionResponse = throwable;
+            } else {
+               log.trace("Ignoring throwable " + throwable);
+               executionResponse = "NONE";
+            }
+            throw throwable;
+         }
+      }
+
+      public Object getResponse() throws Exception {
+         while (executionResponse == null) {
+            Thread.sleep(50);
+         }
+         return executionResponse;
+      }
+   }
+}


Property changes on: trunk/core/src/test/java/org/infinispan/tx/AsyncDeadlockDetectionTest.java
___________________________________________________________________
Name: svn:keywords
   + Id Revision
Name: svn:eol-style
   + LF

Modified: trunk/core/src/test/java/org/infinispan/tx/LocalDeadlockDetectionTest.java
===================================================================
--- trunk/core/src/test/java/org/infinispan/tx/LocalDeadlockDetectionTest.java	2009-07-23 13:05:47 UTC (rev 611)
+++ trunk/core/src/test/java/org/infinispan/tx/LocalDeadlockDetectionTest.java	2009-07-23 13:28:56 UTC (rev 612)
@@ -187,7 +187,7 @@
          assert t2.execute(PerCacheExecutorThread.Operations.COMMIT_TX) instanceof RollbackException;
       }
       assert lockManager.getNumberOfLocksHeld() == 0;
-      assertEquals(lockManager.getDetectedDeadlocks(), 1);
+      assertEquals(lockManager.getDetectedLocalDeadlocks(), 1);
    }
 
 }

Modified: trunk/core/src/test/java/org/infinispan/tx/ReplDeadlockDetectionTest.java
===================================================================
--- trunk/core/src/test/java/org/infinispan/tx/ReplDeadlockDetectionTest.java	2009-07-23 13:05:47 UTC (rev 611)
+++ trunk/core/src/test/java/org/infinispan/tx/ReplDeadlockDetectionTest.java	2009-07-23 13:28:56 UTC (rev 612)
@@ -3,11 +3,9 @@
 import org.infinispan.api.mvcc.LockAssert;
 import org.infinispan.commands.ReplicableCommand;
 import org.infinispan.config.Configuration;
-import org.infinispan.config.ConfigurationException;
 import org.infinispan.context.impl.NonTxInvocationContext;
 import org.infinispan.interceptors.DeadlockDetectingInterceptor;
 import org.infinispan.interceptors.InterceptorChain;
-import org.infinispan.manager.CacheManager;
 import org.infinispan.remoting.ReplicationException;
 import org.infinispan.remoting.responses.Response;
 import org.infinispan.remoting.rpc.ResponseFilter;
@@ -17,9 +15,8 @@
 import org.infinispan.remoting.transport.Transport;
 import org.infinispan.statetransfer.StateTransferException;
 import org.infinispan.test.MultipleCacheManagersTest;
-import org.infinispan.test.TestingUtil;
 import org.infinispan.test.PerCacheExecutorThread;
-import org.infinispan.test.fwk.TestCacheManagerFactory;
+import org.infinispan.test.TestingUtil;
 import org.infinispan.transaction.lookup.DummyTransactionManagerLookup;
 import org.infinispan.util.concurrent.NotifyingNotifiableFuture;
 import org.infinispan.util.concurrent.locks.DeadlockDetectingLockManager;
@@ -103,22 +100,6 @@
       ((DeadlockDetectingLockManager) TestingUtil.extractLockManager(cache(1, "test"))).resetStatistics();
    }
 
-   public void testDeadlockDetectionAndAsyncCaches() {
-      Configuration config = getDefaultClusteredConfig(Configuration.CacheMode.REPL_ASYNC);
-      config.setEnableDeadlockDetection(true);
-      config.setUseLockStriping(false);
-      CacheManager cm = TestCacheManagerFactory.createClusteredCacheManager();
-      cm.defineCache("test", config);
-      try {
-         cm.getCache("test");
-         assert false : "Exception expected";
-      } catch (ConfigurationException e) {
-         //expected
-         System.out.println("Error message is " + e.getMessage());
-      }
-      cm.stop();
-   }
-
    public void testExpectedInnerStructure() {
       LockManager lockManager = TestingUtil.extractComponent(cache(0, "test"), LockManager.class);
       assert lockManager instanceof DeadlockDetectingLockManager;
@@ -171,7 +152,7 @@
          assert o.equals("value1");
       }
 
-      assert ddLm1.getDetectedDeadlocks() + ddLm2.getDetectedDeadlocks() >= 1;
+      assert ddLm1.getDetectedRemoteDeadlocks() + ddLm2.getDetectedRemoteDeadlocks() >= 1;
 
       LockManager lm1 = TestingUtil.extractComponent(cache(0, "test"), LockManager.class);
       assert !lm1.isLocked("key") : "It is locked by " + lm1.getOwner("key");

Added: trunk/core/src/test/java/org/infinispan/util/DeadlockDetectingLockManagerTest.java
===================================================================
--- trunk/core/src/test/java/org/infinispan/util/DeadlockDetectingLockManagerTest.java	                        (rev 0)
+++ trunk/core/src/test/java/org/infinispan/util/DeadlockDetectingLockManagerTest.java	2009-07-23 13:28:56 UTC (rev 612)
@@ -0,0 +1,123 @@
+package org.infinispan.util;
+
+import static org.easymock.EasyMock.createMock;
+import static org.easymock.EasyMock.expect;
+import static org.easymock.classextension.EasyMock.replay;
+import static org.easymock.classextension.EasyMock.verify;
+import org.infinispan.config.Configuration;
+import org.infinispan.context.InvocationContext;
+import org.infinispan.context.impl.NonTxInvocationContext;
+import org.infinispan.context.impl.LocalTxInvocationContext;
+import org.infinispan.transaction.xa.GlobalTransactionFactory;
+import org.infinispan.transaction.xa.DeadlockDetectingGlobalTransaction;
+import org.infinispan.util.concurrent.locks.DeadlockDetectingLockManager;
+import org.infinispan.util.concurrent.locks.containers.LockContainer;
+import org.testng.annotations.BeforeMethod;
+import org.testng.annotations.Test;
+
+import java.util.concurrent.TimeUnit;
+
+/**
+ * Tests functionality in {@link org.infinispan.util.concurrent.locks.DeadlockDetectingLockManager}.
+ *
+ * @author Mircea.Markus at jboss.com
+ */
+@Test(groups = "unit", testName = "util.DeadlockDetectingLockManagerTest")
+public class DeadlockDetectingLockManagerTest {
+
+   DeadlockDetectingLockManagerMock lockManager;
+   GlobalTransactionFactory gtf = new GlobalTransactionFactory(true);
+   Configuration config = new Configuration();
+   private LockContainer lc;
+   private static final int SPIN_DURATION = 1000;
+   private DeadlockDetectingGlobalTransaction lockOwner;
+
+   @BeforeMethod
+   public void setUp() {
+      lc = createMock(LockContainer.class);
+      lockManager = new DeadlockDetectingLockManagerMock(SPIN_DURATION, true, lc, config);
+      lockOwner = (DeadlockDetectingGlobalTransaction) gtf.instantiateGlobalTransaction();
+   }
+
+
+   public void testNoTransaction() throws Exception {
+      InvocationContext nonTx = new NonTxInvocationContext();
+      // For a non-tx context the test expects the configured acquisition timeout (not SPIN_DURATION): 1st try succeeds, 2nd fails.
+      expect(lc.acquireLock("k",config.getLockAcquisitionTimeout(), TimeUnit.MILLISECONDS)).andReturn(Boolean.TRUE);
+      expect(lc.acquireLock("k",config.getLockAcquisitionTimeout(), TimeUnit.MILLISECONDS)).andReturn(false);
+      replay(lc);
+      assert lockManager.lockAndRecord("k",nonTx);
+      assert !lockManager.lockAndRecord("k",nonTx);
+      verify(lc); // verify() with no arguments checks no mock at all; pass lc so both expected calls are enforced
+   }
+
+   public void testLockHeldByThread() throws Exception {
+      InvocationContext localTxContext = new LocalTxInvocationContext();
+
+      //this makes sure that we cannot acquire lock from the first try
+      expect(lc.acquireLock("k", SPIN_DURATION, TimeUnit.MILLISECONDS)).andReturn(Boolean.FALSE);
+      lockManager.setOwner(Thread.currentThread() );
+      //next lock acquisition will succeed
+      expect(lc.acquireLock("k", SPIN_DURATION, TimeUnit.MILLISECONDS)).andReturn(Boolean.TRUE);
+      replay(lc);
+
+      assert lockManager.lockAndRecord("k", localTxContext);
+      assert lockManager.getOverlapWithNotDeadlockAwareLockOwners() == 1;
+   }
+
+   public void testLocalDeadlock() throws Exception {
+      final DeadlockDetectingGlobalTransaction ddgt = (DeadlockDetectingGlobalTransaction) gtf.instantiateGlobalTransaction();
+
+      InvocationContext localTxContext = new LocalTxInvocationContext() {
+         @Override
+         public Object getLockOwner() {
+            return ddgt;
+         }
+      };
+
+      ddgt.setCoinToss(0);
+      lockOwner.setCoinToss(-1);
+      assert ddgt.thisWillInterrupt(lockOwner);
+
+      //this makes sure that we cannot acquire lock from the first try
+      expect(lc.acquireLock("k", SPIN_DURATION, TimeUnit.MILLISECONDS)).andReturn(Boolean.FALSE);
+      expect(lc.acquireLock("k", SPIN_DURATION, TimeUnit.MILLISECONDS)).andReturn(Boolean.TRUE);
+      lockOwner.setRemote(false);
+      lockManager.setOwner(lockOwner);
+      lockManager.setOwnsLock(true);
+      replay(lc);
+      assert lockManager.lockAndRecord("k", localTxContext);
+      assert lockManager.getDetectedLocalDeadlocks() == 1;
+   }
+
+   public static class DeadlockDetectingLockManagerMock extends DeadlockDetectingLockManager {
+
+      private Object owner;
+      private boolean ownsLock;
+
+      public DeadlockDetectingLockManagerMock(long spinDuration, boolean exposeJmxStats, LockContainer lockContainer, Configuration configuration) {
+         this.spinDuration = spinDuration;
+         this.exposeJmxStats = exposeJmxStats;
+         super.lockContainer = lockContainer;
+         this.configuration = configuration;
+      }
+
+      public void setOwner(Object owner) {
+         this.owner = owner;
+      }
+
+      public void setOwnsLock(boolean ownsLock) {
+         this.ownsLock = ownsLock;
+      }
+
+      @Override
+      public Object getOwner(Object key) {
+         return owner;
+      }
+
+      @Override
+      public boolean ownsLock(Object key, Object owner) {
+         return ownsLock;
+      }
+   }
+}


Property changes on: trunk/core/src/test/java/org/infinispan/util/DeadlockDetectingLockManagerTest.java
___________________________________________________________________
Name: svn:keywords
   + Id Revision
Name: svn:eol-style
   + LF



More information about the infinispan-commits mailing list