[infinispan-commits] Infinispan SVN: r1117 - trunk/core/src/test/java/org/infinispan/api/mvcc/repeatable_read.

infinispan-commits at lists.jboss.org infinispan-commits at lists.jboss.org
Mon Nov 9 05:48:11 EST 2009


Author: galder.zamarreno at jboss.com
Date: 2009-11-09 05:48:11 -0500 (Mon, 09 Nov 2009)
New Revision: 1117

Modified:
   trunk/core/src/test/java/org/infinispan/api/mvcc/repeatable_read/WriteSkewTest.java
Log:
[ISPN-259] (Concurrent puts fail with writeSkew) Added failing unit test.

Modified: trunk/core/src/test/java/org/infinispan/api/mvcc/repeatable_read/WriteSkewTest.java
===================================================================
--- trunk/core/src/test/java/org/infinispan/api/mvcc/repeatable_read/WriteSkewTest.java	2009-11-08 16:54:40 UTC (rev 1116)
+++ trunk/core/src/test/java/org/infinispan/api/mvcc/repeatable_read/WriteSkewTest.java	2009-11-09 10:48:11 UTC (rev 1117)
@@ -10,19 +10,31 @@
 import org.infinispan.test.fwk.TestCacheManagerFactory;
 import org.infinispan.util.concurrent.IsolationLevel;
 import org.infinispan.util.concurrent.locks.LockManager;
+import org.infinispan.util.logging.Log;
+import org.infinispan.util.logging.LogFactory;
 import org.testng.annotations.AfterTest;
 import org.testng.annotations.BeforeTest;
 import org.testng.annotations.Test;
 
+import javax.transaction.Status;
 import javax.transaction.SystemException;
 import javax.transaction.TransactionManager;
+
+import java.util.ArrayList;
 import java.util.Collection;
 import java.util.HashSet;
+import java.util.List;
 import java.util.Set;
+import java.util.concurrent.Callable;
 import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.CyclicBarrier;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
 
 @Test(groups = {"functional", "mvcc"}, testName = "api.mvcc.repeatable_read.WriteSkewTest")
 public class WriteSkewTest extends AbstractInfinispanTest {
+   private static final Log log = LogFactory.getLog(WriteSkewTest.class);
    protected TransactionManager tm;
    protected LockManager lockManager;
    protected InvocationContextContainer icc;
@@ -75,6 +87,60 @@
       doTest(false);
    }
 
+   /**
+    * Reproduces ISPN-259: with the write skew check enabled, several transactions
+    * concurrently write the same key using only {@code put}.
+    * <p/>
+    * The key is first seeded in its own transaction. Then {@code nbWriters} {@link EntryWriter}
+    * tasks are submitted; a {@link CyclicBarrier} sized for the writers plus this thread releases
+    * them together and is awaited a second time until every writer is done. Finally each
+    * {@link Future} is inspected so that an exception raised in any writer fails the test.
+    *
+    * @throws Exception if seeding the cache fails, a barrier wait is broken or interrupted, or
+    *                   any writer task threw (surfaced by {@link Future#get()})
+    */
+   public void testWriteSkewWithOnlyPut() throws Exception {
+      Configuration writeSkewCheck = new Configuration();
+      writeSkewCheck.setWriteSkewCheck(true);
+      cacheManager.defineConfiguration("writeSkewCheckWithOnlyPut", writeSkewCheck);
+      cache = cacheManager.getCache("writeSkewCheckWithOnlyPut");
+      postStart();
+
+      // Seed the key in its own transaction before the concurrent writers start.
+      tm.begin();
+      try {
+         cache.put("k", "init");
+      } catch (Exception e) {
+         tm.setRollbackOnly();
+         throw e;
+      } finally {
+         if (tm.getStatus() == Status.STATUS_ACTIVE) tm.commit();
+         else tm.rollback();
+      }
+
+      int nbWriters = 10;
+      // One extra party for this (coordinating) thread.
+      CyclicBarrier barrier = new CyclicBarrier(nbWriters + 1);
+      List<Future<Void>> futures = new ArrayList<Future<Void>>(nbWriters);
+      ExecutorService executorService = Executors.newCachedThreadPool();
+      try {
+         for (int i = 0; i < nbWriters; i++) {
+            log.debug("Schedule execution");
+            Future<Void> future = executorService.submit(new EntryWriter(barrier));
+            futures.add(future);
+         }
+         barrier.await(); // wait for all threads to be ready
+         barrier.await(); // wait for all threads to finish
+
+         log.debug("All threads finished, let's shutdown the executor and check whether any exceptions were reported");
+         for (Future<Void> future : futures) future.get();
+      } finally {
+         // Always release the pool's threads, including when a barrier wait or a writer fails.
+         executorService.shutdownNow();
+      }
+   }
+
+
    private void doTest(final boolean allowWriteSkew) throws Exception {
       cache.put("k", "v");
       final Set<Exception> w1exceptions = new HashSet<Exception>();
@@ -162,4 +209,66 @@
          for (Exception e : ce) throw e;
       }
    }
+
+   /**
+    * Task that puts a value for key {@code "k"} inside its own transaction, synchronising with
+    * the other parties of a shared {@link CyclicBarrier}.
+    * <p/>
+    * The barrier is awaited twice: once before the write so that all parties start together, and
+    * once afterwards (always, from a {@code finally}) so that a party waiting for completion
+    * is released even when the write fails.
+    */
+   protected class EntryWriter implements Callable<Void> {
+      private final CyclicBarrier barrier;
+
+      /**
+       * @param barrier barrier shared with the other parties; awaited before and after the write
+       */
+      public EntryWriter(CyclicBarrier barrier) {
+         this.barrier = barrier;
+      }
+
+      /**
+       * Waits on the barrier, puts a value for {@code "k"} in a new transaction (committing if
+       * the transaction is still active, rolling back otherwise), then waits on the barrier again.
+       *
+       * @return always {@code null}
+       * @throws Exception the failure of the write or of its commit/rollback; a failure of the
+       *                   final barrier wait is only propagated when nothing else went wrong,
+       *                   so that it cannot replace the exception the test is looking for
+       */
+      public Void call() throws Exception {
+         boolean completed = false;
+         try {
+            log.debug("Wait for all executions paths to be ready to perform calls");
+            barrier.await();
+
+            tm.begin();
+            try {
+               cache.put("k", "_lockthisplease_");
+            } catch (Exception e) {
+               log.error("Unexpected", e);
+               tm.setRollbackOnly();
+               throw e;
+            } finally {
+               if (tm.getStatus() == Status.STATUS_ACTIVE) tm.commit();
+               else tm.rollback();
+            }
+
+            completed = true;
+            return null;
+         } finally {
+            log.debug("Wait for all execution paths to finish");
+            try {
+               barrier.await();
+            } catch (Exception barrierFailure) {
+               // An exception leaving a finally block replaces the one already propagating, so
+               // only rethrow the barrier failure when the write itself succeeded.
+               if (completed) throw barrierFailure;
+               if (barrierFailure instanceof InterruptedException) Thread.currentThread().interrupt();
+               log.error("Unexpected", barrierFailure);
+            }
+         }
+      }
+   }
 }



More information about the infinispan-commits mailing list