[jbosscache-commits] JBoss Cache SVN: r6202 - in core/trunk/src/main/java/org/jboss/cache: interceptors and 1 other directories.

jbosscache-commits at lists.jboss.org jbosscache-commits at lists.jboss.org
Mon Jul 7 19:39:39 EDT 2008


Author: manik.surtani at jboss.com
Date: 2008-07-07 19:39:38 -0400 (Mon, 07 Jul 2008)
New Revision: 6202

Added:
   core/trunk/src/main/java/org/jboss/cache/mvcc/MVCCNodeHelper.java
Modified:
   core/trunk/src/main/java/org/jboss/cache/factories/EmptyConstructorFactory.java
   core/trunk/src/main/java/org/jboss/cache/interceptors/MVCCLockingInterceptor.java
Log:
Abstracted MVCC locking and wrapping logic into separate helper

Modified: core/trunk/src/main/java/org/jboss/cache/factories/EmptyConstructorFactory.java
===================================================================
--- core/trunk/src/main/java/org/jboss/cache/factories/EmptyConstructorFactory.java	2008-07-07 23:13:35 UTC (rev 6201)
+++ core/trunk/src/main/java/org/jboss/cache/factories/EmptyConstructorFactory.java	2008-07-07 23:39:38 UTC (rev 6202)
@@ -10,6 +10,7 @@
 import org.jboss.cache.lock.LockStrategyFactory;
 import org.jboss.cache.marshall.Marshaller;
 import org.jboss.cache.marshall.VersionAwareMarshaller;
+import org.jboss.cache.mvcc.MVCCNodeHelper;
 import org.jboss.cache.notifications.Notifier;
 import org.jboss.cache.remoting.jgroups.ChannelMessageListener;
 import org.jboss.cache.transaction.TransactionTable;
@@ -20,7 +21,7 @@
  * @author Manik Surtani (<a href="mailto:manik at jboss.org">manik at jboss.org</a>)
  * @since 2.1.0
  */
- at DefaultFactoryFor(classes = {Notifier.class,
+ at DefaultFactoryFor(classes = {Notifier.class, MVCCNodeHelper.class,
       ChannelMessageListener.class, CacheLoaderManager.class, Marshaller.class, InvocationContextContainer.class,
       CacheInvocationDelegate.class, TransactionTable.class, DataContainer.class,
       LockStrategyFactory.class, BuddyFqnTransformer.class})

Modified: core/trunk/src/main/java/org/jboss/cache/interceptors/MVCCLockingInterceptor.java
===================================================================
--- core/trunk/src/main/java/org/jboss/cache/interceptors/MVCCLockingInterceptor.java	2008-07-07 23:13:35 UTC (rev 6201)
+++ core/trunk/src/main/java/org/jboss/cache/interceptors/MVCCLockingInterceptor.java	2008-07-07 23:39:38 UTC (rev 6202)
@@ -1,11 +1,8 @@
 package org.jboss.cache.interceptors;
 
-import org.jboss.cache.CacheException;
 import org.jboss.cache.DataContainer;
 import org.jboss.cache.Fqn;
-import org.jboss.cache.InternalNode;
 import org.jboss.cache.NodeFactory;
-import org.jboss.cache.NodeSPI;
 import org.jboss.cache.commands.VisitableCommand;
 import org.jboss.cache.commands.read.ExistsCommand;
 import org.jboss.cache.commands.read.GetChildrenNamesCommand;
@@ -30,17 +27,11 @@
 import org.jboss.cache.factories.annotations.Start;
 import org.jboss.cache.interceptors.base.PrePostProcessingCommandInterceptor;
 import org.jboss.cache.invocation.InvocationContext;
-import org.jboss.cache.invocation.NodeInvocationDelegate;
 import org.jboss.cache.lock.LockManager;
-import static org.jboss.cache.lock.LockType.WRITE;
-import org.jboss.cache.lock.TimeoutException;
-import org.jboss.cache.mvcc.NullMarkerNode;
+import org.jboss.cache.mvcc.MVCCNodeHelper;
 import org.jboss.cache.mvcc.ReadCommittedNode;
 
-import java.util.Collections;
-import java.util.LinkedList;
 import java.util.List;
-import java.util.Map;
 
 /**
  * Interceptor to implement <a href="http://wiki.jboss.org/wiki/JBossCacheMVCC">MVCC</a> functionality.
@@ -55,35 +46,23 @@
    LockManager lockManager;
    DataContainer dataContainer;
    NodeFactory nodeFactory;
-   private long defaultLockAcquisitionTimeout;
-   private boolean lockParentForChildInsertRemove;
+   MVCCNodeHelper helper;
 
    @Inject
-   public void setDependencies(LockManager lockManager, DataContainer dataContainer, NodeFactory nodeFactory)
+   public void setDependencies(LockManager lockManager, DataContainer dataContainer, NodeFactory nodeFactory, MVCCNodeHelper helper)
    {
       this.lockManager = lockManager;
       this.dataContainer = dataContainer;
       this.nodeFactory = nodeFactory;
+      this.helper = helper;
    }
 
    @Start
    public void start()
    {
       allowWriteSkew = configuration.isAllowWriteSkew();
-      defaultLockAcquisitionTimeout = configuration.getLockAcquisitionTimeout();
-      lockParentForChildInsertRemove = configuration.isLockParentForChildInsertRemove();
    }
 
-   private boolean parentLockNeeded(NodeSPI parent)
-   {
-      return lockParentForChildInsertRemove || (parent != null && parent.isLockForChildInsertRemove());
-   }
-
-   private boolean parentLockNeeded(Fqn parent)
-   {
-      return parentLockNeeded(dataContainer.peek(parent, true, true));
-   }
-
    @Override
    protected boolean doBeforeCall(InvocationContext ctx, VisitableCommand command)
    {
@@ -97,131 +76,35 @@
    @Override
    public Object handlePutDataMapCommand(InvocationContext ctx, PutDataMapCommand command) throws Throwable
    {
-      getWrappedNode(ctx, command.getFqn(), true, true, false); // get the node and stick it in the context.
+      helper.wrapNodeForWriting(ctx, command.getFqn(), true, true, false, false, false); // get the node and stick it in the context.
       return invokeNextInterceptor(ctx, command);
    }
 
    @Override
    public Object handlePutKeyValueCommand(InvocationContext ctx, PutKeyValueCommand command) throws Throwable
    {
-      getWrappedNode(ctx, command.getFqn(), true, true, false); // get the node and stick it in the context.
+      helper.wrapNodeForWriting(ctx, command.getFqn(), true, true, false, false, false); // get the node and stick it in the context.
       return invokeNextInterceptor(ctx, command);
    }
 
    @Override
    public Object handlePutForExternalReadCommand(InvocationContext ctx, PutForExternalReadCommand command) throws Throwable
    {
-      getWrappedNode(ctx, command.getFqn(), true, true, false); // get the node and stick it in the context.
+      helper.wrapNodeForWriting(ctx, command.getFqn(), true, true, false, false, false); // get the node and stick it in the context.
       return invokeNextInterceptor(ctx, command);
    }
 
    @Override
    public Object handleRemoveNodeCommand(InvocationContext ctx, RemoveNodeCommand command) throws Throwable
    {
-      addNodeAndParentForRemoval(ctx, command.getFqn(), true);
+      helper.wrapNodesForWriting(ctx, command.getFqn());
       return invokeNextInterceptor(ctx, command);
    }
 
-   private List<Fqn> addNodeAndParentForRemoval(InvocationContext ctx, Fqn nodeFqn, boolean recursive) throws InterruptedException
-   {
-      // when removing a node we want to get a lock on the Fqn anyway and return the wrapped node.
-      if (nodeFqn.isRoot()) throw new CacheException("Attempting to remove Fqn.ROOT!");
-
-      Fqn parentFqn = nodeFqn.getParent();
-      // inspect parent
-      boolean needToCopyParent = false;
-      boolean parentLockNeeded = parentLockNeeded(parentFqn);
-      if (parentLockNeeded)
-      {
-         needToCopyParent = lock(ctx, parentFqn);
-         // Ensure the node is in the context.
-         putNodeInContext(ctx, parentFqn, needToCopyParent);
-      }
-
-      boolean needToCopyNode = lock(ctx, nodeFqn);
-
-      // Ensure the node is in the context.
-      putNodeInContext(ctx, nodeFqn, needToCopyNode);
-
-      ReadCommittedNode node = (ReadCommittedNode) ctx.lookUpNode(nodeFqn);
-
-      // update child ref on parent to point to child as this is now a copy.
-      if (node != null && !(node instanceof NullMarkerNode))
-      {
-         if (parentLockNeeded && (needToCopyNode || needToCopyParent))
-         {
-            ReadCommittedNode parent = (ReadCommittedNode) ctx.lookUpNode(parentFqn);
-            parent.addChildDirect(nodeFactory.createNodeInvocationDelegate((InternalNode) node.getDelegationTarget()));
-         }
-
-         // now deal with children.
-         if (recursive)
-         {
-            Map childMap = node.getChildrenMapDirect();
-            List<Fqn> fqnsToBeRemoved = new LinkedList<Fqn>();
-            fqnsToBeRemoved.add(nodeFqn);
-            if (childMap == null || childMap.isEmpty()) return fqnsToBeRemoved;
-
-            for (Object n : childMap.values())
-            {
-               NodeSPI child = (NodeSPI) n;
-               lockForRemoval(child.getFqn(), recursive, ctx, fqnsToBeRemoved);
-            }
-
-            return fqnsToBeRemoved;
-         }
-      }
-
-      return null;
-   }
-
-   private void lockForRemoval(Fqn fqn, boolean isRecursive, InvocationContext ctx, List<Fqn> fqnList) throws InterruptedException
-   {
-      lock(ctx, fqn); // lock node
-      if (fqnList != null) fqnList.add(fqn);
-
-      // now wrap and add to the context
-      ReadCommittedNode rcn = (ReadCommittedNode) getWrappedNode(ctx, fqn, true, false, true);
-      if (rcn != null)
-      {
-         rcn.copyNodeForUpdate(dataContainer, allowWriteSkew, ctx, nodeFactory);
-         ReadCommittedNode parent = (ReadCommittedNode) ctx.lookUpNode(fqn.getParent());
-         parent.addChildDirect(nodeFactory.createNodeInvocationDelegate((InternalNode) rcn.getDelegationTarget()));
-
-         if (isRecursive)
-         {
-            Map<Object, NodeSPI> children = rcn.getChildrenMapDirect();
-            if (children != null)
-            {
-               for (NodeSPI child : children.values())
-               {
-                  lockForRemoval(child.getFqn(), isRecursive, ctx, fqnList);
-               }
-            }
-         }
-      }
-   }
-
-   private void putNodeInContext(InvocationContext ctx, Fqn fqn, boolean needToCopyNode)
-   {
-      ReadCommittedNode node = (ReadCommittedNode) ctx.lookUpNode(fqn);
-      if (node == null)
-      {
-         InternalNode in = dataContainer.peekInternalNode(fqn, false);
-         node = nodeFactory.createMvccNode(in);
-         ctx.putLookedUpNode(fqn, node);
-      }
-
-      if (needToCopyNode && node != null && !node.isChanged()) // node could be null if using read-committed
-      {
-         node.copyNodeForUpdate(dataContainer, allowWriteSkew, ctx, nodeFactory);
-      }
-   }
-
    @Override
    public Object handleClearDataCommand(InvocationContext ctx, ClearDataCommand command) throws Throwable
    {
-      getWrappedNode(ctx, command.getFqn(), true, false, false);
+      helper.wrapNodeForWriting(ctx, command.getFqn(), true, false, false, false, false);
       return invokeNextInterceptor(ctx, command);
    }
 
@@ -230,12 +113,15 @@
    {
       // set lock acquisition timeout to 0 - we need to fail fast.
       ctx.getOptionOverrides().setLockAcquisitionTimeout(0);
-      List<Fqn> fqnsToEvict = addNodeAndParentForRemoval(ctx, command.getFqn(), command.isRecursive());
-
-      if (fqnsToEvict != null) // add this set to the command
+      if (command.isRecursive())
       {
+         List<Fqn> fqnsToEvict = helper.wrapNodesForWriting(ctx, command.getFqn());
          command.setNodesToEvict(fqnsToEvict);
       }
+      else
+      {
+         helper.wrapNodeForWriting(ctx, command.getFqn(), true, false, false, true, true);
+      }
 
       return invokeNextInterceptor(ctx, command);
    }
@@ -245,52 +131,57 @@
    {
       // this should be handled the same as a recursive evict command.
       ctx.getOptionOverrides().setLockAcquisitionTimeout(0);
-      addNodeAndParentForRemoval(ctx, command.getFqn(), true);
-
+      helper.wrapNodesForWriting(ctx, command.getFqn());
       return invokeNextInterceptor(ctx, command);
    }
 
    @Override
    public Object handleRemoveKeyCommand(InvocationContext ctx, RemoveKeyCommand command) throws Throwable
    {
-      getWrappedNode(ctx, command.getFqn(), true, false, false);
+      helper.wrapNodeForWriting(ctx, command.getFqn(), true, false, false, false, false);
       return invokeNextInterceptor(ctx, command);
    }
 
    @Override
    public Object handleGetDataMapCommand(InvocationContext ctx, GetDataMapCommand command) throws Throwable
    {
-      return handleReadCommand(ctx, command, Collections.singletonList(command.getFqn()));
+      helper.wrapNodeForReading(ctx, command.getFqn());
+      return invokeNextInterceptor(ctx, command);
    }
 
    @Override
    public Object handleExistsNodeCommand(InvocationContext ctx, ExistsCommand command) throws Throwable
    {
-      return handleReadCommand(ctx, command, Collections.singletonList(command.getFqn()));
+      helper.wrapNodeForReading(ctx, command.getFqn());
+      return invokeNextInterceptor(ctx, command);
    }
 
    @Override
    public Object handleGetKeyValueCommand(InvocationContext ctx, GetKeyValueCommand command) throws Throwable
    {
-      return handleReadCommand(ctx, command, Collections.singletonList(command.getFqn()));
+      helper.wrapNodeForReading(ctx, command.getFqn());
+      return invokeNextInterceptor(ctx, command);
    }
 
    @Override
    public Object handleGetNodeCommand(InvocationContext ctx, GetNodeCommand command) throws Throwable
    {
-      return handleReadCommand(ctx, command, Collections.singletonList(command.getFqn()));
+      helper.wrapNodeForReading(ctx, command.getFqn());
+      return invokeNextInterceptor(ctx, command);
    }
 
    @Override
    public Object handleGetKeysCommand(InvocationContext ctx, GetKeysCommand command) throws Throwable
    {
-      return handleReadCommand(ctx, command, Collections.singletonList(command.getFqn()));
+      helper.wrapNodeForReading(ctx, command.getFqn());
+      return invokeNextInterceptor(ctx, command);
    }
 
    @Override
    public Object handleGetChildrenNamesCommand(InvocationContext ctx, GetChildrenNamesCommand command) throws Throwable
    {
-      return handleReadCommand(ctx, command, Collections.singletonList(command.getFqn()));
+      helper.wrapNodeForReading(ctx, command.getFqn());
+      return invokeNextInterceptor(ctx, command);
    }
 
    @Override
@@ -298,13 +189,13 @@
    {
       // nodes we need to get WLs for:
       // node we are moving FROM (and it's parent and children.)  Same as removeNode.
-      List<Fqn> nodeAndChildren = addNodeAndParentForRemoval(ctx, command.getFqn(), true);
+      List<Fqn> nodeAndChildren = helper.wrapNodesForWriting(ctx, command.getFqn());
 
       Fqn newParent = command.getTo();
       Fqn oldParent = command.getFqn().getParent();
 
       // now lock the new parent.
-      getWrappedNode(ctx, newParent, true, true, false);
+      helper.wrapNodeForWriting(ctx, newParent, true, true, false, false, false);
 
       if (!oldParent.equals(newParent) && nodeAndChildren != null)
       {
@@ -313,7 +204,7 @@
          for (Fqn f : nodeAndChildren)
          {
             Fqn newChildFqn = f.replaceAncestor(oldParent, newParent);
-            getWrappedNode(ctx, newChildFqn, true, true, true);
+            helper.wrapNodeForWriting(ctx, newChildFqn, true, true, true, false, false);
          }
       }
 
@@ -324,13 +215,14 @@
    @Override
    public Object handleGravitateDataCommand(InvocationContext ctx, GravitateDataCommand command) throws Throwable
    {
-      return handleReadCommand(ctx, command, Collections.singletonList(command.getFqn()));
+      helper.wrapNodeForReading(ctx, command.getFqn());
+      return invokeNextInterceptor(ctx, command);
    }
 
    @Override
    public Object handleCreateNodeCommand(InvocationContext ctx, CreateNodeCommand command) throws Throwable
    {
-      getWrappedNode(ctx, command.getFqn(), true, true, false); // get the node and stick it in the context.
+      helper.wrapNodeForWriting(ctx, command.getFqn(), true, true, false, false, false); // get the node and stick it in the context.
       return invokeNextInterceptor(ctx, command);
    }
 
@@ -439,148 +331,4 @@
          throw new IllegalStateException("Attempting to do a commit or rollback but there is no transactional context in scope. " + ctx);
       }
    }
-
-   // ----------------- actual implementation details ----------------------------
-
-   protected Object handleReadCommand(InvocationContext ctx, VisitableCommand command, List<Fqn> fqns) throws Throwable
-   {
-      boolean forceWriteLock = ctx.getOptionOverrides().isForceWriteLock();
-
-      // does the node exist in the context?
-      for (Fqn f : fqns)
-      {
-         if (forceWriteLock)
-         {
-            if (trace) log.trace("Forcing lock on reading node " + f);
-            getWrappedNode(ctx, f, true, false, false);
-         }
-         else if (ctx.lookUpNode(f) == null)
-         {
-            if (trace) log.trace("Node " + f + " is not in context, fetching from container.");
-            // simple implementation.  Peek the node, wrap it, put wrapped node in the context.
-            InternalNode node = dataContainer.peekInternalNode(f, false);
-            NodeSPI wrapped = nodeFactory.createMvccNode(node);
-            if (wrapped != null) ctx.putLookedUpNode(f, wrapped);
-         }
-         else
-         {
-            if (trace) log.trace("Node " + f + " is already in context.");
-         }
-      }
-      return invokeNextInterceptor(ctx, command);
-   }
-
-   /**
-    * First checks in contexts for the existence of the node.  If it does exist, it will return it, acquiring a lock if
-    * necessary.  Otherwise, it will peek in the dataContainer, wrap the node, lock if necessary, and add it to the context.
-    * If it doesn't even exist in the dataContainer and createIfAbsent is true, it will create a new node and add it to the
-    * data structure.  It will lock the node, and potentially the parent as well, if necessary.  If the parent is locked,
-    * it too will be added to the context if it wasn't there already.
-    *
-    * @param fqn                 to retrieve
-    * @param lockForWriting      if true, a lock will be acquired.
-    * @param createIfAbsent      if true, will be created if absent.
-    * @param includeInvalidNodes
-    * @return a NodeSPI or null.
-    */
-   protected NodeSPI getWrappedNode(InvocationContext context, Fqn fqn, boolean lockForWriting, boolean createIfAbsent, boolean includeInvalidNodes) throws InterruptedException
-   {
-      ReadCommittedNode n = (ReadCommittedNode) context.lookUpNode(fqn);
-      if (createIfAbsent && n != null && n.isNullNode()) n = null;
-      if (n != null)
-      {
-         // acquire lock if needed
-         if (lockForWriting && lock(context, fqn))
-         {
-            // create a copy of the underlying node
-            n.copyNodeForUpdate(dataContainer, allowWriteSkew, context, nodeFactory);
-         }
-         if (trace) log.trace("Retrieving wrapped node " + fqn);
-         if (n.isDeleted() && createIfAbsent)
-         {
-            if (trace) log.trace("Node is deleted in current scope.  Need to un-delete.");
-            n.markAsDeleted(false);
-            n.setValid(true, false);
-         }
-         return n;
-      }
-
-      // else, fetch from dataContainer.
-      InternalNode in = dataContainer.peekInternalNode(fqn, includeInvalidNodes);
-      if (in != null)
-      {
-         // do we need a lock?
-         boolean needToCopy = false;
-         if (lockForWriting && lock(context, fqn))
-         {
-            needToCopy = true;
-         }
-         ReadCommittedNode wrapped = nodeFactory.createMvccNode(in);
-         context.putLookedUpNode(fqn, wrapped);
-         if (needToCopy)
-            wrapped.copyNodeForUpdate(dataContainer, allowWriteSkew, context, nodeFactory);
-         return wrapped;
-      }
-
-      // else, do we need to create one?
-      if (createIfAbsent)
-      {
-         Fqn parentFqn = fqn.getParent();
-         NodeSPI parent = getWrappedNode(context, parentFqn, false, createIfAbsent, false);
-         // do we need to lock the parent to create children?
-         boolean parentLockNeeded = parentLockNeeded(parent);
-         // get a lock on the parent.
-         if (parentLockNeeded && lock(context, parentFqn))
-         {
-            ReadCommittedNode parentRCN = (ReadCommittedNode) context.lookUpNode(parentFqn);
-            parentRCN.copyNodeForUpdate(dataContainer, allowWriteSkew, context, nodeFactory);
-         }
-
-         // now to lock and create the node.
-         lock(context, fqn);
-
-         NodeSPI temp = parent.getOrCreateChild(fqn.getLastElement(), context.getGlobalTransaction());
-         // TODO: warning, hack!  There is a race condition here.  Add a way to create nodes without attaching to a parent.
-         parent.removeChildDirect(fqn.getLastElement());
-
-         in = (InternalNode) ((NodeInvocationDelegate) temp).getDelegationTarget();
-         ReadCommittedNode wrapped = nodeFactory.createMvccNode(in);
-         wrapped.setCreated(true);
-         context.putLookedUpNode(fqn, wrapped);
-         wrapped.copyNodeForUpdate(dataContainer, allowWriteSkew, context, nodeFactory);
-//         if (parentLockNeeded)
-//         {
-//            // since we copied the child make sure we update the parent's ref
-//            parent.addChildDirect(nodeFactory.createNodeInvocationDelegate(wrapped.getNode()));
-//         }
-
-         return wrapped;
-      }
-
-      return null;
-   }
-
-   /**
-    * Attempts to lock a node if the lock isn't already held in the current scope.
-    *
-    * @param ctx context
-    * @param fqn Fqn to lock
-    * @return true if a lock was needed and acquired, false if it didn't need to acquire the lock (i.e., lock was already held)
-    * @throws InterruptedException
-    * @throws TimeoutException     if we are unable to acquire the lock after a specified timeout.
-    */
-   protected boolean lock(InvocationContext ctx, Fqn fqn) throws InterruptedException
-   {
-      // don't EVER use lockManager.isLocked() since with lock striping it may be the case that we hold the relevant
-      // lock which may be shared with another Fqn that we have a lock for already.
-      // nothing wrong, just means that we fail to record the lock.  And that is a problem.
-      // Better to check our records and lock again if necessary.
-      if (!ctx.getLocks().contains(fqn))
-      {
-         if (!lockManager.lockAndRecord(fqn, WRITE, ctx))
-            throw new TimeoutException("Unable to acquire lock on Fqn [" + fqn + "] after [" + ctx.getLockAcquisitionTimeout(defaultLockAcquisitionTimeout) + "] milliseconds!");
-         return true;
-      }
-      return false;
-   }
 }

Added: core/trunk/src/main/java/org/jboss/cache/mvcc/MVCCNodeHelper.java
===================================================================
--- core/trunk/src/main/java/org/jboss/cache/mvcc/MVCCNodeHelper.java	                        (rev 0)
+++ core/trunk/src/main/java/org/jboss/cache/mvcc/MVCCNodeHelper.java	2008-07-07 23:39:38 UTC (rev 6202)
@@ -0,0 +1,345 @@
+package org.jboss.cache.mvcc;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.jboss.cache.CacheException;
+import org.jboss.cache.DataContainer;
+import org.jboss.cache.Fqn;
+import org.jboss.cache.InternalNode;
+import org.jboss.cache.NodeFactory;
+import org.jboss.cache.NodeSPI;
+import org.jboss.cache.config.Configuration;
+import org.jboss.cache.factories.annotations.Inject;
+import org.jboss.cache.factories.annotations.NonVolatile;
+import org.jboss.cache.factories.annotations.Start;
+import org.jboss.cache.invocation.InvocationContext;
+import org.jboss.cache.invocation.NodeInvocationDelegate;
+import org.jboss.cache.lock.LockManager;
+import static org.jboss.cache.lock.LockType.WRITE;
+import org.jboss.cache.lock.TimeoutException;
+
+import java.util.Collection;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * Utility functions for wrapping {@link org.jboss.cache.InternalNode}s as {@link org.jboss.cache.mvcc.ReadCommittedNode}s
+ * or {@link org.jboss.cache.mvcc.RepeatableReadNode}s, acquiring write locks where necessary.
+ *
+ * @author Manik Surtani (<a href="mailto:manik at jboss.org">manik at jboss.org</a>)
+ * @since 3.0
+ */
+ at NonVolatile
+public class MVCCNodeHelper
+{
+   DataContainer dataContainer;
+   NodeFactory nodeFactory;
+   private static final Log log = LogFactory.getLog(MVCCNodeHelper.class);
+   private static final boolean trace = log.isTraceEnabled();
+   private long defaultLockAcquisitionTimeout;
+   private LockManager lockManager;
+   private Configuration configuration;
+   private boolean allowWriteSkew;
+   private boolean lockParentForChildInsertRemove;
+
+   @Inject
+   public void injectDependencies(DataContainer dataContainer, NodeFactory nodeFactory, LockManager lockManager, Configuration configuration)
+   {
+      this.nodeFactory = nodeFactory;
+      this.dataContainer = dataContainer;
+      this.configuration = configuration;
+      this.lockManager = lockManager;
+   }
+
+   @Start
+   public void start()
+   {
+      defaultLockAcquisitionTimeout = configuration.getLockAcquisitionTimeout();
+      allowWriteSkew = configuration.isAllowWriteSkew();
+      lockParentForChildInsertRemove = configuration.isLockParentForChildInsertRemove();
+   }
+
+
+   /**
+    * Attempts to provide the context with a set of wrapped nodes based on the Collection of fqns passed in.  If a node
+    * already exists in the context then it is not wrapped again.
+    * <p/>
+    * {@link InternalNode}s are wrapped using {@link NodeFactory#createMvccNode(org.jboss.cache.InternalNode)} and as such,
+    * null internal nodes are treated according to isolation level used.  See {@link NodeFactory#createMvccNode(org.jboss.cache.InternalNode)}
+    * for details on this behaviour.
+    * <p/>
+    * Note that if the context has the {@link org.jboss.cache.config.Option#isForceWriteLock()} option set, then write locks are
+    * acquired and the node is copied.
+    * <p/>
+    *
+    * @param ctx  current invocation context
+    * @param fqns collection of Fqns.  Should not be null.
+    * @throws InterruptedException if write locks are forced and the lock manager is interrupted.
+    */
+   public void wrapNodesForReading(InvocationContext ctx, Collection<Fqn> fqns) throws InterruptedException
+   {
+      boolean forceWriteLock = ctx.getOptionOverrides().isForceWriteLock();
+
+      // does the node exist in the context?
+      for (Fqn f : fqns) wrapNodeForReading(ctx, f, forceWriteLock);
+   }
+
+   /**
+    * Similar to {@link #wrapNodesForReading(org.jboss.cache.invocation.InvocationContext, java.util.Collection)} except
+    * that this version takes a single Fqn parameter to wrap a single node.
+    *
+    * @param ctx current invocation context
+    * @param fqn fqn to fetch and wrap
+    * @throws InterruptedException if write locks are forced and the lock manager is interrupted.
+    */
+   public void wrapNodeForReading(InvocationContext ctx, Fqn fqn) throws InterruptedException
+   {
+      wrapNodeForReading(ctx, fqn, ctx.getOptionOverrides().isForceWriteLock());
+   }
+
+   private void wrapNodeForReading(InvocationContext ctx, Fqn f, boolean writeLockForced) throws InterruptedException
+   {
+      if (writeLockForced)
+      {
+         if (trace) log.trace("Forcing lock on reading node " + f);
+         wrapNodeForWriting(ctx, f, true, false, false, false, false);
+      }
+      else if (ctx.lookUpNode(f) == null)
+      {
+         if (trace) log.trace("Node " + f + " is not in context, fetching from container.");
+         // simple implementation.  Peek the node, wrap it, put wrapped node in the context.
+         InternalNode node = dataContainer.peekInternalNode(f, false);
+         ReadCommittedNode wrapped = nodeFactory.createMvccNode(node);
+         if (wrapped != null) ctx.putLookedUpNode(f, wrapped);
+      }
+      else
+      {
+         if (trace) log.trace("Node " + f + " is already in context.");
+      }
+   }
+
+   /**
+    * Attempts to lock a node if the lock isn't already held in the current scope, and records the lock in the context.
+    *
+    * @param ctx context
+    * @param fqn Fqn to lock
+    * @return true if a lock was needed and acquired, false if it didn't need to acquire the lock (i.e., lock was already held)
+    * @throws InterruptedException if interrupted
+    * @throws TimeoutException     if we are unable to acquire the lock after a specified timeout.
+    */
+   private boolean acquireLock(InvocationContext ctx, Fqn fqn) throws InterruptedException, TimeoutException
+   {
+      // don't EVER use lockManager.isLocked() since with lock striping it may be the case that we hold the relevant
+      // lock which may be shared with another Fqn that we have a lock for already.
+      // nothing wrong, just means that we fail to record the lock.  And that is a problem.
+      // Better to check our records and lock again if necessary.
+      if (!ctx.getLocks().contains(fqn))
+      {
+         if (!lockManager.lockAndRecord(fqn, WRITE, ctx))
+            throw new TimeoutException("Unable to acquire lock on Fqn [" + fqn + "] after [" + ctx.getLockAcquisitionTimeout(defaultLockAcquisitionTimeout) + "] milliseconds!");
+         return true;
+      }
+      return false;
+   }
+
+   /**
+    * First checks the context for the existence of the node.  If it does exist, it will return it, acquiring a lock if
+    * necessary.  Otherwise, it will peek in the dataContainer, wrap the node, lock if necessary, and add it to the context.
+    * If it doesn't even exist in the dataContainer and createIfAbsent is true, it will create a new node and add it to the
+    * data structure.  It will lock the node, and potentially the parent as well, if necessary.  If the parent is locked,
+    * it too will be added to the context if it wasn't there already.
+    *
+    * @param fqn                 to retrieve
+    * @param lockForWriting      if true, a lock will be acquired.
+    * @param createIfAbsent      if true, will be created if absent.
+    * @param includeInvalidNodes if true, invalid nodes are included.
+    * @param forRemoval          if true, the parent may also be locked if locking parents for removal is necessary.
+    * @param force               if true, will force the write lock even if the node is null.
+    * @return a wrapped node, or null.
+    */
+   public ReadCommittedNode wrapNodeForWriting(InvocationContext context, Fqn fqn, boolean lockForWriting, boolean createIfAbsent, boolean includeInvalidNodes, boolean forRemoval, boolean force) throws InterruptedException
+   {
+      // Step 1: see whether this Fqn has already been wrapped and registered in the invocation context.
+      ReadCommittedNode n = (ReadCommittedNode) context.lookUpNode(fqn);
+      // a context entry that reports isNullNode() is treated as "not present" when we are allowed to create the node,
+      // so that the creation branch further down is taken.
+      if (createIfAbsent && n != null && n.isNullNode()) n = null;
+      if (n != null)
+      {
+         // acquire lock if needed
+         // NOTE(review): the copy is only made when acquireLock() returns true; presumably that means the lock
+         // was newly acquired by this call - confirm against acquireLock().
+         if (lockForWriting && acquireLock(context, fqn))
+         {
+            // create a copy of the underlying node
+            n.copyNodeForUpdate(dataContainer, allowWriteSkew, context, nodeFactory);
+         }
+         if (trace) log.trace("Retrieving wrapped node " + fqn);
+         // a node flagged as deleted in the context is revived (un-deleted and marked valid) when the caller asked for creation.
+         if (n.isDeleted() && createIfAbsent)
+         {
+            if (trace) log.trace("Node is deleted in current scope.  Need to un-delete.");
+            n.markAsDeleted(false);
+            n.setValid(true, false);
+         }
+
+      }
+      else
+      {
+         // Step 2: nothing usable in the context.
+         // else, fetch from dataContainer.
+         InternalNode in = dataContainer.peekInternalNode(fqn, includeInvalidNodes);
+         if (in != null)
+         {
+            // do we need a lock?
+            boolean needToCopy = false;
+            if (lockForWriting && acquireLock(context, fqn))
+            {
+               needToCopy = true;
+            }
+            // wrap the internal node and register the wrapper in the context *before* copying it for update.
+            n = nodeFactory.createMvccNode(in);
+            context.putLookedUpNode(fqn, n);
+            if (needToCopy) n.copyNodeForUpdate(dataContainer, allowWriteSkew, context, nodeFactory);
+         }
+         else if (createIfAbsent) // else, do we need to create one?
+         {
+            Fqn parentFqn = fqn.getParent();
+            // wrap the parent first, without requesting a write lock on it.  createIfAbsent is passed through
+            // (and is true in this branch), so the recursive call may in turn create the parent.
+            NodeSPI parent = wrapNodeForWriting(context, parentFqn, false, createIfAbsent, false, false, false);
+            // do we need to lock the parent to create children?
+            boolean parentLockNeeded = isParentLockNeeded(parent);
+            // get a lock on the parent.
+            if (parentLockNeeded && acquireLock(context, parentFqn))
+            {
+               // the parent is re-read from the context (as a ReadCommittedNode) and copied for update.
+               ReadCommittedNode parentRCN = (ReadCommittedNode) context.lookUpNode(parentFqn);
+               parentRCN.copyNodeForUpdate(dataContainer, allowWriteSkew, context, nodeFactory);
+            }
+
+            // now to lock and create the node.
+            // the return value of acquireLock() is ignored here: the new node is always copied for update below.
+            acquireLock(context, fqn);
+
+            // the child is created through the parent and then immediately detached from it again; only the
+            // child's delegation target (the InternalNode) is kept and wrapped below.
+            NodeSPI temp = parent.getOrCreateChild(fqn.getLastElement(), context.getGlobalTransaction());
+            // TODO: warning, hack!  There is a race condition here.  Add a way to create nodes without attaching to a parent.
+            parent.removeChildDirect(fqn.getLastElement());
+
+            in = (InternalNode) ((NodeInvocationDelegate) temp).getDelegationTarget();
+            n = nodeFactory.createMvccNode(in);
+            // flag the wrapper as newly created, register it in the context, then copy it for update.
+            n.setCreated(true);
+            context.putLookedUpNode(fqn, n);
+            n.copyNodeForUpdate(dataContainer, allowWriteSkew, context, nodeFactory);
+         }
+      }
+
+      // see if we need to force the lock on nonexistent nodes.
+      if (n == null && force) acquireLock(context, fqn);
+
+      // now test if we need to lock the parent as well.
+      // the recursive call requests a write lock on the parent, never creates it, and passes forRemoval = false,
+      // so this step is not repeated for the parent's own parent.
+      if ((n != null || force) && forRemoval && isParentLockNeeded(fqn.getParent(), context))
+         wrapNodeForWriting(context, fqn.getParent(), true, false, includeInvalidNodes, false, force);
+
+      // may be null if the node was not found and was not created.
+      return n;
+   }
+
+   /**
+    * Write-locks a node together with its entire subtree, making sure each affected node is wrapped and
+    * registered in the invocation context.  If a lock on the parent is required (see
+    * {@link #isParentLockNeeded(Fqn, InvocationContext)}), the parent is locked and wrapped before the node itself.
+    *
+    * @param ctx invocation context in which wrapped nodes are registered
+    * @param fqn Fqn of the subtree root to lock and wrap; must not be the root Fqn
+    * @return the Fqns processed by this call (the given Fqn first, followed by its descendants), or null if the
+    *         context holds no node, or only a {@link NullMarkerNode}, for the given Fqn after wrapping.
+    * @throws InterruptedException if the lock manager is interrupted.
+    * @throws CacheException       if the given Fqn is the root.
+    */
+   public List<Fqn> wrapNodesForWriting(InvocationContext ctx, Fqn fqn) throws InterruptedException
+   {
+      // the root can never be removed.
+      if (fqn.isRoot())
+      {
+         throw new CacheException("Attempting to remove Fqn.ROOT!");
+      }
+
+      // parent first: lock it (if required) and make sure it is in the context.
+      final Fqn parentFqn = fqn.getParent();
+      final boolean lockParent = isParentLockNeeded(parentFqn, ctx);
+      boolean copyParent = false;
+      if (lockParent)
+      {
+         copyParent = acquireLock(ctx, parentFqn);
+         putNodeInContext(ctx, parentFqn, copyParent);
+      }
+
+      // then the node itself.
+      final boolean copyNode = acquireLock(ctx, fqn);
+      putNodeInContext(ctx, fqn, copyNode);
+
+      ReadCommittedNode wrapped = (ReadCommittedNode) ctx.lookUpNode(fqn);
+      if (wrapped == null || wrapped instanceof NullMarkerNode)
+      {
+         // nothing real to lock below this Fqn.
+         return null;
+      }
+
+      // either side may now be a copy, so re-attach the child reference on the parent.
+      if (lockParent && (copyNode || copyParent))
+      {
+         ReadCommittedNode wrappedParent = (ReadCommittedNode) ctx.lookUpNode(parentFqn);
+         InternalNode target = (InternalNode) wrapped.getDelegationTarget();
+         wrappedParent.addChildDirect(nodeFactory.createNodeInvocationDelegate(target));
+      }
+
+      // collect this Fqn, then descend into the children.
+      Map<?, ?> children = wrapped.getChildrenMapDirect();
+      List<Fqn> lockedFqns = new LinkedList<Fqn>();
+      lockedFqns.add(fqn);
+      if (children != null && !children.isEmpty())
+      {
+         for (Object value : children.values())
+         {
+            NodeSPI childNode = (NodeSPI) value;
+            lockForWritingRecursive(childNode.getFqn(), ctx, lockedFqns);
+         }
+      }
+
+      return lockedFqns;
+   }
+
+   /**
+    * Locks the given Fqn, wraps the node for writing and re-attaches it to its (already wrapped) parent, then
+    * repeats the process for every child of the node.
+    *
+    * @param fqn     Fqn of the node to lock and wrap
+    * @param ctx     invocation context in which wrapped nodes are registered
+    * @param fqnList if not null, every Fqn visited is appended to this list
+    * @throws InterruptedException if the lock manager is interrupted.
+    */
+   private void lockForWritingRecursive(Fqn fqn, InvocationContext ctx, List<Fqn> fqnList) throws InterruptedException
+   {
+      // lock node
+      acquireLock(ctx, fqn);
+      if (fqnList != null)
+      {
+         fqnList.add(fqn);
+      }
+
+      // now wrap and add to the context
+      ReadCommittedNode wrapped = wrapNodeForWriting(ctx, fqn, true, false, true, false, false);
+      if (wrapped == null)
+      {
+         // no such node; nothing further to do for this branch.
+         return;
+      }
+
+      wrapped.copyNodeForUpdate(dataContainer, allowWriteSkew, ctx, nodeFactory);
+
+      // point the parent's child reference at this node's current delegation target.
+      ReadCommittedNode wrappedParent = (ReadCommittedNode) ctx.lookUpNode(fqn.getParent());
+      InternalNode target = (InternalNode) wrapped.getDelegationTarget();
+      wrappedParent.addChildDirect(nodeFactory.createNodeInvocationDelegate(target));
+
+      Map<Object, NodeSPI> childMap = wrapped.getChildrenMapDirect();
+      if (childMap == null)
+      {
+         return;
+      }
+      for (NodeSPI childNode : childMap.values())
+      {
+         lockForWritingRecursive(childNode.getFqn(), ctx, fqnList);
+      }
+   }
+
+   /**
+    * Makes sure an entry for the given Fqn is registered in the invocation context, wrapping the node found in
+    * the data container if the context has none, and optionally copies the wrapped node for update.
+    *
+    * @param ctx            invocation context to register the wrapped node in
+    * @param fqn            Fqn of the node
+    * @param needToCopyNode if true, the wrapped node is copied for update unless it already reports itself as changed
+    */
+   private void putNodeInContext(InvocationContext ctx, Fqn fqn, boolean needToCopyNode)
+   {
+      ReadCommittedNode wrapped = (ReadCommittedNode) ctx.lookUpNode(fqn);
+      if (wrapped == null)
+      {
+         // not in the context yet: wrap whatever the data container holds and register it.
+         InternalNode underlying = dataContainer.peekInternalNode(fqn, false);
+         wrapped = nodeFactory.createMvccNode(underlying);
+         ctx.putLookedUpNode(fqn, wrapped);
+      }
+
+      // node could be null if using read-committed
+      if (!needToCopyNode || wrapped == null || wrapped.isChanged())
+      {
+         return;
+      }
+      wrapped.copyNodeForUpdate(dataContainer, allowWriteSkew, ctx, nodeFactory);
+   }
+
+   /**
+    * Tests whether a parent node must be locked when children are inserted or removed.
+    *
+    * @param parent parent node to test; may be null
+    * @return true if the helper-wide lockParentForChildInsertRemove flag is set, or if the given parent is
+    *         non-null and itself reports isLockForChildInsertRemove().
+    */
+   private boolean isParentLockNeeded(NodeSPI parent)
+   {
+      // the helper-wide setting wins regardless of the individual node.
+      if (lockParentForChildInsertRemove)
+      {
+         return true;
+      }
+      return parent != null && parent.isLockForChildInsertRemove();
+   }
+
+   /**
+    * Resolves the node for the given parent Fqn, preferring the invocation context and falling back to the data
+    * container, and tests whether it must be locked when children are inserted or removed.
+    *
+    * @param parent Fqn of the parent node
+    * @param ctx    invocation context to look the node up in first
+    * @return the result of {@link #isParentLockNeeded(NodeSPI)} for the resolved node (which may be null).
+    */
+   private boolean isParentLockNeeded(Fqn parent, InvocationContext ctx)
+   {
+      NodeSPI fromContext = ctx.lookUpNode(parent);
+      // only consult the data container when the context has no entry for the parent.
+      NodeSPI resolved = fromContext != null ? fromContext : dataContainer.peek(parent, true, true);
+      return isParentLockNeeded(resolved);
+   }
+}




More information about the jbosscache-commits mailing list