[jbosscache-commits] JBoss Cache SVN: r8326 - in core/trunk/src: test/java/org/jboss/cache/api/mvcc/repeatable_read and 1 other directory.

jbosscache-commits at lists.jboss.org jbosscache-commits at lists.jboss.org
Wed Jan 20 06:46:21 EST 2010


Author: galder.zamarreno at jboss.com
Date: 2010-01-20 06:46:20 -0500 (Wed, 20 Jan 2010)
New Revision: 8326

Added:
   core/trunk/src/test/java/org/jboss/cache/api/mvcc/repeatable_read/ConcurrentRepeatableReadTest.java
Modified:
   core/trunk/src/main/java/org/jboss/cache/mvcc/MVCCNodeHelper.java
Log:
[JBCACHE-1555] (Mvcc node peek could contain stale data) Re-peek after acquiring the lock to retrieve latest data container contents.

Modified: core/trunk/src/main/java/org/jboss/cache/mvcc/MVCCNodeHelper.java
===================================================================
--- core/trunk/src/main/java/org/jboss/cache/mvcc/MVCCNodeHelper.java	2010-01-20 10:11:53 UTC (rev 8325)
+++ core/trunk/src/main/java/org/jboss/cache/mvcc/MVCCNodeHelper.java	2010-01-20 11:46:20 UTC (rev 8326)
@@ -216,8 +216,10 @@
             boolean needToCopy = false;
             if (lockForWriting && acquireLock(context, fqn)) {
                needToCopy = true;
+               // re-peek in case we waited for a lock and some other thread modified the data while we're waiting
+               nodes = dataContainer.peekInternalNodeAndDirectParent(fqn, includeInvalidNodes);
             }
-            n = nodeFactory.createWrappedNode(in, nodes[1]);
+            n = nodeFactory.createWrappedNode(nodes[0], nodes[1]);
             context.putLookedUpNode(fqn, n);
             if (needToCopy) n.markForUpdate(dataContainer, writeSkewCheck);
          } else if (createIfAbsent) // else, do we need to create one?

Added: core/trunk/src/test/java/org/jboss/cache/api/mvcc/repeatable_read/ConcurrentRepeatableReadTest.java
===================================================================
--- core/trunk/src/test/java/org/jboss/cache/api/mvcc/repeatable_read/ConcurrentRepeatableReadTest.java	                        (rev 0)
+++ core/trunk/src/test/java/org/jboss/cache/api/mvcc/repeatable_read/ConcurrentRepeatableReadTest.java	2010-01-20 11:46:20 UTC (rev 8326)
@@ -0,0 +1,238 @@
+/*
+ * JBoss, Home of Professional Open Source.
+ * Copyright 2009, Red Hat Middleware LLC, and individual contributors
+ * as indicated by the @author tags. See the copyright.txt file in the
+ * distribution for a full listing of individual contributors.
+ *
+ * This is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU Lesser General Public License as
+ * published by the Free Software Foundation; either version 2.1 of
+ * the License, or (at your option) any later version.
+ *
+ * This software is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this software; if not, write to the Free
+ * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
+ * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
+ */
+package org.jboss.cache.api.mvcc.repeatable_read;
+
+import static org.testng.AssertJUnit.assertEquals;
+
+import java.lang.reflect.Method;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.Callable;
+import java.util.concurrent.CyclicBarrier;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+
+import javax.transaction.Status;
+import javax.transaction.TransactionManager;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.jboss.cache.AbstractSingleCacheTest;
+import org.jboss.cache.CacheSPI;
+import org.jboss.cache.UnitTestCacheFactory;
+import org.jboss.cache.config.Configuration;
+import org.jboss.cache.config.Configuration.CacheMode;
+import org.jboss.cache.factories.UnitTestConfigurationFactory;
+import org.jboss.cache.lock.IsolationLevel;
+import org.testng.annotations.Test;
+
+/**
+ * ConcurrentRepeatableReadTest.
+ * 
+ * @author Galder Zamarreño
+ */
+ at Test(groups = { "functional" }, testName = "api.mvcc.repeatable_read.ConcurrentRepeatableReadTest")
+public class ConcurrentRepeatableReadTest extends AbstractSingleCacheTest 
+{
+   static final Log log = LogFactory.getLog(ConcurrentRepeatableReadTest.class);
+   final ExecutorService executorService = Executors.newCachedThreadPool();
+   
+   @Override
+   protected CacheSPI createCache() throws Exception 
+   {
+      UnitTestCacheFactory factory = new UnitTestCacheFactory();
+      Configuration cfg = UnitTestConfigurationFactory.createConfiguration(CacheMode.LOCAL);
+      cfg.setIsolationLevel(IsolationLevel.REPEATABLE_READ);      
+      CacheSPI cache = (CacheSPI) factory.createCache(cfg, false, getClass());
+      return cache;
+   }
+
+   public void testConcurrentUpdatesNoWriteSkew(Method m) throws Exception {
+      final int nbWriters = 10;
+      log.debug(m.getName());
+      cache.start();
+      init();
+      CyclicBarrier barrier = new CyclicBarrier(nbWriters + 1);
+      List<Future<Void>> futures = new ArrayList<Future<Void>>(nbWriters);
+      for (int i = 0; i < nbWriters; i++) {
+         log.debug("Schedule execution");
+         Future<Void> future = executorService.submit(new IncrementNoWriteSkew(barrier));
+         futures.add(future);
+      }
+      barrier.await(); // wait for all threads to be ready
+      barrier.await(); // wait for all threads to finish
+
+      log.debug("All threads finished, let's shutdown the executor and check whether any exceptions were reported");
+      for (Future<Void> future : futures) future.get();
+
+      assertEquals(nbWriters, get());
+   }
+
+   public void testConcurrentUpdatesWriteSkew(Method m) throws Exception {
+      final int nbWriters = 10;
+      CacheSPI cache = null;
+      try {
+         log.debug(m.getName());
+         UnitTestCacheFactory factory = new UnitTestCacheFactory();
+         Configuration cfg = UnitTestConfigurationFactory.createConfiguration(CacheMode.LOCAL);
+         cfg.setIsolationLevel(IsolationLevel.REPEATABLE_READ);
+         cfg.setWriteSkewCheck(true);
+         cache = (CacheSPI) factory.createCache(cfg, false, getClass());
+         cache.start();         
+         assert cache.getConfiguration().isWriteSkewCheck();
+         init();
+         CyclicBarrier barrier = new CyclicBarrier(nbWriters + 1);
+         List<Future<Void>> futures = new ArrayList<Future<Void>>(nbWriters);
+         for (int i = 0; i < nbWriters; i++) {
+            log.debug("Schedule execution");
+            Future<Void> future = executorService.submit(new IncrementWriteSkew(barrier));
+            futures.add(future);
+         }
+         barrier.await(); // wait for all threads to be ready
+         barrier.await(); // wait for all threads to finish
+
+         log.debug("All threads finished, let's shutdown the executor and check whether any exceptions were reported");
+         for (Future<Void> future : futures) future.get();         
+      } finally {
+         if (cache != null) cache.stop();
+      }
+   }
+
+   private void init() throws Exception {
+      TransactionManager tx = getTm();
+      tx.begin();
+      try {
+         cache.put("/foo/mynode", "scalar", 0);
+      } catch (Exception e) {
+         tx.setRollbackOnly();
+         throw e;
+      } finally {
+         if (tx.getStatus() == Status.STATUS_ACTIVE) tx.commit();
+         else tx.rollback();
+      }
+   }
+
+   private void incrementNoWriteSkew() throws Exception {
+      TransactionManager tx = getTm();
+      tx.begin();
+      try {
+         // cache.put("/foo/mynode", "_lockthisplease_", "_lockthisplease_");
+         // TODO: when testing writeSkew=true, uncomment line before and start commenting here to make it easier to debug...
+         cache.getInvocationContext().getOptionOverrides().setForceWriteLock(true);
+         int tmp = (Integer) cache.get("/foo/mynode", "scalar");
+         tmp++;
+         cache.put("/foo/mynode", "scalar", tmp);
+         // TODO: ...and finish commenting here
+      } catch (Exception e) {
+         log.error("Unexpected", e);
+         tx.setRollbackOnly();
+         throw e;
+      } finally {
+         if (tx.getStatus() == Status.STATUS_ACTIVE) tx.commit();
+         else tx.rollback();
+      }
+   }
+
+   private void incrementWriteSkew() throws Exception {
+      TransactionManager tx = getTm();
+      tx.begin();
+      try {
+         cache.put("/foo/mynode", "_lockthisplease_", "_lockthisplease_");
+//         // TODO: when testing writeSkew=true, uncomment line before and start commenting here to make it easier to debug...
+//         cache.getInvocationContext().getOptionOverrides().setForceWriteLock(true);
+//         int tmp = (Integer) cache.get("/foo/mynode", "scalar");
+//         tmp++;
+//         cache.put("/foo/mynode", "scalar", tmp);
+//         // TODO: ...and finish commenting here
+      } catch (Exception e) {
+         log.error("Unexpected", e);
+         tx.setRollbackOnly();
+         throw e;
+      } finally {
+         if (tx.getStatus() == Status.STATUS_ACTIVE) tx.commit();
+         else tx.rollback();
+      }
+   }
+
+
+   public int get() throws Exception {
+      TransactionManager tx = getTm();
+      tx.begin();
+      try {
+         int ret = (Integer) cache.get("/foo/mynode", "scalar");
+         return ret;
+      } catch (Exception e) {
+         tx.setRollbackOnly();
+         throw e;
+      } finally {
+         if (tx.getStatus() == Status.STATUS_ACTIVE) tx.commit();
+         else tx.rollback();
+      }
+   }
+
+   private TransactionManager getTm() {
+      return cache.getConfiguration().getRuntimeConfig().getTransactionManager();
+   }
+
+   class IncrementNoWriteSkew implements Callable<Void> {
+      private final CyclicBarrier barrier;
+
+      public IncrementNoWriteSkew(CyclicBarrier barrier) {
+         this.barrier = barrier;
+      }
+
+      public Void call() throws Exception {
+         try {
+            log.debug("Wait for all executions paths to be ready to perform calls");
+            barrier.await();
+            incrementNoWriteSkew();
+            return null;
+         } finally {
+            log.debug("Wait for all execution paths to finish");
+            barrier.await();
+         }
+      }
+   }
+
+   class IncrementWriteSkew implements Callable<Void> {
+      private final CyclicBarrier barrier;
+
+      public IncrementWriteSkew(CyclicBarrier barrier) {
+         this.barrier = barrier;
+      }
+
+      public Void call() throws Exception {
+         try {
+            log.debug("Wait for all executions paths to be ready to perform calls");
+            barrier.await();
+            incrementWriteSkew();
+            return null;
+         } finally {
+            log.debug("Wait for all execution paths to finish");
+            barrier.await();
+         }
+      }
+   }
+
+   
+}



More information about the jbosscache-commits mailing list