Author: galder.zamarreno(a)jboss.com
Date: 2010-03-29 13:38:37 -0400 (Mon, 29 Mar 2010)
New Revision: 8360
Modified:
core/trunk/src/main/java/org/jboss/cache/mvcc/MVCCNodeHelper.java
core/trunk/src/test/java/org/jboss/cache/api/mvcc/repeatable_read/ConcurrentRepeatableReadTest.java
Log:
[JBCACHE-1572] (NPE on cache.put when the corresponding fqn is removed in parallel)
Fixed.
Modified: core/trunk/src/main/java/org/jboss/cache/mvcc/MVCCNodeHelper.java
===================================================================
--- core/trunk/src/main/java/org/jboss/cache/mvcc/MVCCNodeHelper.java 2010-03-26 00:24:30 UTC (rev 8359)
+++ core/trunk/src/main/java/org/jboss/cache/mvcc/MVCCNodeHelper.java 2010-03-29 17:38:37 UTC (rev 8360)
@@ -218,31 +218,18 @@
needToCopy = true;
// re-peek in case we waited for a lock and some other thread modified the data while we're waiting
nodes = dataContainer.peekInternalNodeAndDirectParent(fqn, includeInvalidNodes);
+ in = nodes[0];
}
- n = nodeFactory.createWrappedNode(nodes[0], nodes[1]);
- context.putLookedUpNode(fqn, n);
- if (needToCopy) n.markForUpdate(dataContainer, writeSkewCheck);
+ if (in != null) {
+ n = nodeFactory.createWrappedNode(in, nodes[1]);
+ context.putLookedUpNode(fqn, n);
+ if (needToCopy) n.markForUpdate(dataContainer, writeSkewCheck);
+ } else if (createIfAbsent) {
+ n = createAbsentNode(parentFqn, fqn, context);
+ }
} else if (createIfAbsent) // else, do we need to create one?
{
- parentFqn = fqn.getParent();
- NodeSPI parent = wrapNodeForWriting(context, parentFqn, false, createIfAbsent, false, false, false);
- // do we need to lock the parent to create children?
- boolean parentLockNeeded = isParentLockNeeded(parent.getDelegationTarget());
- // get a lock on the parent.
- if (parentLockNeeded && acquireLock(context, parentFqn)) {
- ReadCommittedNode parentRCN = (ReadCommittedNode) context.lookUpNode(parentFqn);
- parentRCN.markForUpdate(dataContainer, writeSkewCheck);
- }
-
- // now to lock and create the node. Lock first to prevent concurrent creation!
- acquireLock(context, fqn);
- in = nodeFactory.createChildNode(fqn, null, context, false);
-
- n = nodeFactory.createWrappedNode(in, parent.getDelegationTarget());
- n.setCreated(true);
- n.setDataLoaded(true); // created here so we are loading it here
- context.putLookedUpNode(fqn, n);
- n.markForUpdate(dataContainer, writeSkewCheck);
+ n = createAbsentNode(parentFqn, fqn, context);
}
}
@@ -262,6 +249,29 @@
return n;
}
+ private ReadCommittedNode createAbsentNode(Fqn parentFqn, Fqn fqn, InvocationContext context) throws InterruptedException {
+ parentFqn = fqn.getParent();
+ NodeSPI parent = wrapNodeForWriting(context, parentFqn, false, true, false, false, false);
+ // do we need to lock the parent to create children?
+ boolean parentLockNeeded = isParentLockNeeded(parent.getDelegationTarget());
+ // get a lock on the parent.
+ if (parentLockNeeded && acquireLock(context, parentFqn)) {
+ ReadCommittedNode parentRCN = (ReadCommittedNode) context.lookUpNode(parentFqn);
+ parentRCN.markForUpdate(dataContainer, writeSkewCheck);
+ }
+
+ // now to lock and create the node. Lock first to prevent concurrent creation!
+ acquireLock(context, fqn);
+ InternalNode in = nodeFactory.createChildNode(fqn, null, context, false);
+
+ ReadCommittedNode n = nodeFactory.createWrappedNode(in, parent.getDelegationTarget());
+ n.setCreated(true);
+ n.setDataLoaded(true); // created here so we are loading it here
+ context.putLookedUpNode(fqn, n);
+ n.markForUpdate(dataContainer, writeSkewCheck);
+ return n;
+ }
+
/**
* The same as {@link #wrapNodeForWriting(org.jboss.cache.InvocationContext, org.jboss.cache.Fqn, boolean, boolean,
* boolean, boolean, boolean)} except that it takes in an {@link org.jboss.cache.InternalNode} instead of a {@link
Modified: core/trunk/src/test/java/org/jboss/cache/api/mvcc/repeatable_read/ConcurrentRepeatableReadTest.java
===================================================================
--- core/trunk/src/test/java/org/jboss/cache/api/mvcc/repeatable_read/ConcurrentRepeatableReadTest.java 2010-03-26 00:24:30 UTC (rev 8359)
+++ core/trunk/src/test/java/org/jboss/cache/api/mvcc/repeatable_read/ConcurrentRepeatableReadTest.java 2010-03-29 17:38:37 UTC (rev 8360)
@@ -23,10 +23,13 @@
import static org.testng.AssertJUnit.assertEquals;
+import java.io.Serializable;
import java.lang.reflect.Method;
import java.util.ArrayList;
+import java.util.Collections;
import java.util.List;
import java.util.concurrent.Callable;
+import java.util.concurrent.CountDownLatch;
import java.util.concurrent.CyclicBarrier;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
@@ -38,7 +41,11 @@
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.jboss.cache.AbstractSingleCacheTest;
+import org.jboss.cache.Cache;
+import org.jboss.cache.CacheFactory;
import org.jboss.cache.CacheSPI;
+import org.jboss.cache.DefaultCacheFactory;
+import org.jboss.cache.Fqn;
import org.jboss.cache.UnitTestCacheFactory;
import org.jboss.cache.config.Configuration;
import org.jboss.cache.config.Configuration.CacheMode;
@@ -118,6 +125,93 @@
}
}
+ public void testConcurrentCreateRemove() throws Exception {
+ final int totalElement = 100;
+ final int totalTimes = 20;
+ int writer = 10;
+ int remover = 5;
+ final CountDownLatch startSignalWriter = new CountDownLatch(1);
+ final CountDownLatch startSignalOthers = new CountDownLatch(1);
+ final CountDownLatch doneSignal = new CountDownLatch(writer + remover);
+ final List<Exception> errors = Collections.synchronizedList(new ArrayList<Exception>());
+ for (int i = 0; i < writer; i++)
+ {
+ final int index = i;
+ Thread thread = new Thread()
+ {
+ public void run()
+ {
+ try
+ {
+ startSignalWriter.await();
+ for (int j = 0; j < totalTimes; j++)
+ {
+ for (int i = 0; i < totalElement; i++)
+ {
+ cache.put(Fqn.fromElements("key" + i), "key" + i, "value" + i);
+ if (index == 0 && j == 0)
+ {
+ // The cache is full, we can launch the others
+ startSignalOthers.countDown();
+ }
+ }
+ sleep(50);
+ }
+ }
+ catch (Exception e)
+ {
+ errors.add(e);
+ }
+ finally
+ {
+ doneSignal.countDown();
+ }
+ }
+ };
+ thread.start();
+ }
+ startSignalWriter.countDown();
+ for (int i = 0; i < remover; i++)
+ {
+ Thread thread = new Thread()
+ {
+ public void run()
+ {
+ try
+ {
+ startSignalOthers.await();
+ for (int j = 0; j < totalTimes; j++)
+ {
+ for (int i = 0; i < totalElement; i++)
+ {
+ cache.removeNode(Fqn.fromElements("key" + i));
+ }
+ sleep(50);
+ }
+ }
+ catch (Exception e)
+ {
+ errors.add(e);
+ }
+ finally
+ {
+ doneSignal.countDown();
+ }
+ }
+ };
+ thread.start();
+ }
+ doneSignal.await();
+ if (!errors.isEmpty())
+ {
+ for (Exception e : errors)
+ {
+ e.printStackTrace();
+ }
+ throw errors.get(0);
+ }
+ }
+
private void init() throws Exception {
TransactionManager tx = getTm();
tx.begin();