Author: manik.surtani(a)jboss.com
Date: 2008-07-07 19:39:38 -0400 (Mon, 07 Jul 2008)
New Revision: 6202
Added:
core/trunk/src/main/java/org/jboss/cache/mvcc/MVCCNodeHelper.java
Modified:
core/trunk/src/main/java/org/jboss/cache/factories/EmptyConstructorFactory.java
core/trunk/src/main/java/org/jboss/cache/interceptors/MVCCLockingInterceptor.java
Log:
Abstracted MVCC locking and wrapping logic into separate helper
Modified: core/trunk/src/main/java/org/jboss/cache/factories/EmptyConstructorFactory.java
===================================================================
--- core/trunk/src/main/java/org/jboss/cache/factories/EmptyConstructorFactory.java 2008-07-07 23:13:35 UTC (rev 6201)
+++ core/trunk/src/main/java/org/jboss/cache/factories/EmptyConstructorFactory.java 2008-07-07 23:39:38 UTC (rev 6202)
@@ -10,6 +10,7 @@
import org.jboss.cache.lock.LockStrategyFactory;
import org.jboss.cache.marshall.Marshaller;
import org.jboss.cache.marshall.VersionAwareMarshaller;
+import org.jboss.cache.mvcc.MVCCNodeHelper;
import org.jboss.cache.notifications.Notifier;
import org.jboss.cache.remoting.jgroups.ChannelMessageListener;
import org.jboss.cache.transaction.TransactionTable;
@@ -20,7 +21,7 @@
* @author Manik Surtani (<a
href="mailto:manik@jboss.org">manik@jboss.org</a>)
* @since 2.1.0
*/
-@DefaultFactoryFor(classes = {Notifier.class,
+@DefaultFactoryFor(classes = {Notifier.class, MVCCNodeHelper.class,
ChannelMessageListener.class, CacheLoaderManager.class, Marshaller.class,
InvocationContextContainer.class,
CacheInvocationDelegate.class, TransactionTable.class, DataContainer.class,
LockStrategyFactory.class, BuddyFqnTransformer.class})
Modified: core/trunk/src/main/java/org/jboss/cache/interceptors/MVCCLockingInterceptor.java
===================================================================
--- core/trunk/src/main/java/org/jboss/cache/interceptors/MVCCLockingInterceptor.java 2008-07-07 23:13:35 UTC (rev 6201)
+++ core/trunk/src/main/java/org/jboss/cache/interceptors/MVCCLockingInterceptor.java 2008-07-07 23:39:38 UTC (rev 6202)
@@ -1,11 +1,8 @@
package org.jboss.cache.interceptors;
-import org.jboss.cache.CacheException;
import org.jboss.cache.DataContainer;
import org.jboss.cache.Fqn;
-import org.jboss.cache.InternalNode;
import org.jboss.cache.NodeFactory;
-import org.jboss.cache.NodeSPI;
import org.jboss.cache.commands.VisitableCommand;
import org.jboss.cache.commands.read.ExistsCommand;
import org.jboss.cache.commands.read.GetChildrenNamesCommand;
@@ -30,17 +27,11 @@
import org.jboss.cache.factories.annotations.Start;
import org.jboss.cache.interceptors.base.PrePostProcessingCommandInterceptor;
import org.jboss.cache.invocation.InvocationContext;
-import org.jboss.cache.invocation.NodeInvocationDelegate;
import org.jboss.cache.lock.LockManager;
-import static org.jboss.cache.lock.LockType.WRITE;
-import org.jboss.cache.lock.TimeoutException;
-import org.jboss.cache.mvcc.NullMarkerNode;
+import org.jboss.cache.mvcc.MVCCNodeHelper;
import org.jboss.cache.mvcc.ReadCommittedNode;
-import java.util.Collections;
-import java.util.LinkedList;
import java.util.List;
-import java.util.Map;
/**
* Interceptor to implement <a
href="http://wiki.jboss.org/wiki/JBossCacheMVCC">MVCC</a>
functionality.
@@ -55,35 +46,23 @@
LockManager lockManager;
DataContainer dataContainer;
NodeFactory nodeFactory;
- private long defaultLockAcquisitionTimeout;
- private boolean lockParentForChildInsertRemove;
+ MVCCNodeHelper helper;
@Inject
- public void setDependencies(LockManager lockManager, DataContainer dataContainer,
NodeFactory nodeFactory)
+ public void setDependencies(LockManager lockManager, DataContainer dataContainer,
NodeFactory nodeFactory, MVCCNodeHelper helper)
{
this.lockManager = lockManager;
this.dataContainer = dataContainer;
this.nodeFactory = nodeFactory;
+ this.helper = helper;
}
@Start
public void start()
{
allowWriteSkew = configuration.isAllowWriteSkew();
- defaultLockAcquisitionTimeout = configuration.getLockAcquisitionTimeout();
- lockParentForChildInsertRemove = configuration.isLockParentForChildInsertRemove();
}
- private boolean parentLockNeeded(NodeSPI parent)
- {
- return lockParentForChildInsertRemove || (parent != null &&
parent.isLockForChildInsertRemove());
- }
-
- private boolean parentLockNeeded(Fqn parent)
- {
- return parentLockNeeded(dataContainer.peek(parent, true, true));
- }
-
@Override
protected boolean doBeforeCall(InvocationContext ctx, VisitableCommand command)
{
@@ -97,131 +76,35 @@
@Override
public Object handlePutDataMapCommand(InvocationContext ctx, PutDataMapCommand
command) throws Throwable
{
- getWrappedNode(ctx, command.getFqn(), true, true, false); // get the node and stick
it in the context.
+ helper.wrapNodeForWriting(ctx, command.getFqn(), true, true, false, false, false);
// get the node and stick it in the context.
return invokeNextInterceptor(ctx, command);
}
@Override
public Object handlePutKeyValueCommand(InvocationContext ctx, PutKeyValueCommand
command) throws Throwable
{
- getWrappedNode(ctx, command.getFqn(), true, true, false); // get the node and stick
it in the context.
+ helper.wrapNodeForWriting(ctx, command.getFqn(), true, true, false, false, false);
// get the node and stick it in the context.
return invokeNextInterceptor(ctx, command);
}
@Override
public Object handlePutForExternalReadCommand(InvocationContext ctx,
PutForExternalReadCommand command) throws Throwable
{
- getWrappedNode(ctx, command.getFqn(), true, true, false); // get the node and stick
it in the context.
+ helper.wrapNodeForWriting(ctx, command.getFqn(), true, true, false, false, false);
// get the node and stick it in the context.
return invokeNextInterceptor(ctx, command);
}
@Override
public Object handleRemoveNodeCommand(InvocationContext ctx, RemoveNodeCommand
command) throws Throwable
{
- addNodeAndParentForRemoval(ctx, command.getFqn(), true);
+ helper.wrapNodesForWriting(ctx, command.getFqn());
return invokeNextInterceptor(ctx, command);
}
- private List<Fqn> addNodeAndParentForRemoval(InvocationContext ctx, Fqn nodeFqn,
boolean recursive) throws InterruptedException
- {
- // when removing a node we want to get a lock on the Fqn anyway and return the
wrapped node.
- if (nodeFqn.isRoot()) throw new CacheException("Attempting to remove
Fqn.ROOT!");
-
- Fqn parentFqn = nodeFqn.getParent();
- // inspect parent
- boolean needToCopyParent = false;
- boolean parentLockNeeded = parentLockNeeded(parentFqn);
- if (parentLockNeeded)
- {
- needToCopyParent = lock(ctx, parentFqn);
- // Ensure the node is in the context.
- putNodeInContext(ctx, parentFqn, needToCopyParent);
- }
-
- boolean needToCopyNode = lock(ctx, nodeFqn);
-
- // Ensure the node is in the context.
- putNodeInContext(ctx, nodeFqn, needToCopyNode);
-
- ReadCommittedNode node = (ReadCommittedNode) ctx.lookUpNode(nodeFqn);
-
- // update child ref on parent to point to child as this is now a copy.
- if (node != null && !(node instanceof NullMarkerNode))
- {
- if (parentLockNeeded && (needToCopyNode || needToCopyParent))
- {
- ReadCommittedNode parent = (ReadCommittedNode) ctx.lookUpNode(parentFqn);
- parent.addChildDirect(nodeFactory.createNodeInvocationDelegate((InternalNode)
node.getDelegationTarget()));
- }
-
- // now deal with children.
- if (recursive)
- {
- Map childMap = node.getChildrenMapDirect();
- List<Fqn> fqnsToBeRemoved = new LinkedList<Fqn>();
- fqnsToBeRemoved.add(nodeFqn);
- if (childMap == null || childMap.isEmpty()) return fqnsToBeRemoved;
-
- for (Object n : childMap.values())
- {
- NodeSPI child = (NodeSPI) n;
- lockForRemoval(child.getFqn(), recursive, ctx, fqnsToBeRemoved);
- }
-
- return fqnsToBeRemoved;
- }
- }
-
- return null;
- }
-
- private void lockForRemoval(Fqn fqn, boolean isRecursive, InvocationContext ctx,
List<Fqn> fqnList) throws InterruptedException
- {
- lock(ctx, fqn); // lock node
- if (fqnList != null) fqnList.add(fqn);
-
- // now wrap and add to the context
- ReadCommittedNode rcn = (ReadCommittedNode) getWrappedNode(ctx, fqn, true, false,
true);
- if (rcn != null)
- {
- rcn.copyNodeForUpdate(dataContainer, allowWriteSkew, ctx, nodeFactory);
- ReadCommittedNode parent = (ReadCommittedNode) ctx.lookUpNode(fqn.getParent());
- parent.addChildDirect(nodeFactory.createNodeInvocationDelegate((InternalNode)
rcn.getDelegationTarget()));
-
- if (isRecursive)
- {
- Map<Object, NodeSPI> children = rcn.getChildrenMapDirect();
- if (children != null)
- {
- for (NodeSPI child : children.values())
- {
- lockForRemoval(child.getFqn(), isRecursive, ctx, fqnList);
- }
- }
- }
- }
- }
-
- private void putNodeInContext(InvocationContext ctx, Fqn fqn, boolean needToCopyNode)
- {
- ReadCommittedNode node = (ReadCommittedNode) ctx.lookUpNode(fqn);
- if (node == null)
- {
- InternalNode in = dataContainer.peekInternalNode(fqn, false);
- node = nodeFactory.createMvccNode(in);
- ctx.putLookedUpNode(fqn, node);
- }
-
- if (needToCopyNode && node != null && !node.isChanged()) // node
could be null if using read-committed
- {
- node.copyNodeForUpdate(dataContainer, allowWriteSkew, ctx, nodeFactory);
- }
- }
-
@Override
public Object handleClearDataCommand(InvocationContext ctx, ClearDataCommand command)
throws Throwable
{
- getWrappedNode(ctx, command.getFqn(), true, false, false);
+ helper.wrapNodeForWriting(ctx, command.getFqn(), true, false, false, false,
false);
return invokeNextInterceptor(ctx, command);
}
@@ -230,12 +113,15 @@
{
// set lock acquisition timeout to 0 - we need to fail fast.
ctx.getOptionOverrides().setLockAcquisitionTimeout(0);
- List<Fqn> fqnsToEvict = addNodeAndParentForRemoval(ctx, command.getFqn(),
command.isRecursive());
-
- if (fqnsToEvict != null) // add this set to the command
+ if (command.isRecursive())
{
+ List<Fqn> fqnsToEvict = helper.wrapNodesForWriting(ctx,
command.getFqn());
command.setNodesToEvict(fqnsToEvict);
}
+ else
+ {
+ helper.wrapNodeForWriting(ctx, command.getFqn(), true, false, false, true,
true);
+ }
return invokeNextInterceptor(ctx, command);
}
@@ -245,52 +131,57 @@
{
// this should be handled the same as a recursive evict command.
ctx.getOptionOverrides().setLockAcquisitionTimeout(0);
- addNodeAndParentForRemoval(ctx, command.getFqn(), true);
-
+ helper.wrapNodesForWriting(ctx, command.getFqn());
return invokeNextInterceptor(ctx, command);
}
@Override
public Object handleRemoveKeyCommand(InvocationContext ctx, RemoveKeyCommand command)
throws Throwable
{
- getWrappedNode(ctx, command.getFqn(), true, false, false);
+ helper.wrapNodeForWriting(ctx, command.getFqn(), true, false, false, false,
false);
return invokeNextInterceptor(ctx, command);
}
@Override
public Object handleGetDataMapCommand(InvocationContext ctx, GetDataMapCommand
command) throws Throwable
{
- return handleReadCommand(ctx, command,
Collections.singletonList(command.getFqn()));
+ helper.wrapNodeForReading(ctx, command.getFqn());
+ return invokeNextInterceptor(ctx, command);
}
@Override
public Object handleExistsNodeCommand(InvocationContext ctx, ExistsCommand command)
throws Throwable
{
- return handleReadCommand(ctx, command,
Collections.singletonList(command.getFqn()));
+ helper.wrapNodeForReading(ctx, command.getFqn());
+ return invokeNextInterceptor(ctx, command);
}
@Override
public Object handleGetKeyValueCommand(InvocationContext ctx, GetKeyValueCommand
command) throws Throwable
{
- return handleReadCommand(ctx, command,
Collections.singletonList(command.getFqn()));
+ helper.wrapNodeForReading(ctx, command.getFqn());
+ return invokeNextInterceptor(ctx, command);
}
@Override
public Object handleGetNodeCommand(InvocationContext ctx, GetNodeCommand command)
throws Throwable
{
- return handleReadCommand(ctx, command,
Collections.singletonList(command.getFqn()));
+ helper.wrapNodeForReading(ctx, command.getFqn());
+ return invokeNextInterceptor(ctx, command);
}
@Override
public Object handleGetKeysCommand(InvocationContext ctx, GetKeysCommand command)
throws Throwable
{
- return handleReadCommand(ctx, command,
Collections.singletonList(command.getFqn()));
+ helper.wrapNodeForReading(ctx, command.getFqn());
+ return invokeNextInterceptor(ctx, command);
}
@Override
public Object handleGetChildrenNamesCommand(InvocationContext ctx,
GetChildrenNamesCommand command) throws Throwable
{
- return handleReadCommand(ctx, command,
Collections.singletonList(command.getFqn()));
+ helper.wrapNodeForReading(ctx, command.getFqn());
+ return invokeNextInterceptor(ctx, command);
}
@Override
@@ -298,13 +189,13 @@
{
// nodes we need to get WLs for:
// node we are moving FROM (and it's parent and children.) Same as
removeNode.
- List<Fqn> nodeAndChildren = addNodeAndParentForRemoval(ctx, command.getFqn(),
true);
+ List<Fqn> nodeAndChildren = helper.wrapNodesForWriting(ctx,
command.getFqn());
Fqn newParent = command.getTo();
Fqn oldParent = command.getFqn().getParent();
// now lock the new parent.
- getWrappedNode(ctx, newParent, true, true, false);
+ helper.wrapNodeForWriting(ctx, newParent, true, true, false, false, false);
if (!oldParent.equals(newParent) && nodeAndChildren != null)
{
@@ -313,7 +204,7 @@
for (Fqn f : nodeAndChildren)
{
Fqn newChildFqn = f.replaceAncestor(oldParent, newParent);
- getWrappedNode(ctx, newChildFqn, true, true, true);
+ helper.wrapNodeForWriting(ctx, newChildFqn, true, true, true, false, false);
}
}
@@ -324,13 +215,14 @@
@Override
public Object handleGravitateDataCommand(InvocationContext ctx, GravitateDataCommand
command) throws Throwable
{
- return handleReadCommand(ctx, command,
Collections.singletonList(command.getFqn()));
+ helper.wrapNodeForReading(ctx, command.getFqn());
+ return invokeNextInterceptor(ctx, command);
}
@Override
public Object handleCreateNodeCommand(InvocationContext ctx, CreateNodeCommand
command) throws Throwable
{
- getWrappedNode(ctx, command.getFqn(), true, true, false); // get the node and stick
it in the context.
+ helper.wrapNodeForWriting(ctx, command.getFqn(), true, true, false, false, false);
// get the node and stick it in the context.
return invokeNextInterceptor(ctx, command);
}
@@ -439,148 +331,4 @@
throw new IllegalStateException("Attempting to do a commit or rollback but
there is no transactional context in scope. " + ctx);
}
}
-
- // ----------------- actual implementation details ----------------------------
-
- protected Object handleReadCommand(InvocationContext ctx, VisitableCommand command,
List<Fqn> fqns) throws Throwable
- {
- boolean forceWriteLock = ctx.getOptionOverrides().isForceWriteLock();
-
- // does the node exist in the context?
- for (Fqn f : fqns)
- {
- if (forceWriteLock)
- {
- if (trace) log.trace("Forcing lock on reading node " + f);
- getWrappedNode(ctx, f, true, false, false);
- }
- else if (ctx.lookUpNode(f) == null)
- {
- if (trace) log.trace("Node " + f + " is not in context,
fetching from container.");
- // simple implementation. Peek the node, wrap it, put wrapped node in the
context.
- InternalNode node = dataContainer.peekInternalNode(f, false);
- NodeSPI wrapped = nodeFactory.createMvccNode(node);
- if (wrapped != null) ctx.putLookedUpNode(f, wrapped);
- }
- else
- {
- if (trace) log.trace("Node " + f + " is already in
context.");
- }
- }
- return invokeNextInterceptor(ctx, command);
- }
-
- /**
- * First checks in contexts for the existence of the node. If it does exist, it will
return it, acquiring a lock if
- * necessary. Otherwise, it will peek in the dataContainer, wrap the node, lock if
necessary, and add it to the context.
- * If it doesn't even exist in the dataContainer and createIfAbsent is true, it
will create a new node and add it to the
- * data structure. It will lock the node, and potentially the parent as well, if
necessary. If the parent is locked,
- * it too will be added to the context if it wasn't there already.
- *
- * @param fqn to retrieve
- * @param lockForWriting if true, a lock will be acquired.
- * @param createIfAbsent if true, will be created if absent.
- * @param includeInvalidNodes
- * @return a NodeSPI or null.
- */
- protected NodeSPI getWrappedNode(InvocationContext context, Fqn fqn, boolean
lockForWriting, boolean createIfAbsent, boolean includeInvalidNodes) throws
InterruptedException
- {
- ReadCommittedNode n = (ReadCommittedNode) context.lookUpNode(fqn);
- if (createIfAbsent && n != null && n.isNullNode()) n = null;
- if (n != null)
- {
- // acquire lock if needed
- if (lockForWriting && lock(context, fqn))
- {
- // create a copy of the underlying node
- n.copyNodeForUpdate(dataContainer, allowWriteSkew, context, nodeFactory);
- }
- if (trace) log.trace("Retrieving wrapped node " + fqn);
- if (n.isDeleted() && createIfAbsent)
- {
- if (trace) log.trace("Node is deleted in current scope. Need to
un-delete.");
- n.markAsDeleted(false);
- n.setValid(true, false);
- }
- return n;
- }
-
- // else, fetch from dataContainer.
- InternalNode in = dataContainer.peekInternalNode(fqn, includeInvalidNodes);
- if (in != null)
- {
- // do we need a lock?
- boolean needToCopy = false;
- if (lockForWriting && lock(context, fqn))
- {
- needToCopy = true;
- }
- ReadCommittedNode wrapped = nodeFactory.createMvccNode(in);
- context.putLookedUpNode(fqn, wrapped);
- if (needToCopy)
- wrapped.copyNodeForUpdate(dataContainer, allowWriteSkew, context,
nodeFactory);
- return wrapped;
- }
-
- // else, do we need to create one?
- if (createIfAbsent)
- {
- Fqn parentFqn = fqn.getParent();
- NodeSPI parent = getWrappedNode(context, parentFqn, false, createIfAbsent,
false);
- // do we need to lock the parent to create children?
- boolean parentLockNeeded = parentLockNeeded(parent);
- // get a lock on the parent.
- if (parentLockNeeded && lock(context, parentFqn))
- {
- ReadCommittedNode parentRCN = (ReadCommittedNode)
context.lookUpNode(parentFqn);
- parentRCN.copyNodeForUpdate(dataContainer, allowWriteSkew, context,
nodeFactory);
- }
-
- // now to lock and create the node.
- lock(context, fqn);
-
- NodeSPI temp = parent.getOrCreateChild(fqn.getLastElement(),
context.getGlobalTransaction());
- // TODO: warning, hack! There is a race condition here. Add a way to create
nodes without attaching to a parent.
- parent.removeChildDirect(fqn.getLastElement());
-
- in = (InternalNode) ((NodeInvocationDelegate) temp).getDelegationTarget();
- ReadCommittedNode wrapped = nodeFactory.createMvccNode(in);
- wrapped.setCreated(true);
- context.putLookedUpNode(fqn, wrapped);
- wrapped.copyNodeForUpdate(dataContainer, allowWriteSkew, context, nodeFactory);
-// if (parentLockNeeded)
-// {
-// // since we copied the child make sure we update the parent's ref
-//
parent.addChildDirect(nodeFactory.createNodeInvocationDelegate(wrapped.getNode()));
-// }
-
- return wrapped;
- }
-
- return null;
- }
-
- /**
- * Attempts to lock a node if the lock isn't already held in the current scope.
- *
- * @param ctx context
- * @param fqn Fqn to lock
- * @return true if a lock was needed and acquired, false if it didn't need to
acquire the lock (i.e., lock was already held)
- * @throws InterruptedException
- * @throws TimeoutException if we are unable to acquire the lock after a specified
timeout.
- */
- protected boolean lock(InvocationContext ctx, Fqn fqn) throws InterruptedException
- {
- // don't EVER use lockManager.isLocked() since with lock striping it may be the
case that we hold the relevant
- // lock which may be shared with another Fqn that we have a lock for already.
- // nothing wrong, just means that we fail to record the lock. And that is a
problem.
- // Better to check our records and lock again if necessary.
- if (!ctx.getLocks().contains(fqn))
- {
- if (!lockManager.lockAndRecord(fqn, WRITE, ctx))
- throw new TimeoutException("Unable to acquire lock on Fqn [" + fqn
+ "] after [" + ctx.getLockAcquisitionTimeout(defaultLockAcquisitionTimeout) +
"] milliseconds!");
- return true;
- }
- return false;
- }
}
Added: core/trunk/src/main/java/org/jboss/cache/mvcc/MVCCNodeHelper.java
===================================================================
--- core/trunk/src/main/java/org/jboss/cache/mvcc/MVCCNodeHelper.java (rev 0)
+++ core/trunk/src/main/java/org/jboss/cache/mvcc/MVCCNodeHelper.java 2008-07-07 23:39:38 UTC (rev 6202)
@@ -0,0 +1,345 @@
+package org.jboss.cache.mvcc;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.jboss.cache.CacheException;
+import org.jboss.cache.DataContainer;
+import org.jboss.cache.Fqn;
+import org.jboss.cache.InternalNode;
+import org.jboss.cache.NodeFactory;
+import org.jboss.cache.NodeSPI;
+import org.jboss.cache.config.Configuration;
+import org.jboss.cache.factories.annotations.Inject;
+import org.jboss.cache.factories.annotations.NonVolatile;
+import org.jboss.cache.factories.annotations.Start;
+import org.jboss.cache.invocation.InvocationContext;
+import org.jboss.cache.invocation.NodeInvocationDelegate;
+import org.jboss.cache.lock.LockManager;
+import static org.jboss.cache.lock.LockType.WRITE;
+import org.jboss.cache.lock.TimeoutException;
+
+import java.util.Collection;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * Utility functions to manipulate wrapping {@link org.jboss.cache.InternalNode}s as {@link org.jboss.cache.mvcc.ReadCommittedNode}
+ * or {@link org.jboss.cache.mvcc.RepeatableReadNode}s. Would also entail locking, if necessary.
+ *
+ * @author Manik Surtani (<a href="mailto:manik@jboss.org">manik@jboss.org</a>)
+ * @since 3.0
+ */
+@NonVolatile
+public class MVCCNodeHelper
+{
+ DataContainer dataContainer;
+ NodeFactory nodeFactory;
+ private static final Log log = LogFactory.getLog(MVCCNodeHelper.class);
+ private static final boolean trace = log.isTraceEnabled();
+ private long defaultLockAcquisitionTimeout;
+ private LockManager lockManager;
+ private Configuration configuration;
+ private boolean allowWriteSkew;
+ private boolean lockParentForChildInsertRemove;
+
+ @Inject
+ public void injectDependencies(DataContainer dataContainer, NodeFactory nodeFactory,
LockManager lockManager, Configuration configuration)
+ {
+ this.nodeFactory = nodeFactory;
+ this.dataContainer = dataContainer;
+ this.configuration = configuration;
+ this.lockManager = lockManager;
+ }
+
+ @Start
+ public void start()
+ {
+ defaultLockAcquisitionTimeout = configuration.getLockAcquisitionTimeout();
+ allowWriteSkew = configuration.isAllowWriteSkew();
+ lockParentForChildInsertRemove = configuration.isLockParentForChildInsertRemove();
+ }
+
+
+ /**
+ * Attempts to provide the context with a set of wrapped nodes based on the Collection of fqns passed in. If the nodes
+ * already exist in the context then the node is not wrapped again.
+ * <p/>
+ * {@link InternalNode}s are wrapped using {@link NodeFactory#createMvccNode(org.jboss.cache.InternalNode)} and as such,
+ * null internal nodes are treated according to isolation level used. See {@link NodeFactory#createMvccNode(org.jboss.cache.InternalNode)}
+ * for details on this behaviour.
+ * <p/>
+ * Note that if the context has the {@link org.jboss.cache.config.Option#isForceWriteLock()} option set, then write locks are
+ * acquired and the node is copied.
+ * <p/>
+ *
+ * @param ctx current invocation context
+ * @param fqns collection of Fqns. Should not be null.
+ * @throws InterruptedException if write locks are forced and the lock manager is interrupted.
+ */
+ public void wrapNodesForReading(InvocationContext ctx, Collection<Fqn> fqns)
throws InterruptedException
+ {
+ boolean forceWriteLock = ctx.getOptionOverrides().isForceWriteLock();
+
+ // does the node exist in the context?
+ for (Fqn f : fqns) wrapNodeForReading(ctx, f, forceWriteLock);
+ }
+
+ /**
+ * Similar to {@link #wrapNodesForReading(org.jboss.cache.invocation.InvocationContext, java.util.Collection)} except
+ * that this version takes a single Fqn parameter to wrap a single node.
+ *
+ * @param ctx current invocation context
+ * @param fqn fqn to fetch and wrap
+ * @throws InterruptedException if write locks are forced and the lock manager is interrupted.
+ */
+ public void wrapNodeForReading(InvocationContext ctx, Fqn fqn) throws
InterruptedException
+ {
+ wrapNodeForReading(ctx, fqn, ctx.getOptionOverrides().isForceWriteLock());
+ }
+
+ private void wrapNodeForReading(InvocationContext ctx, Fqn f, boolean writeLockForced)
throws InterruptedException
+ {
+ if (writeLockForced)
+ {
+ if (trace) log.trace("Forcing lock on reading node " + f);
+ wrapNodeForWriting(ctx, f, true, false, false, false, false);
+ }
+ else if (ctx.lookUpNode(f) == null)
+ {
+ if (trace) log.trace("Node " + f + " is not in context, fetching
from container.");
+ // simple implementation. Peek the node, wrap it, put wrapped node in the
context.
+ InternalNode node = dataContainer.peekInternalNode(f, false);
+ ReadCommittedNode wrapped = nodeFactory.createMvccNode(node);
+ if (wrapped != null) ctx.putLookedUpNode(f, wrapped);
+ }
+ else
+ {
+ if (trace) log.trace("Node " + f + " is already in
context.");
+ }
+ }
+
+ /**
+ * Attempts to lock a node if the lock isn't already held in the current scope, and records the lock in the context.
+ *
+ * @param ctx context
+ * @param fqn Fqn to lock
+ * @return true if a lock was needed and acquired, false if it didn't need to acquire the lock (i.e., lock was already held)
+ * @throws InterruptedException if interrupted
+ * @throws TimeoutException if we are unable to acquire the lock after a specified timeout.
+ */
+ private boolean acquireLock(InvocationContext ctx, Fqn fqn) throws
InterruptedException, TimeoutException
+ {
+ // don't EVER use lockManager.isLocked() since with lock striping it may be the case that we hold the relevant
+ // lock which may be shared with another Fqn that we have a lock for already.
+ // nothing wrong, just means that we fail to record the lock. And that is a problem.
+ // Better to check our records and lock again if necessary.
+ if (!ctx.getLocks().contains(fqn))
+ {
+ if (!lockManager.lockAndRecord(fqn, WRITE, ctx))
+ throw new TimeoutException("Unable to acquire lock on Fqn [" + fqn
+ "] after [" + ctx.getLockAcquisitionTimeout(defaultLockAcquisitionTimeout) +
"] milliseconds!");
+ return true;
+ }
+ return false;
+ }
+
+ /**
+ * First checks in contexts for the existence of the node. If it does exist, it will return it, acquiring a lock if
+ * necessary. Otherwise, it will peek in the dataContainer, wrap the node, lock if necessary, and add it to the context.
+ * If it doesn't even exist in the dataContainer and createIfAbsent is true, it will create a new node and add it to the
+ * data structure. It will lock the node, and potentially the parent as well, if necessary. If the parent is locked,
+ * it too will be added to the context if it wasn't there already.
+ *
+ * @param fqn to retrieve
+ * @param lockForWriting if true, a lock will be acquired.
+ * @param createIfAbsent if true, will be created if absent.
+ * @param includeInvalidNodes if true, invalid nodes are included.
+ * @param forRemoval if true, the parent may also be locked if locking parents for removal is necessary.
+ * @param force if true, will force the write lock even if the node is null.
+ * @return a wrapped node, or null.
+ */
+ public ReadCommittedNode wrapNodeForWriting(InvocationContext context, Fqn fqn,
boolean lockForWriting, boolean createIfAbsent, boolean includeInvalidNodes, boolean
forRemoval, boolean force) throws InterruptedException
+ {
+ ReadCommittedNode n = (ReadCommittedNode) context.lookUpNode(fqn);
+ if (createIfAbsent && n != null && n.isNullNode()) n = null;
+ if (n != null)
+ {
+ // acquire lock if needed
+ if (lockForWriting && acquireLock(context, fqn))
+ {
+ // create a copy of the underlying node
+ n.copyNodeForUpdate(dataContainer, allowWriteSkew, context, nodeFactory);
+ }
+ if (trace) log.trace("Retrieving wrapped node " + fqn);
+ if (n.isDeleted() && createIfAbsent)
+ {
+ if (trace) log.trace("Node is deleted in current scope. Need to
un-delete.");
+ n.markAsDeleted(false);
+ n.setValid(true, false);
+ }
+
+ }
+ else
+ {
+ // else, fetch from dataContainer.
+ InternalNode in = dataContainer.peekInternalNode(fqn, includeInvalidNodes);
+ if (in != null)
+ {
+ // do we need a lock?
+ boolean needToCopy = false;
+ if (lockForWriting && acquireLock(context, fqn))
+ {
+ needToCopy = true;
+ }
+ n = nodeFactory.createMvccNode(in);
+ context.putLookedUpNode(fqn, n);
+ if (needToCopy) n.copyNodeForUpdate(dataContainer, allowWriteSkew, context,
nodeFactory);
+ }
+ else if (createIfAbsent) // else, do we need to create one?
+ {
+ Fqn parentFqn = fqn.getParent();
+ NodeSPI parent = wrapNodeForWriting(context, parentFqn, false,
createIfAbsent, false, false, false);
+ // do we need to lock the parent to create children?
+ boolean parentLockNeeded = isParentLockNeeded(parent);
+ // get a lock on the parent.
+ if (parentLockNeeded && acquireLock(context, parentFqn))
+ {
+ ReadCommittedNode parentRCN = (ReadCommittedNode)
context.lookUpNode(parentFqn);
+ parentRCN.copyNodeForUpdate(dataContainer, allowWriteSkew, context,
nodeFactory);
+ }
+
+ // now to lock and create the node.
+ acquireLock(context, fqn);
+
+ NodeSPI temp = parent.getOrCreateChild(fqn.getLastElement(),
context.getGlobalTransaction());
+ // TODO: warning, hack! There is a race condition here. Add a way to create
nodes without attaching to a parent.
+ parent.removeChildDirect(fqn.getLastElement());
+
+ in = (InternalNode) ((NodeInvocationDelegate) temp).getDelegationTarget();
+ n = nodeFactory.createMvccNode(in);
+ n.setCreated(true);
+ context.putLookedUpNode(fqn, n);
+ n.copyNodeForUpdate(dataContainer, allowWriteSkew, context, nodeFactory);
+ }
+ }
+
+ // see if we need to force the lock on nonexistent nodes.
+ if (n == null && force) acquireLock(context, fqn);
+
+ // now test if we need to lock the parent as well.
+ if ((n != null || force) && forRemoval &&
isParentLockNeeded(fqn.getParent(), context))
+ wrapNodeForWriting(context, fqn.getParent(), true, false, includeInvalidNodes,
false, force);
+
+ return n;
+ }
+
+ /**
+ * Wraps a node and all its subnodes and adds them to the context, acquiring write locks for them all.
+ *
+ * @param ctx context
+ * @param fqn fqn to wrap
+ * @return a list of Fqns of locks acquired in this call.
+ * @throws InterruptedException if the lock manager is interrupted.
+ */
+ public List<Fqn> wrapNodesForWriting(InvocationContext ctx, Fqn fqn) throws
InterruptedException
+ {
+ // when removing a node we want to get a lock on the Fqn anyway and return the
wrapped node.
+ if (fqn.isRoot()) throw new CacheException("Attempting to remove
Fqn.ROOT!");
+
+ Fqn parentFqn = fqn.getParent();
+ // inspect parent
+ boolean needToCopyParent = false;
+ boolean parentLockNeeded = isParentLockNeeded(parentFqn, ctx);
+ if (parentLockNeeded)
+ {
+ needToCopyParent = acquireLock(ctx, parentFqn);
+ // Ensure the node is in the context.
+ putNodeInContext(ctx, parentFqn, needToCopyParent);
+ }
+
+ boolean needToCopyNode = acquireLock(ctx, fqn);
+
+ // Ensure the node is in the context.
+ putNodeInContext(ctx, fqn, needToCopyNode);
+
+ ReadCommittedNode node = (ReadCommittedNode) ctx.lookUpNode(fqn);
+
+ // update child ref on parent to point to child as this is now a copy.
+ if (node != null && !(node instanceof NullMarkerNode))
+ {
+ if (parentLockNeeded && (needToCopyNode || needToCopyParent))
+ {
+ ReadCommittedNode parent = (ReadCommittedNode) ctx.lookUpNode(parentFqn);
+ parent.addChildDirect(nodeFactory.createNodeInvocationDelegate((InternalNode)
node.getDelegationTarget()));
+ }
+
+ // now deal with children.
+ Map childMap = node.getChildrenMapDirect();
+ List<Fqn> fqnsToBeRemoved = new LinkedList<Fqn>();
+ fqnsToBeRemoved.add(fqn);
+ if (childMap == null || childMap.isEmpty()) return fqnsToBeRemoved;
+
+ for (Object n : childMap.values())
+ {
+ NodeSPI child = (NodeSPI) n;
+ lockForWritingRecursive(child.getFqn(), ctx, fqnsToBeRemoved);
+ }
+
+ return fqnsToBeRemoved;
+ }
+
+ return null;
+ }
+
+ private void lockForWritingRecursive(Fqn fqn, InvocationContext ctx, List<Fqn>
fqnList) throws InterruptedException
+ {
+ acquireLock(ctx, fqn); // lock node
+ if (fqnList != null) fqnList.add(fqn);
+
+ // now wrap and add to the context
+ ReadCommittedNode rcn = wrapNodeForWriting(ctx, fqn, true, false, true, false,
false);
+ if (rcn != null)
+ {
+ rcn.copyNodeForUpdate(dataContainer, allowWriteSkew, ctx, nodeFactory);
+ ReadCommittedNode parent = (ReadCommittedNode) ctx.lookUpNode(fqn.getParent());
+ parent.addChildDirect(nodeFactory.createNodeInvocationDelegate((InternalNode)
rcn.getDelegationTarget()));
+
+ Map<Object, NodeSPI> children = rcn.getChildrenMapDirect();
+ if (children != null)
+ {
+ for (NodeSPI child : children.values())
+ {
+ lockForWritingRecursive(child.getFqn(), ctx, fqnList);
+ }
+ }
+ }
+ }
+
+ private void putNodeInContext(InvocationContext ctx, Fqn fqn, boolean needToCopyNode)
+ {
+ ReadCommittedNode node = (ReadCommittedNode) ctx.lookUpNode(fqn);
+ if (node == null)
+ {
+ InternalNode in = dataContainer.peekInternalNode(fqn, false);
+ node = nodeFactory.createMvccNode(in);
+ ctx.putLookedUpNode(fqn, node);
+ }
+
+ if (needToCopyNode && node != null && !node.isChanged()) // node
could be null if using read-committed
+ {
+ node.copyNodeForUpdate(dataContainer, allowWriteSkew, ctx, nodeFactory);
+ }
+ }
+
+ private boolean isParentLockNeeded(NodeSPI parent)
+ {
+ return lockParentForChildInsertRemove || (parent != null &&
parent.isLockForChildInsertRemove());
+ }
+
+ private boolean isParentLockNeeded(Fqn parent, InvocationContext ctx)
+ {
+ NodeSPI parentNode = ctx.lookUpNode(parent);
+ if (parentNode == null) parentNode = dataContainer.peek(parent, true, true);
+ return isParentLockNeeded(parentNode);
+ }
+}