[jbosscache-commits] JBoss Cache SVN: r8360 - in core/trunk/src: test/java/org/jboss/cache/api/mvcc/repeatable_read and 1 other directory.

jbosscache-commits at lists.jboss.org jbosscache-commits at lists.jboss.org
Mon Mar 29 13:38:38 EDT 2010


Author: galder.zamarreno at jboss.com
Date: 2010-03-29 13:38:37 -0400 (Mon, 29 Mar 2010)
New Revision: 8360

Modified:
   core/trunk/src/main/java/org/jboss/cache/mvcc/MVCCNodeHelper.java
   core/trunk/src/test/java/org/jboss/cache/api/mvcc/repeatable_read/ConcurrentRepeatableReadTest.java
Log:
[JBCACHE-1572] (NPE on cache.put when the corresponding fqn is removed in parallel) Fixed.


Modified: core/trunk/src/main/java/org/jboss/cache/mvcc/MVCCNodeHelper.java
===================================================================
--- core/trunk/src/main/java/org/jboss/cache/mvcc/MVCCNodeHelper.java	2010-03-26 00:24:30 UTC (rev 8359)
+++ core/trunk/src/main/java/org/jboss/cache/mvcc/MVCCNodeHelper.java	2010-03-29 17:38:37 UTC (rev 8360)
@@ -218,31 +218,18 @@
                needToCopy = true;
                // re-peek in case we waited for a lock and some other thread modified the data while we're waiting
                nodes = dataContainer.peekInternalNodeAndDirectParent(fqn, includeInvalidNodes);
+               in = nodes[0];
             }
-            n = nodeFactory.createWrappedNode(nodes[0], nodes[1]);
-            context.putLookedUpNode(fqn, n);
-            if (needToCopy) n.markForUpdate(dataContainer, writeSkewCheck);
+            if (in != null) {
+               n = nodeFactory.createWrappedNode(in, nodes[1]);
+               context.putLookedUpNode(fqn, n);
+               if (needToCopy) n.markForUpdate(dataContainer, writeSkewCheck);
+            } else if (createIfAbsent) {
+               n = createAbsentNode(parentFqn, fqn, context);
+            }
          } else if (createIfAbsent) // else, do we need to create one?
          {
-            parentFqn = fqn.getParent();
-            NodeSPI parent = wrapNodeForWriting(context, parentFqn, false, createIfAbsent, false, false, false);
-            // do we need to lock the parent to create children?
-            boolean parentLockNeeded = isParentLockNeeded(parent.getDelegationTarget());
-            // get a lock on the parent.
-            if (parentLockNeeded && acquireLock(context, parentFqn)) {
-               ReadCommittedNode parentRCN = (ReadCommittedNode) context.lookUpNode(parentFqn);
-               parentRCN.markForUpdate(dataContainer, writeSkewCheck);
-            }
-
-            // now to lock and create the node.  Lock first to prevent concurrent creation!
-            acquireLock(context, fqn);
-            in = nodeFactory.createChildNode(fqn, null, context, false);
-
-            n = nodeFactory.createWrappedNode(in, parent.getDelegationTarget());
-            n.setCreated(true);
-            n.setDataLoaded(true); // created here so we are loading it here
-            context.putLookedUpNode(fqn, n);
-            n.markForUpdate(dataContainer, writeSkewCheck);
+            n = createAbsentNode(parentFqn, fqn, context);
          }
       }
 
@@ -262,6 +249,29 @@
       return n;
    }
 
+   private ReadCommittedNode createAbsentNode(Fqn parentFqn, Fqn fqn, InvocationContext context) throws InterruptedException {
+      parentFqn = fqn.getParent();
+      NodeSPI parent = wrapNodeForWriting(context, parentFqn, false, true, false, false, false);
+      // do we need to lock the parent to create children?
+      boolean parentLockNeeded = isParentLockNeeded(parent.getDelegationTarget());
+      // get a lock on the parent.
+      if (parentLockNeeded && acquireLock(context, parentFqn)) {
+         ReadCommittedNode parentRCN = (ReadCommittedNode) context.lookUpNode(parentFqn);
+         parentRCN.markForUpdate(dataContainer, writeSkewCheck);
+      }
+
+      // now to lock and create the node.  Lock first to prevent concurrent creation!
+      acquireLock(context, fqn);
+      InternalNode in = nodeFactory.createChildNode(fqn, null, context, false);
+
+      ReadCommittedNode n = nodeFactory.createWrappedNode(in, parent.getDelegationTarget());
+      n.setCreated(true);
+      n.setDataLoaded(true); // created here so we are loading it here
+      context.putLookedUpNode(fqn, n);
+      n.markForUpdate(dataContainer, writeSkewCheck);
+      return n;
+   }
+
    /**
     * The same as {@link #wrapNodeForWriting(org.jboss.cache.InvocationContext, org.jboss.cache.Fqn, boolean, boolean,
     * boolean, boolean, boolean)} except that it takes in an {@link org.jboss.cache.InternalNode} instead of a {@link

Modified: core/trunk/src/test/java/org/jboss/cache/api/mvcc/repeatable_read/ConcurrentRepeatableReadTest.java
===================================================================
--- core/trunk/src/test/java/org/jboss/cache/api/mvcc/repeatable_read/ConcurrentRepeatableReadTest.java	2010-03-26 00:24:30 UTC (rev 8359)
+++ core/trunk/src/test/java/org/jboss/cache/api/mvcc/repeatable_read/ConcurrentRepeatableReadTest.java	2010-03-29 17:38:37 UTC (rev 8360)
@@ -23,10 +23,13 @@
 
 import static org.testng.AssertJUnit.assertEquals;
 
+import java.io.Serializable;
 import java.lang.reflect.Method;
 import java.util.ArrayList;
+import java.util.Collections;
 import java.util.List;
 import java.util.concurrent.Callable;
+import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.CyclicBarrier;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
@@ -38,7 +41,11 @@
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.jboss.cache.AbstractSingleCacheTest;
+import org.jboss.cache.Cache;
+import org.jboss.cache.CacheFactory;
 import org.jboss.cache.CacheSPI;
+import org.jboss.cache.DefaultCacheFactory;
+import org.jboss.cache.Fqn;
 import org.jboss.cache.UnitTestCacheFactory;
 import org.jboss.cache.config.Configuration;
 import org.jboss.cache.config.Configuration.CacheMode;
@@ -118,6 +125,93 @@
       }
    }
 
+   public void testConcurrentCreateRemove() throws Exception {
+      final int totalElement = 100;
+      final int totalTimes = 20;
+      int writer = 10;
+      int remover = 5;
+      final CountDownLatch startSignalWriter = new CountDownLatch(1);
+      final CountDownLatch startSignalOthers = new CountDownLatch(1);
+      final CountDownLatch doneSignal = new CountDownLatch(writer + remover);
+      final List<Exception> errors = Collections.synchronizedList(new ArrayList<Exception>());
+      for (int i = 0; i < writer; i++)
+      {
+         final int index = i;
+         Thread thread = new Thread()
+         {
+            public void run()
+            {
+               try
+               {
+                  startSignalWriter.await();
+                  for (int j = 0; j < totalTimes; j++)
+                  {
+                     for (int i = 0; i < totalElement; i++)
+                     {
+                        cache.put(Fqn.fromElements("key" + i), "key" + i, "value" + i);
+                        if (index == 0 && j == 0)
+                        {
+                           // The cache is full, we can launch the others
+                           startSignalOthers.countDown();
+                        }
+                     }
+                     sleep(50);
+                  }
+               }
+               catch (Exception e)
+               {
+                  errors.add(e);
+               }
+               finally
+               {
+                  doneSignal.countDown();
+               }
+            }
+         };
+         thread.start();
+      }
+      startSignalWriter.countDown();
+      for (int i = 0; i < remover; i++)
+      {
+         Thread thread = new Thread()
+         {
+            public void run()
+            {
+               try
+               {
+                  startSignalOthers.await();
+                  for (int j = 0; j < totalTimes; j++)
+                  {
+                     for (int i = 0; i < totalElement; i++)
+                     {
+                        cache.removeNode(Fqn.fromElements("key" + i));
+                     }
+                     sleep(50);
+                  }
+               }
+               catch (Exception e)
+               {
+                  errors.add(e);
+               }
+               finally
+               {
+                  doneSignal.countDown();
+               }
+            }
+         };
+         thread.start();
+      }
+      doneSignal.await();
+      if (!errors.isEmpty())
+      {
+         for (Exception e : errors)
+         {
+            e.printStackTrace();
+         }
+         throw errors.get(0);
+      }
+   }
+
    private void init() throws Exception {
       TransactionManager tx = getTm();
       tx.begin();



More information about the jbosscache-commits mailing list