Author: galder.zamarreno(a)jboss.com
Date: 2010-01-20 06:46:20 -0500 (Wed, 20 Jan 2010)
New Revision: 8326
Added:
core/trunk/src/test/java/org/jboss/cache/api/mvcc/repeatable_read/ConcurrentRepeatableReadTest.java
Modified:
core/trunk/src/main/java/org/jboss/cache/mvcc/MVCCNodeHelper.java
Log:
[JBCACHE-1555] (Mvcc node peek could contain stale data) Re-peek after acquiring the lock
to retrieve latest data container contents.
Modified: core/trunk/src/main/java/org/jboss/cache/mvcc/MVCCNodeHelper.java
===================================================================
--- core/trunk/src/main/java/org/jboss/cache/mvcc/MVCCNodeHelper.java 2010-01-20 10:11:53
UTC (rev 8325)
+++ core/trunk/src/main/java/org/jboss/cache/mvcc/MVCCNodeHelper.java 2010-01-20 11:46:20
UTC (rev 8326)
@@ -216,8 +216,10 @@
boolean needToCopy = false;
if (lockForWriting && acquireLock(context, fqn)) {
needToCopy = true;
+ // re-peek in case we waited for a lock and some other thread modified the
data while we're waiting
+ nodes = dataContainer.peekInternalNodeAndDirectParent(fqn,
includeInvalidNodes);
}
- n = nodeFactory.createWrappedNode(in, nodes[1]);
+ n = nodeFactory.createWrappedNode(nodes[0], nodes[1]);
context.putLookedUpNode(fqn, n);
if (needToCopy) n.markForUpdate(dataContainer, writeSkewCheck);
} else if (createIfAbsent) // else, do we need to create one?
Added:
core/trunk/src/test/java/org/jboss/cache/api/mvcc/repeatable_read/ConcurrentRepeatableReadTest.java
===================================================================
---
core/trunk/src/test/java/org/jboss/cache/api/mvcc/repeatable_read/ConcurrentRepeatableReadTest.java
(rev 0)
+++
core/trunk/src/test/java/org/jboss/cache/api/mvcc/repeatable_read/ConcurrentRepeatableReadTest.java 2010-01-20
11:46:20 UTC (rev 8326)
@@ -0,0 +1,238 @@
+/*
+ * JBoss, Home of Professional Open Source.
+ * Copyright 2009, Red Hat Middleware LLC, and individual contributors
+ * as indicated by the @author tags. See the copyright.txt file in the
+ * distribution for a full listing of individual contributors.
+ *
+ * This is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU Lesser General Public License as
+ * published by the Free Software Foundation; either version 2.1 of
+ * the License, or (at your option) any later version.
+ *
+ * This software is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this software; if not, write to the Free
+ * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
+ * 02110-1301 USA, or see the FSF site:
http://www.fsf.org.
+ */
+package org.jboss.cache.api.mvcc.repeatable_read;
+
+import static org.testng.AssertJUnit.assertEquals;
+
+import java.lang.reflect.Method;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.Callable;
+import java.util.concurrent.CyclicBarrier;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+
+import javax.transaction.Status;
+import javax.transaction.TransactionManager;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.jboss.cache.AbstractSingleCacheTest;
+import org.jboss.cache.CacheSPI;
+import org.jboss.cache.UnitTestCacheFactory;
+import org.jboss.cache.config.Configuration;
+import org.jboss.cache.config.Configuration.CacheMode;
+import org.jboss.cache.factories.UnitTestConfigurationFactory;
+import org.jboss.cache.lock.IsolationLevel;
+import org.testng.annotations.Test;
+
+/**
+ * ConcurrentRepeatableReadTest.
+ *
+ * @author Galder Zamarreño
+ */
+@Test(groups = { "functional" }, testName =
"api.mvcc.repeatable_read.ConcurrentRepeatableReadTest")
+public class ConcurrentRepeatableReadTest extends AbstractSingleCacheTest
+{
+ static final Log log = LogFactory.getLog(ConcurrentRepeatableReadTest.class);
+ final ExecutorService executorService = Executors.newCachedThreadPool();
+
+ @Override
+ protected CacheSPI createCache() throws Exception
+ {
+ UnitTestCacheFactory factory = new UnitTestCacheFactory();
+ Configuration cfg =
UnitTestConfigurationFactory.createConfiguration(CacheMode.LOCAL);
+ cfg.setIsolationLevel(IsolationLevel.REPEATABLE_READ);
+ CacheSPI cache = (CacheSPI) factory.createCache(cfg, false, getClass());
+ return cache;
+ }
+
+ public void testConcurrentUpdatesNoWriteSkew(Method m) throws Exception {
+ final int nbWriters = 10;
+ log.debug(m.getName());
+ cache.start();
+ init();
+ CyclicBarrier barrier = new CyclicBarrier(nbWriters + 1);
+ List<Future<Void>> futures = new
ArrayList<Future<Void>>(nbWriters);
+ for (int i = 0; i < nbWriters; i++) {
+ log.debug("Schedule execution");
+ Future<Void> future = executorService.submit(new
IncrementNoWriteSkew(barrier));
+ futures.add(future);
+ }
+ barrier.await(); // wait for all threads to be ready
+ barrier.await(); // wait for all threads to finish
+
+ log.debug("All threads finished, let's shutdown the executor and check
whether any exceptions were reported");
+ for (Future<Void> future : futures) future.get();
+
+ assertEquals(nbWriters, get());
+ }
+
+ public void testConcurrentUpdatesWriteSkew(Method m) throws Exception {
+ final int nbWriters = 10;
+ CacheSPI cache = null;
+ try {
+ log.debug(m.getName());
+ UnitTestCacheFactory factory = new UnitTestCacheFactory();
+ Configuration cfg =
UnitTestConfigurationFactory.createConfiguration(CacheMode.LOCAL);
+ cfg.setIsolationLevel(IsolationLevel.REPEATABLE_READ);
+ cfg.setWriteSkewCheck(true);
+ cache = (CacheSPI) factory.createCache(cfg, false, getClass());
+ cache.start();
+ assert cache.getConfiguration().isWriteSkewCheck();
+ init();
+ CyclicBarrier barrier = new CyclicBarrier(nbWriters + 1);
+ List<Future<Void>> futures = new
ArrayList<Future<Void>>(nbWriters);
+ for (int i = 0; i < nbWriters; i++) {
+ log.debug("Schedule execution");
+ Future<Void> future = executorService.submit(new
IncrementWriteSkew(barrier));
+ futures.add(future);
+ }
+ barrier.await(); // wait for all threads to be ready
+ barrier.await(); // wait for all threads to finish
+
+ log.debug("All threads finished, let's shutdown the executor and check
whether any exceptions were reported");
+ for (Future<Void> future : futures) future.get();
+ } finally {
+ if (cache != null) cache.stop();
+ }
+ }
+
+ private void init() throws Exception {
+ TransactionManager tx = getTm();
+ tx.begin();
+ try {
+ cache.put("/foo/mynode", "scalar", 0);
+ } catch (Exception e) {
+ tx.setRollbackOnly();
+ throw e;
+ } finally {
+ if (tx.getStatus() == Status.STATUS_ACTIVE) tx.commit();
+ else tx.rollback();
+ }
+ }
+
+ private void incrementNoWriteSkew() throws Exception {
+ TransactionManager tx = getTm();
+ tx.begin();
+ try {
+ // cache.put("/foo/mynode", "_lockthisplease_",
"_lockthisplease_");
+ // TODO: when testing writeSkew=true, uncomment line before and start commenting
here to make it easier to debug...
+ cache.getInvocationContext().getOptionOverrides().setForceWriteLock(true);
+ int tmp = (Integer) cache.get("/foo/mynode", "scalar");
+ tmp++;
+ cache.put("/foo/mynode", "scalar", tmp);
+ // TODO: ...and finish commenting here
+ } catch (Exception e) {
+ log.error("Unexpected", e);
+ tx.setRollbackOnly();
+ throw e;
+ } finally {
+ if (tx.getStatus() == Status.STATUS_ACTIVE) tx.commit();
+ else tx.rollback();
+ }
+ }
+
+ private void incrementWriteSkew() throws Exception {
+ TransactionManager tx = getTm();
+ tx.begin();
+ try {
+ cache.put("/foo/mynode", "_lockthisplease_",
"_lockthisplease_");
+// // TODO: when testing writeSkew=true, uncomment line before and start
commenting here to make it easier to debug...
+// cache.getInvocationContext().getOptionOverrides().setForceWriteLock(true);
+// int tmp = (Integer) cache.get("/foo/mynode", "scalar");
+// tmp++;
+// cache.put("/foo/mynode", "scalar", tmp);
+// // TODO: ...and finish commenting here
+ } catch (Exception e) {
+ log.error("Unexpected", e);
+ tx.setRollbackOnly();
+ throw e;
+ } finally {
+ if (tx.getStatus() == Status.STATUS_ACTIVE) tx.commit();
+ else tx.rollback();
+ }
+ }
+
+
+ public int get() throws Exception {
+ TransactionManager tx = getTm();
+ tx.begin();
+ try {
+ int ret = (Integer) cache.get("/foo/mynode", "scalar");
+ return ret;
+ } catch (Exception e) {
+ tx.setRollbackOnly();
+ throw e;
+ } finally {
+ if (tx.getStatus() == Status.STATUS_ACTIVE) tx.commit();
+ else tx.rollback();
+ }
+ }
+
+ private TransactionManager getTm() {
+ return cache.getConfiguration().getRuntimeConfig().getTransactionManager();
+ }
+
+ class IncrementNoWriteSkew implements Callable<Void> {
+ private final CyclicBarrier barrier;
+
+ public IncrementNoWriteSkew(CyclicBarrier barrier) {
+ this.barrier = barrier;
+ }
+
+ public Void call() throws Exception {
+ try {
+ log.debug("Wait for all executions paths to be ready to perform
calls");
+ barrier.await();
+ incrementNoWriteSkew();
+ return null;
+ } finally {
+ log.debug("Wait for all execution paths to finish");
+ barrier.await();
+ }
+ }
+ }
+
+ class IncrementWriteSkew implements Callable<Void> {
+ private final CyclicBarrier barrier;
+
+ public IncrementWriteSkew(CyclicBarrier barrier) {
+ this.barrier = barrier;
+ }
+
+ public Void call() throws Exception {
+ try {
+ log.debug("Wait for all executions paths to be ready to perform
calls");
+ barrier.await();
+ incrementWriteSkew();
+ return null;
+ } finally {
+ log.debug("Wait for all execution paths to finish");
+ barrier.await();
+ }
+ }
+ }
+
+
+}