Author: manik.surtani(a)jboss.com
Date: 2007-11-06 10:13:18 -0500 (Tue, 06 Nov 2007)
New Revision: 4729
Modified:
core/trunk/src/main/java/org/jboss/cache/interceptors/PessimisticLockInterceptor.java
core/trunk/src/test/java/org/jboss/cache/lock/pessimistic/ConcurrentPutRemoveTest.java
Log:
Much better behaviour with JBCACHE-1165. No more endless loops and lock acquisition
timeouts when you have concurrent puts and removes.
Modified:
core/trunk/src/main/java/org/jboss/cache/interceptors/PessimisticLockInterceptor.java
===================================================================
---
core/trunk/src/main/java/org/jboss/cache/interceptors/PessimisticLockInterceptor.java 2007-11-06
03:36:23 UTC (rev 4728)
+++
core/trunk/src/main/java/org/jboss/cache/interceptors/PessimisticLockInterceptor.java 2007-11-06
15:13:18 UTC (rev 4729)
@@ -120,6 +120,7 @@
fqn = (Fqn) args[1];
lock_type = NodeLock.LockType.WRITE;
recursive = true;// remove node and *all* child nodes
+ createIfNotExists = true;
break;
case MethodDeclarations.putDataMethodLocal_id:
case MethodDeclarations.putDataEraseMethodLocal_id:
@@ -181,9 +182,15 @@
if (!locksAlreadyObtained)
{
long timeout = zeroLockTimeout ? 0 : getLockAcquisitionTimeout(ctx);
+ // make sure we can bail out of this loop
+ long cutoffTime = System.currentTimeMillis() + timeout;
+ boolean firstTry = true;
do
{
+ // this is an additional check to make sure we don't try for too
long.
+ if (!firstTry && System.currentTimeMillis() > cutoffTime) throw
new TimeoutException("Unable to acquire lock on Fqn " + fqn + " after
" + timeout + " millis");
lock(ctx, fqn, lock_type, recursive, createIfNotExists, timeout,
isDeleteOperation, isEvictOperation, isRemoveDataOperation);
+ firstTry = false;
}
while (createIfNotExists && cache.peek(fqn, false) == null);// keep
trying until we have the lock (fixes concurrent remove())
}
@@ -204,14 +211,6 @@
// to add the removedNodes map to CacheImpl
if (isDeleteOperation && ctx.getGlobalTransaction() == null)
{
- //cache.getRemovedNodesMap().remove(fqn);
- //cache.peek(fqn);
- // do a REAL remove here.
- NodeSPI n = cache.peek(fqn, true);
- if (n != null)
- {
- lockManager.getLock(n).releaseAll(Thread.currentThread());
- }
//CacheIml._move internally removes the source node. 'if clause' solve
the scenario when a node is
// moved in the same place, i.e. it will be removed from the source - which is
also destination - so it ends
// up being removed for good
@@ -219,9 +218,14 @@
{
((CacheImpl) cache).realRemove(fqn, true);
}
+ // do a REAL remove here.
+ NodeSPI n = cache.peek(fqn, true);
+ if (n != null)
+ {
+ lockManager.getLock(n).releaseAll(Thread.currentThread());
+ }
}
- else
- if (m.getMethodId() == MethodDeclarations.commitMethod_id ||
isOnePhaseCommitPrepareMehod(m) || m.getMethodId() ==
MethodDeclarations.rollbackMethod_id)
+ else if (m.getMethodId() == MethodDeclarations.commitMethod_id ||
isOnePhaseCommitPrepareMehod(m) || m.getMethodId() ==
MethodDeclarations.rollbackMethod_id)
{
cleanup(ctx.getGlobalTransaction());
}
@@ -311,10 +315,12 @@
}
boolean created = false;
+ if (log.isTraceEnabled()) log.trace("Directly got child node " +
child_name + ". Hash code is " + (child_node == null ? "null" :
child_node.hashCode()));
if (child_node == null && createIfNotExists)
{
child_node = n.addChildDirect(new Fqn(child_name));
created = true;
+ if (log.isTraceEnabled()) log.trace("Created child node " +
child_name + ". Hash code is " + (child_node == null ? "null" :
child_node.hashCode()));
}
if (child_node == null)
@@ -351,9 +357,19 @@
reverseRemove(child_node);
}
- // actually acquire the lock we need.
+ // actually acquire the lock we need. This method blocks.
acquireNodeLock(child_node, owner, gtx, lockTypeRequired, timeout);
+ // make sure the lock we acquired isn't on a deleted node/is an orphan!!
+ if (child_node != cache.peek(child_node.getFqn(), true))
+ {
+ // we have an orphan!! Lose the unnecessary lock and re-acquire the lock (and
potentially recreate the node).
+ child_node.getLock().release(owner);
+
+ // do the loop again, but don't assign child_node to n so that child_node
is processed again.
+ continue;
+ }
+
if (recursive && isTargetNode(i, treeNodeSize))
{
Set<NodeLock> acquired_locks = lockManager.acquireAll(child_node,
owner, lock_type, timeout);
Modified:
core/trunk/src/test/java/org/jboss/cache/lock/pessimistic/ConcurrentPutRemoveTest.java
===================================================================
---
core/trunk/src/test/java/org/jboss/cache/lock/pessimistic/ConcurrentPutRemoveTest.java 2007-11-06
03:36:23 UTC (rev 4728)
+++
core/trunk/src/test/java/org/jboss/cache/lock/pessimistic/ConcurrentPutRemoveTest.java 2007-11-06
15:13:18 UTC (rev 4729)
@@ -1,46 +1,58 @@
package org.jboss.cache.lock.pessimistic;
-import org.testng.annotations.Test;
-import org.testng.annotations.BeforeMethod;
-import org.testng.annotations.AfterMethod;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
import org.jboss.cache.Cache;
import org.jboss.cache.DefaultCacheFactory;
import org.jboss.cache.Fqn;
+import org.jboss.cache.config.Configuration;
import org.jboss.cache.lock.IsolationLevel;
-import org.jboss.cache.config.Configuration;
+import org.jboss.cache.misc.TestingUtil;
import org.jboss.cache.transaction.DummyTransactionManagerLookup;
+import org.testng.annotations.AfterMethod;
+import org.testng.annotations.BeforeMethod;
+import org.testng.annotations.Test;
+import javax.transaction.TransactionManager;
import java.util.ArrayList;
import java.util.List;
-import javax.transaction.TransactionManager;
-
-@Test(groups = {"functional"}, enabled = false) // Known issue - See
JBCACHE-1164 and JBCACHE-1165
+@Test(groups = {"functional"}, enabled = true) // Known issue - See
JBCACHE-1164 and JBCACHE-1165
public class ConcurrentPutRemoveTest
{
private TransactionManager tm;
private Cache cache;
+ private final Log log = LogFactory.getLog(ConcurrentPutRemoveTest.class);
+ private List<SeparateThread> threads;
+
+
@BeforeMethod(alwaysRun = true)
public void setUp() throws Exception {
cache = DefaultCacheFactory.getInstance().createCache(false);
cache.getConfiguration().setCacheMode(Configuration.CacheMode.LOCAL);
cache.getConfiguration().setIsolationLevel(IsolationLevel.READ_COMMITTED);
cache.getConfiguration().setTransactionManagerLookupClass(DummyTransactionManagerLookup.class.getName());
- cache.getConfiguration().setLockAcquisitionTimeout(10000);
+ cache.getConfiguration().setLockAcquisitionTimeout(1000);
cache.start();
tm = cache.getConfiguration().getRuntimeConfig().getTransactionManager();
- }
+ threads = new ArrayList<SeparateThread>();
+ }
@AfterMethod(alwaysRun = true)
public void tearDown() throws Exception
{
- cache.destroy();
- }
+ TestingUtil.killCaches(cache);
+ for (SeparateThread st : threads)
+ {
+ st.interrupt();
+ st.join();
+ }
+ }
+ @Test (invocationCount = 20)
public void testLock() throws Exception {
- List<SeparateThread> threads = new ArrayList<SeparateThread>();
for (int x = 0; x < 2; x++) {
SeparateThread t = new SeparateThread(x);
threads.add(t);
@@ -72,16 +84,16 @@
Thread.currentThread().setName("Thread:" + num);
try {
for (int x = 0; x < 1000; x++) {
- tm.begin();
- System.out.println("R" + Thread.currentThread().getName());
+ tm.begin();
+ log.warn("Before Remove ("+x+")");
//inside transaction
cache.removeNode(Fqn.fromString("/a"));
- System.out.println("AR" + Thread.currentThread().getName());
+ log.warn("After Remove ("+x+")");
tm.commit();
//outside transaction
- System.out.println("P" + Thread.currentThread().getName());
+ log.warn("Before Put ("+x+")");
cache.put(Fqn.fromString("/a/b/c/d"), "text"+x,"b");
- System.out.println("AP" + Thread.currentThread().getName());
+ log.warn("After Put ("+x+")");
}
} catch (Exception e) {
this.e = e;
Show replies by date