[jbosscache-commits] JBoss Cache SVN: r4729 - in core/trunk/src: test/java/org/jboss/cache/lock/pessimistic and 1 other directory.

jbosscache-commits at lists.jboss.org jbosscache-commits at lists.jboss.org
Tue Nov 6 10:13:18 EST 2007


Author: manik.surtani at jboss.com
Date: 2007-11-06 10:13:18 -0500 (Tue, 06 Nov 2007)
New Revision: 4729

Modified:
   core/trunk/src/main/java/org/jboss/cache/interceptors/PessimisticLockInterceptor.java
   core/trunk/src/test/java/org/jboss/cache/lock/pessimistic/ConcurrentPutRemoveTest.java
Log:
Much better behaviour with JBCACHE-1165.  No more endless loops and lock acquisition timeouts when you have concurrent puts and removes.

Modified: core/trunk/src/main/java/org/jboss/cache/interceptors/PessimisticLockInterceptor.java
===================================================================
--- core/trunk/src/main/java/org/jboss/cache/interceptors/PessimisticLockInterceptor.java	2007-11-06 03:36:23 UTC (rev 4728)
+++ core/trunk/src/main/java/org/jboss/cache/interceptors/PessimisticLockInterceptor.java	2007-11-06 15:13:18 UTC (rev 4729)
@@ -120,6 +120,7 @@
             fqn = (Fqn) args[1];
             lock_type = NodeLock.LockType.WRITE;
             recursive = true;// remove node and *all* child nodes
+            createIfNotExists = true;
             break;
          case MethodDeclarations.putDataMethodLocal_id:
          case MethodDeclarations.putDataEraseMethodLocal_id:
@@ -181,9 +182,15 @@
          if (!locksAlreadyObtained)
          {
             long timeout = zeroLockTimeout ? 0 : getLockAcquisitionTimeout(ctx);
+            // make sure we can bail out of this loop
+            long cutoffTime = System.currentTimeMillis() + timeout;
+            boolean firstTry = true;
             do
             {
+               // this is an additional check to make sure we don't try for too long.
+               if (!firstTry && System.currentTimeMillis() > cutoffTime) throw new TimeoutException("Unable to acquire lock on Fqn " + fqn + " after " + timeout + " millis");
                lock(ctx, fqn, lock_type, recursive, createIfNotExists, timeout, isDeleteOperation, isEvictOperation, isRemoveDataOperation);
+               firstTry = false;
             }
             while (createIfNotExists && cache.peek(fqn, false) == null);// keep trying until we have the lock (fixes concurrent remove())
          }
@@ -204,14 +211,6 @@
       // to add the removedNodes map to CacheImpl
       if (isDeleteOperation && ctx.getGlobalTransaction() == null)
       {
-         //cache.getRemovedNodesMap().remove(fqn);
-         //cache.peek(fqn);
-         // do a REAL remove here.
-         NodeSPI n = cache.peek(fqn, true);
-         if (n != null)
-         {
-            lockManager.getLock(n).releaseAll(Thread.currentThread());
-         }
          //CacheIml._move internally removes the source node. 'if clause' solve the scenario when a node is
          // moved in the same place, i.e. it will be removed from the source - which is also destination - so it ends
          // up being removed for good
@@ -219,9 +218,14 @@
          {
             ((CacheImpl) cache).realRemove(fqn, true);
          }
+         // do a REAL remove here.
+         NodeSPI n = cache.peek(fqn, true);
+         if (n != null)
+         {
+            lockManager.getLock(n).releaseAll(Thread.currentThread());
+         }
       }
-      else
-      if (m.getMethodId() == MethodDeclarations.commitMethod_id || isOnePhaseCommitPrepareMehod(m) || m.getMethodId() == MethodDeclarations.rollbackMethod_id)
+      else if (m.getMethodId() == MethodDeclarations.commitMethod_id || isOnePhaseCommitPrepareMehod(m) || m.getMethodId() == MethodDeclarations.rollbackMethod_id)
       {
          cleanup(ctx.getGlobalTransaction());
       }
@@ -311,10 +315,12 @@
          }
 
          boolean created = false;
+         if (log.isTraceEnabled()) log.trace("Directly got child node " + child_name + ".  Hash code is " + (child_node == null ? "null" : child_node.hashCode()));
          if (child_node == null && createIfNotExists)
          {
             child_node = n.addChildDirect(new Fqn(child_name));
             created = true;
+            if (log.isTraceEnabled()) log.trace("Created child node " + child_name + ".  Hash code is " + (child_node == null ? "null" : child_node.hashCode()));
          }
 
          if (child_node == null)
@@ -351,9 +357,19 @@
             reverseRemove(child_node);
          }
 
-         // actually acquire the lock we need.
+         // actually acquire the lock we need.  This method blocks.
          acquireNodeLock(child_node, owner, gtx, lockTypeRequired, timeout);
 
+         // make sure the lock we acquired isn't on a deleted node/is an orphan!!
+         if (child_node != cache.peek(child_node.getFqn(), true))
+         {
+            // we have an orphan!! Lose the unnecessary lock and re-acquire the lock (and potentially recreate the node).
+            child_node.getLock().release(owner);
+
+            // do the loop again, but don't assign child_node to n so that child_node is processed again.
+            continue;
+         }
+
          if (recursive && isTargetNode(i, treeNodeSize))
          {
             Set<NodeLock> acquired_locks = lockManager.acquireAll(child_node, owner, lock_type, timeout);

Modified: core/trunk/src/test/java/org/jboss/cache/lock/pessimistic/ConcurrentPutRemoveTest.java
===================================================================
--- core/trunk/src/test/java/org/jboss/cache/lock/pessimistic/ConcurrentPutRemoveTest.java	2007-11-06 03:36:23 UTC (rev 4728)
+++ core/trunk/src/test/java/org/jboss/cache/lock/pessimistic/ConcurrentPutRemoveTest.java	2007-11-06 15:13:18 UTC (rev 4729)
@@ -1,46 +1,58 @@
 package org.jboss.cache.lock.pessimistic;
 
-import org.testng.annotations.Test;
-import org.testng.annotations.BeforeMethod;
-import org.testng.annotations.AfterMethod;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
 import org.jboss.cache.Cache;
 import org.jboss.cache.DefaultCacheFactory;
 import org.jboss.cache.Fqn;
+import org.jboss.cache.config.Configuration;
 import org.jboss.cache.lock.IsolationLevel;
-import org.jboss.cache.config.Configuration;
+import org.jboss.cache.misc.TestingUtil;
 import org.jboss.cache.transaction.DummyTransactionManagerLookup;
+import org.testng.annotations.AfterMethod;
+import org.testng.annotations.BeforeMethod;
+import org.testng.annotations.Test;
 
+import javax.transaction.TransactionManager;
 import java.util.ArrayList;
 import java.util.List;
 
-import javax.transaction.TransactionManager;
-
-@Test(groups = {"functional"}, enabled = false) // Known issue - See JBCACHE-1164 and JBCACHE-1165 
+@Test(groups = {"functional"}, enabled = true) // Known issue - See JBCACHE-1164 and JBCACHE-1165
 public class ConcurrentPutRemoveTest
 {
 	private TransactionManager tm;
 
 	private Cache cache;
 
+   private final Log log = LogFactory.getLog(ConcurrentPutRemoveTest.class);
+   private List<SeparateThread> threads;
+
+
    @BeforeMethod(alwaysRun = true)
    public void setUp() throws Exception {
 		cache = DefaultCacheFactory.getInstance().createCache(false);
       cache.getConfiguration().setCacheMode(Configuration.CacheMode.LOCAL);
 		cache.getConfiguration().setIsolationLevel(IsolationLevel.READ_COMMITTED);
 		cache.getConfiguration().setTransactionManagerLookupClass(DummyTransactionManagerLookup.class.getName());
-		cache.getConfiguration().setLockAcquisitionTimeout(10000);
+		cache.getConfiguration().setLockAcquisitionTimeout(1000);
 		cache.start();
 		tm = cache.getConfiguration().getRuntimeConfig().getTransactionManager();
-	}
+      threads = new ArrayList<SeparateThread>();
+   }
 
    @AfterMethod(alwaysRun = true)
    public void tearDown() throws Exception
    {
-		cache.destroy();
-	}
+		TestingUtil.killCaches(cache);
+      for (SeparateThread st : threads)
+      {
+         st.interrupt();
+         st.join();
+      }
+   }
 
+   @Test (invocationCount = 20)
    public void testLock() throws Exception {
-		List<SeparateThread> threads = new ArrayList<SeparateThread>();
 		for (int x = 0; x < 2; x++) {
 			SeparateThread t = new SeparateThread(x);
 			threads.add(t);
@@ -72,16 +84,16 @@
 			Thread.currentThread().setName("Thread:" + num);
 			try {
 				for (int x = 0; x < 1000; x++) {
-					tm.begin();
-					System.out.println("R" + Thread.currentThread().getName());
+               tm.begin();
+					log.warn("Before Remove ("+x+")");
 					//inside transaction
 					cache.removeNode(Fqn.fromString("/a"));
-					System.out.println("AR" + Thread.currentThread().getName());
+					log.warn("After Remove ("+x+")");
 					tm.commit();
 					//outside transaction
-					System.out.println("P" + Thread.currentThread().getName());
+					log.warn("Before Put ("+x+")");
 					cache.put(Fqn.fromString("/a/b/c/d"), "text"+x,"b");
-					System.out.println("AP" + Thread.currentThread().getName());
+					log.warn("After Put ("+x+")");
 				}
 			} catch (Exception e) {
 				this.e = e;




More information about the jbosscache-commits mailing list