Author: manik.surtani(a)jboss.com
Date: 2008-03-17 14:29:37 -0400 (Mon, 17 Mar 2008)
New Revision: 5441
Modified:
core/trunk/src/main/java/org/jboss/cache/interceptors/OptimisticCreateIfNotExistsInterceptor.java
core/trunk/src/main/java/org/jboss/cache/interceptors/OptimisticInterceptor.java
core/trunk/src/main/java/org/jboss/cache/interceptors/OptimisticNodeInterceptor.java
core/trunk/src/test/java/org/jboss/cache/optimistic/AbstractOptimisticTestCase.java
core/trunk/src/test/java/org/jboss/cache/optimistic/ThreadedCacheAccessTest.java
Log:
JBCACHE-1309 - Unguarded adding of nodes to workspace
Modified:
core/trunk/src/main/java/org/jboss/cache/interceptors/OptimisticCreateIfNotExistsInterceptor.java
===================================================================
---
core/trunk/src/main/java/org/jboss/cache/interceptors/OptimisticCreateIfNotExistsInterceptor.java 2008-03-17
17:21:12 UTC (rev 5440)
+++
core/trunk/src/main/java/org/jboss/cache/interceptors/OptimisticCreateIfNotExistsInterceptor.java 2008-03-17
18:29:37 UTC (rev 5441)
@@ -11,6 +11,7 @@
import org.jboss.cache.InvocationContext;
import org.jboss.cache.NodeFactory;
import org.jboss.cache.NodeSPI;
+import org.jboss.cache.config.Configuration;
import org.jboss.cache.factories.annotations.Inject;
import org.jboss.cache.notifications.Notifier;
import org.jboss.cache.optimistic.DataVersion;
@@ -39,6 +40,7 @@
* NodeSPI objects in the underlying data structure.
*/
private NodeFactory nodeFactory;
+ private long lockAcquisitionTimeout;
public OptimisticCreateIfNotExistsInterceptor()
{
@@ -46,9 +48,10 @@
}
@Inject
- private void injectDependencies(NodeFactory nodeFactory)
+ private void injectDependencies(NodeFactory nodeFactory, Configuration cfg)
{
this.nodeFactory = nodeFactory;
+ this.lockAcquisitionTimeout = cfg.getLockAcquisitionTimeout();
}
@Override
@@ -111,7 +114,7 @@
protected Object handleMoveMethod(InvocationContext ctx, Fqn from, Fqn to) throws
Throwable
{
List<Fqn> fqns = new ArrayList<Fqn>();
- fqns.add((Fqn) to);
+ fqns.add(to);
// peek into Node and get a hold of all child fqns as these need to be in the
workspace.
NodeSPI node = peekNode(ctx, (Fqn) from, false, true, true);
@@ -165,7 +168,8 @@
if (workspaceNode == null)
{
NodeSPI node = cache.getRoot();
- workspaceNode = nodeFactory.createWorkspaceNode(node, workspace);
+ workspaceNode = lockAndCreateWorkspaceNode(nodeFactory, node, workspace, gtx,
lockAcquisitionTimeout);
+ //workspaceNode = nodeFactory.createWorkspaceNode(node, workspace);
workspace.addNode(workspaceNode);
log.debug("Created root node in workspace.");
}
@@ -227,7 +231,8 @@
NodeSPI newUnderlyingChildNode = workspaceNode.createChild(childName,
workspaceNode.getNode(), cache, versionToPassIn);
// now assign "workspaceNode" to the new child created.
- workspaceNode = nodeFactory.createWorkspaceNode(newUnderlyingChildNode,
workspace);
+// workspaceNode = nodeFactory.createWorkspaceNode(newUnderlyingChildNode,
workspace);
+ workspaceNode = lockAndCreateWorkspaceNode(nodeFactory,
newUnderlyingChildNode, workspace, gtx, lockAcquisitionTimeout);
workspaceNode.setVersioningImplicit(versionToPassIn == null ||
!isTargetFqn);
if (trace)
log.trace("setting versioning of " + workspaceNode.getFqn() +
" to be " + (workspaceNode.isVersioningImplicit() ? "implicit" :
"explicit"));
@@ -249,7 +254,8 @@
if (trace)
log.trace("Child node " + currentNode.getFqn() + "
doesn't exist in workspace or has been deleted. Adding to workspace in gtx " +
gtx);
- workspaceNode = nodeFactory.createWorkspaceNode(currentNode, workspace);
+// workspaceNode = nodeFactory.createWorkspaceNode(currentNode,
workspace);
+ workspaceNode = lockAndCreateWorkspaceNode(nodeFactory, currentNode,
workspace, gtx, lockAcquisitionTimeout);
// if the underlying node is a tombstone then mark the workspace node as
newly created
if (!currentNode.isValid()) workspaceNode.markAsCreated();
Modified:
core/trunk/src/main/java/org/jboss/cache/interceptors/OptimisticInterceptor.java
===================================================================
---
core/trunk/src/main/java/org/jboss/cache/interceptors/OptimisticInterceptor.java 2008-03-17
17:21:12 UTC (rev 5440)
+++
core/trunk/src/main/java/org/jboss/cache/interceptors/OptimisticInterceptor.java 2008-03-17
18:29:37 UTC (rev 5441)
@@ -9,8 +9,10 @@
import org.jboss.cache.CacheException;
import org.jboss.cache.Fqn;
import org.jboss.cache.InvocationContext;
+import org.jboss.cache.NodeFactory;
import org.jboss.cache.NodeSPI;
import org.jboss.cache.factories.annotations.Inject;
+import org.jboss.cache.lock.TimeoutException;
import org.jboss.cache.optimistic.TransactionWorkspace;
import org.jboss.cache.optimistic.WorkspaceNode;
import org.jboss.cache.transaction.GlobalTransaction;
@@ -102,5 +104,23 @@
nodeToUndelete.markAsResurrected(true);
}
+ protected WorkspaceNode lockAndCreateWorkspaceNode(NodeFactory nodeFactory, NodeSPI
node, TransactionWorkspace workspace, GlobalTransaction gtx, long timeout)
+ {
+ boolean locked = false;
+ try
+ {
+ locked = node.getLock().acquireWriteLock(gtx, timeout);
+ }
+ catch (InterruptedException e)
+ {
+ // do nothing
+ }
+ if (!locked)
+ throw new TimeoutException("Unable to lock node " + node.getFqn() +
" after timeout " + timeout + " for copying into workspace");
+ WorkspaceNode wn = nodeFactory.createWorkspaceNode(node, workspace);
+ node.getLock().release(gtx);
+ return wn;
+ }
+
}
Modified:
core/trunk/src/main/java/org/jboss/cache/interceptors/OptimisticNodeInterceptor.java
===================================================================
---
core/trunk/src/main/java/org/jboss/cache/interceptors/OptimisticNodeInterceptor.java 2008-03-17
17:21:12 UTC (rev 5440)
+++
core/trunk/src/main/java/org/jboss/cache/interceptors/OptimisticNodeInterceptor.java 2008-03-17
18:29:37 UTC (rev 5441)
@@ -7,6 +7,7 @@
package org.jboss.cache.interceptors;
import org.jboss.cache.CacheException;
+import org.jboss.cache.CacheSPI;
import org.jboss.cache.Fqn;
import org.jboss.cache.InvocationContext;
import org.jboss.cache.NodeFactory;
@@ -43,6 +44,7 @@
*/
private NodeFactory nodeFactory;
private Notifier notifier;
+ private long lockAcquisitionTimeout;
@Inject
protected void injectDependencies(Notifier notifier, NodeFactory nodeFactory)
@@ -51,6 +53,13 @@
this.nodeFactory = nodeFactory;
}
+ @Override
+ public void setCache(CacheSPI cache)
+ {
+ super.setCache(cache);
+ lockAcquisitionTimeout = cache.getConfiguration().getLockAcquisitionTimeout();
+ }
+
public OptimisticNodeInterceptor()
{
initLogger();
@@ -529,10 +538,9 @@
{
NodeSPI node = peekNode(ctx, fqn, false, true, includeInvalidNodes);
if (node == null) return null;
+ GlobalTransaction gtx = ctx.getGlobalTransaction();
+ workspaceNode = lockAndCreateWorkspaceNode(nodeFactory, node, workspace, gtx,
lockAcquisitionTimeout);
- // create new workspace node based on the node from the underlying data
structure
- workspaceNode = nodeFactory.createWorkspaceNode(node, workspace);
-
// and add the node to the workspace.
workspace.addNode(workspaceNode);
}
Modified:
core/trunk/src/test/java/org/jboss/cache/optimistic/AbstractOptimisticTestCase.java
===================================================================
---
core/trunk/src/test/java/org/jboss/cache/optimistic/AbstractOptimisticTestCase.java 2008-03-17
17:21:12 UTC (rev 5440)
+++
core/trunk/src/test/java/org/jboss/cache/optimistic/AbstractOptimisticTestCase.java 2008-03-17
18:29:37 UTC (rev 5441)
@@ -236,11 +236,10 @@
return cache;
}
- protected Random random;
+ protected Random random = new Random();
protected void randomSleep(int min, int max)
{
- if (random == null) random = new Random();
long l = -1;
while (l < min) l = random.nextInt(max);
TestingUtil.sleepThread(l);
Modified:
core/trunk/src/test/java/org/jboss/cache/optimistic/ThreadedCacheAccessTest.java
===================================================================
---
core/trunk/src/test/java/org/jboss/cache/optimistic/ThreadedCacheAccessTest.java 2008-03-17
17:21:12 UTC (rev 5440)
+++
core/trunk/src/test/java/org/jboss/cache/optimistic/ThreadedCacheAccessTest.java 2008-03-17
18:29:37 UTC (rev 5441)
@@ -9,8 +9,6 @@
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.jboss.cache.CacheSPI;
-import org.jboss.cache.Fqn;
-import static org.testng.AssertJUnit.assertTrue;
import org.testng.annotations.AfterMethod;
import org.testng.annotations.Test;
@@ -26,20 +24,18 @@
{
private static final Log log = LogFactory.getLog(ThreadedCacheAccessTest.class);
// 5 concurrent threads.
- private final int numThreads = 5;
+ private final int numThreads = 10;
// how many times each thread loops
- private final int numLoopsPerThread = 25;
+ private final int numLoopsPerThread = 1000;
// write frequency. 1 in writeFrequency loops will do a put().
private final int writeFrequency = 5;
- // random sleep params (ms)
- private final int minSleep = 0, maxSleep = 100;
- private final Fqn fqn = Fqn.fromString("/a/b");
private final String key = "key", value = "value";
private CacheSPI cache;
private WorkerThread[] threads;
+ @Override
@AfterMethod(alwaysRun = true)
public void tearDown()
{
@@ -62,34 +58,38 @@
threads[i].start();
}
- for (int i = 0; i < numThreads; i++)
+ for (WorkerThread t : threads)
{
- threads[i].join();
+ t.join();
+ if (t.e != null) throw t.e;
}
-
- // test results.
- for (int i = 0; i < numThreads; i++)
- {
- assertTrue("Thread threw an exception!", threads[i].success);
- }
}
public class WorkerThread extends Thread
{
- public boolean success = true;
+ Exception e = null;
+ public WorkerThread()
+ {
+ setDaemon(true);
+ }
+
+ @Override
public void run()
{
log.debug(getName() + " starting up ... ");
+
for (int j = 0; j < numLoopsPerThread; j++)
{
TransactionManager tm = cache.getTransactionManager();
+ boolean write = j % writeFrequency == 0;
+
try
{
tm.begin();
// read something from the cache - it should be in it's own thread.
cache.get(fqn, key);
- if (j % writeFrequency == 0)
+ if (write)
{
cache.put(fqn, key, value + j);
}
@@ -97,20 +97,21 @@
}
catch (Exception e)
{
- log.error("Caught Exception!", e);
- assertTrue("Caught Exception!", false);
- success = false;
+ if (!write) // writes could fail from a perfectly acceptable data
versioning exception
+ {
+ this.e = e;
+ }
+
try
{
- tm.rollback();
+ if (tm.getTransaction() != null) tm.rollback();
}
catch (Exception e2)
{
log.error("Rollback failed!", e2);
}
- break;
+ if (!write) break;
}
- randomSleep(minSleep, maxSleep);
}
}
}