[jbosscache-commits] JBoss Cache SVN: r6142 - in core/trunk/src: main/java/org/jboss/cache/commands/read and 4 other directories.

jbosscache-commits at lists.jboss.org jbosscache-commits at lists.jboss.org
Tue Jul 1 09:43:00 EDT 2008


Author: manik.surtani at jboss.com
Date: 2008-07-01 09:42:59 -0400 (Tue, 01 Jul 2008)
New Revision: 6142

Added:
   core/trunk/src/main/java/org/jboss/cache/mvcc/NullMarkerNode.java
Modified:
   core/trunk/src/main/java/org/jboss/cache/NodeFactory.java
   core/trunk/src/main/java/org/jboss/cache/commands/read/GetChildrenNamesCommand.java
   core/trunk/src/main/java/org/jboss/cache/commands/read/GetDataMapCommand.java
   core/trunk/src/main/java/org/jboss/cache/commands/read/GetKeyValueCommand.java
   core/trunk/src/main/java/org/jboss/cache/commands/read/GetKeysCommand.java
   core/trunk/src/main/java/org/jboss/cache/commands/read/GetNodeCommand.java
   core/trunk/src/main/java/org/jboss/cache/commands/write/EvictCommand.java
   core/trunk/src/main/java/org/jboss/cache/commands/write/MoveCommand.java
   core/trunk/src/main/java/org/jboss/cache/commands/write/RemoveNodeCommand.java
   core/trunk/src/main/java/org/jboss/cache/interceptors/MVCCLockingInterceptor.java
   core/trunk/src/main/java/org/jboss/cache/mvcc/ReadCommittedNode.java
   core/trunk/src/main/java/org/jboss/cache/mvcc/RepeatableReadNode.java
   core/trunk/src/test/java/org/jboss/cache/api/mvcc/LockTestBase.java
Log:
MVCC-RR

Modified: core/trunk/src/main/java/org/jboss/cache/NodeFactory.java
===================================================================
--- core/trunk/src/main/java/org/jboss/cache/NodeFactory.java	2008-07-01 13:42:35 UTC (rev 6141)
+++ core/trunk/src/main/java/org/jboss/cache/NodeFactory.java	2008-07-01 13:42:59 UTC (rev 6142)
@@ -18,6 +18,7 @@
 import org.jboss.cache.lock.IsolationLevel;
 import org.jboss.cache.lock.LockStrategyFactory;
 import org.jboss.cache.mvcc.InternalNode;
+import org.jboss.cache.mvcc.NullMarkerNode;
 import org.jboss.cache.mvcc.ReadCommittedNode;
 import org.jboss.cache.mvcc.RepeatableReadNode;
 import org.jboss.cache.optimistic.TransactionWorkspace;
@@ -41,6 +42,7 @@
    private CommandsFactory commandsFactory;
    private LockStrategyFactory lockStrategyFactory;
    private boolean useRepeatableRead;
+   private static final NullMarkerNode NULL_MARKER = new NullMarkerNode(null);
 
    @Override
    protected <T> T construct(Class<T> componentType)
@@ -50,13 +52,16 @@
 
    /**
     * Creates an MVCC wrapped node - either a {@link org.jboss.cache.mvcc.ReadCommittedNode} or it's subclass, a
-    * {@link org.jboss.cache.mvcc.RepeatableReadNode} based on cache configuration.
+    * {@link org.jboss.cache.mvcc.RepeatableReadNode} based on cache configuration.  If a null is passed in as the InternalNode,
+    * this method will return a special {@link org.jboss.cache.mvcc.NullMarkerNode} instance if using repeatable read,
+    * or a null if read committed.
     *
     * @param node internal node to wrap.
     * @return a ReadCommittedNode
     */
    public ReadCommittedNode createMvccNode(InternalNode node)
    {
+      if (node == null) return useRepeatableRead ? NULL_MARKER : null;
       ReadCommittedNode rcn = useRepeatableRead ? new RepeatableReadNode(node) : new ReadCommittedNode(node);
       rcn.initialize(configuration, invocationContextContainer, componentRegistry, interceptorChain);
       rcn.injectDependencies(cache);

Modified: core/trunk/src/main/java/org/jboss/cache/commands/read/GetChildrenNamesCommand.java
===================================================================
--- core/trunk/src/main/java/org/jboss/cache/commands/read/GetChildrenNamesCommand.java	2008-07-01 13:42:35 UTC (rev 6141)
+++ core/trunk/src/main/java/org/jboss/cache/commands/read/GetChildrenNamesCommand.java	2008-07-01 13:42:59 UTC (rev 6142)
@@ -42,7 +42,7 @@
    public Object perform(InvocationContext ctx)
    {
       NodeSPI n = fqn == null ? null : ctx.lookUpNode(fqn);
-      if (n == null) return null;
+      if (n == null || n.isDeleted()) return null;
       Map childrenMap = n.getChildrenMapDirect();
       if (childrenMap == null || childrenMap.isEmpty()) return Collections.emptySet();
       Set childNames = new HashSet();

Modified: core/trunk/src/main/java/org/jboss/cache/commands/read/GetDataMapCommand.java
===================================================================
--- core/trunk/src/main/java/org/jboss/cache/commands/read/GetDataMapCommand.java	2008-07-01 13:42:35 UTC (rev 6141)
+++ core/trunk/src/main/java/org/jboss/cache/commands/read/GetDataMapCommand.java	2008-07-01 13:42:59 UTC (rev 6142)
@@ -36,7 +36,7 @@
    public Object perform(InvocationContext ctx)
    {
       NodeSPI n = ctx.lookUpNode(fqn);
-      if (n == null) return null;
+      if (n == null || n.isDeleted()) return null;
       return new MapCopy(n.getDataDirect());
    }
 

Modified: core/trunk/src/main/java/org/jboss/cache/commands/read/GetKeyValueCommand.java
===================================================================
--- core/trunk/src/main/java/org/jboss/cache/commands/read/GetKeyValueCommand.java	2008-07-01 13:42:35 UTC (rev 6141)
+++ core/trunk/src/main/java/org/jboss/cache/commands/read/GetKeyValueCommand.java	2008-07-01 13:42:59 UTC (rev 6142)
@@ -60,9 +60,14 @@
       NodeSPI n = ctx.lookUpNode(fqn);
       if (n == null)
       {
-         log.trace("node not found");
+         if (trace) log.trace("Node not found");
          return null;
       }
+      if (n.isDeleted())
+      {
+         if (trace) log.trace("Node has been deleted!");
+         return null;
+      }
       if (sendNodeEvent) notifier.notifyNodeVisited(fqn, true, ctx);
       Object result = n.getDirect(key);
       if (sendNodeEvent) notifier.notifyNodeVisited(fqn, false, ctx);

Modified: core/trunk/src/main/java/org/jboss/cache/commands/read/GetKeysCommand.java
===================================================================
--- core/trunk/src/main/java/org/jboss/cache/commands/read/GetKeysCommand.java	2008-07-01 13:42:35 UTC (rev 6141)
+++ core/trunk/src/main/java/org/jboss/cache/commands/read/GetKeysCommand.java	2008-07-01 13:42:59 UTC (rev 6142)
@@ -37,7 +37,7 @@
    public Object perform(InvocationContext ctx)
    {
       NodeSPI n = ctx.lookUpNode(fqn);
-      if (n == null) return null;
+      if (n == null || n.isDeleted()) return null;
       return n.getKeysDirect();
    }
 

Modified: core/trunk/src/main/java/org/jboss/cache/commands/read/GetNodeCommand.java
===================================================================
--- core/trunk/src/main/java/org/jboss/cache/commands/read/GetNodeCommand.java	2008-07-01 13:42:35 UTC (rev 6141)
+++ core/trunk/src/main/java/org/jboss/cache/commands/read/GetNodeCommand.java	2008-07-01 13:42:59 UTC (rev 6142)
@@ -1,6 +1,7 @@
 package org.jboss.cache.commands.read;
 
 import org.jboss.cache.Fqn;
+import org.jboss.cache.NodeSPI;
 import org.jboss.cache.commands.Visitor;
 import org.jboss.cache.invocation.InvocationContext;
 
@@ -34,7 +35,9 @@
     */
    public Object perform(InvocationContext ctx)
    {
-      return ctx.lookUpNode(fqn);
+      NodeSPI node = ctx.lookUpNode(fqn);
+      if (node != null && node.isDeleted()) return null;
+      return node;
    }
 
    public Object acceptVisitor(InvocationContext ctx, Visitor visitor) throws Throwable

Modified: core/trunk/src/main/java/org/jboss/cache/commands/write/EvictCommand.java
===================================================================
--- core/trunk/src/main/java/org/jboss/cache/commands/write/EvictCommand.java	2008-07-01 13:42:35 UTC (rev 6141)
+++ core/trunk/src/main/java/org/jboss/cache/commands/write/EvictCommand.java	2008-07-01 13:42:59 UTC (rev 6142)
@@ -134,6 +134,7 @@
                parentNode.setChildrenLoaded(false);
             }
             node.setValid(false, false);
+            node.markAsDeleted(true, false);
             return true;
          }
       }

Modified: core/trunk/src/main/java/org/jboss/cache/commands/write/MoveCommand.java
===================================================================
--- core/trunk/src/main/java/org/jboss/cache/commands/write/MoveCommand.java	2008-07-01 13:42:35 UTC (rev 6141)
+++ core/trunk/src/main/java/org/jboss/cache/commands/write/MoveCommand.java	2008-07-01 13:42:59 UTC (rev 6142)
@@ -87,7 +87,7 @@
       // the actual move algorithm.
       // ctx *could* be null if this is a rollback!!!  Sucks big time.
       NodeSPI newParent = ctx == null ? dataContainer.peek(newParentFqn) : ctx.lookUpNode(newParentFqn);
-      if (newParent == null)
+      if (newParent == null || newParent.isDeleted())
       {
          throw new NodeNotExistsException("New parent node " + newParentFqn + " does not exist when attempting to move node!!");
       }
@@ -95,7 +95,7 @@
       // ctx *could* be null if this is a rollback!!!  Sucks big time.
       NodeSPI node = ctx == null ? dataContainer.peek(toMoveFqn) : ctx.lookUpNode(toMoveFqn);
 
-      if (node == null)
+      if (node == null || node.isDeleted())
       {
          throw new NodeNotExistsException("Node " + toMoveFqn + " does not exist when attempting to move node!!");
       }

Modified: core/trunk/src/main/java/org/jboss/cache/commands/write/RemoveNodeCommand.java
===================================================================
--- core/trunk/src/main/java/org/jboss/cache/commands/write/RemoveNodeCommand.java	2008-07-01 13:42:35 UTC (rev 6141)
+++ core/trunk/src/main/java/org/jboss/cache/commands/write/RemoveNodeCommand.java	2008-07-01 13:42:59 UTC (rev 6142)
@@ -46,7 +46,7 @@
 
       // Find the node
       targetNode = peekVersioned(ctx);
-      if (targetNode == null)
+      if (targetNode == null || targetNode.isDeleted())
       {
          if (trace) log.trace("node " + fqn + " not found");
          return false;

Modified: core/trunk/src/main/java/org/jboss/cache/interceptors/MVCCLockingInterceptor.java
===================================================================
--- core/trunk/src/main/java/org/jboss/cache/interceptors/MVCCLockingInterceptor.java	2008-07-01 13:42:35 UTC (rev 6141)
+++ core/trunk/src/main/java/org/jboss/cache/interceptors/MVCCLockingInterceptor.java	2008-07-01 13:42:59 UTC (rev 6142)
@@ -31,6 +31,7 @@
 import org.jboss.cache.invocation.NodeInvocationDelegate;
 import org.jboss.cache.lock.LockManager;
 import static org.jboss.cache.lock.LockType.WRITE;
+import org.jboss.cache.lock.TimeoutException;
 import org.jboss.cache.mvcc.InternalNode;
 import org.jboss.cache.mvcc.ReadCommittedNode;
 
@@ -61,6 +62,7 @@
    LockManager lockManager;
    DataContainer dataContainer;
    NodeFactory nodeFactory;
+   private long defaultLockAcquisitionTimeout;
 
    @Inject
    public void setDependencies(LockManager lockManager, DataContainer dataContainer, NodeFactory nodeFactory)
@@ -75,6 +77,7 @@
    {
       allowWriteSkew = configuration.isAllowWriteSkew();
       lockParentForChildInsertRemove = configuration.isLockParentForChildInsertRemove();
+      defaultLockAcquisitionTimeout = configuration.getLockAcquisitionTimeout();
    }
 
    @Override
@@ -112,23 +115,38 @@
          {
             if (lockParentForChildInsertRemove || (parent != null && parent.isLockForChildInsertRemove()))
             {
-               // lock parent
-               lockManager.lockAndRecord(parentFqn, WRITE, ctx);
+               boolean needToCopyNode = lock(ctx, parentFqn);
 
-               // retrieve again from the dataContainer in case we have a race, and add to the context
-               getWrappedNode(ctx, parentFqn, true, false, false);
+               // Ensure the node is in the context.
+               putNodeInContext(ctx, parentFqn, needToCopyNode);
             }
          }
       }
 
-      lockManager.lockAndRecord(nodeFqn, WRITE, ctx); // lock node.
+      boolean needToCopyNode = lock(ctx, nodeFqn);
 
-      // now wrap and add to the context
-      getWrappedNode(ctx, nodeFqn, true, false, false);
+      // Ensure the node is in the context.
+      putNodeInContext(ctx, nodeFqn, needToCopyNode);
 
       return invokeNextInterceptor(ctx, command);
    }
 
+   private void putNodeInContext(InvocationContext ctx, Fqn fqn, boolean needToCopyNode)
+   {
+      ReadCommittedNode node = (ReadCommittedNode) ctx.lookUpNode(fqn);
+      if (node == null)
+      {
+         InternalNode in = dataContainer.peekInternalNode(fqn, false);
+         node = nodeFactory.createMvccNode(in);
+         ctx.putLookedUpNode(fqn, node);
+      }
+
+      if (needToCopyNode && !node.isChanged())
+      {
+         node.copyNodeForUpdate(dataContainer, allowWriteSkew, ctx, nodeFactory, lockParentForChildInsertRemove);
+      }
+   }
+
    @Override
    public Object handleClearDataCommand(InvocationContext ctx, ClearDataCommand command) throws Throwable
    {
@@ -158,7 +176,8 @@
 
                // retrieve again from the dataContainer in case we have a race, and add to the context
                ReadCommittedNode rcn = (ReadCommittedNode) getWrappedNode(ctx, parentFqn, true, false, false);
-               if (rcn != null) rcn.copyNodeForUpdate(dataContainer, allowWriteSkew);
+               if (rcn != null)
+                  rcn.copyNodeForUpdate(dataContainer, allowWriteSkew, ctx, nodeFactory, lockParentForChildInsertRemove);
             }
          }
       }
@@ -184,7 +203,7 @@
       ReadCommittedNode rcn = (ReadCommittedNode) getWrappedNode(ctx, fqn, true, false, true);
       if (rcn != null)
       {
-         rcn.copyNodeForUpdate(dataContainer, allowWriteSkew);
+         rcn.copyNodeForUpdate(dataContainer, allowWriteSkew, ctx, nodeFactory, lockParentForChildInsertRemove);
 
          if (isRecursive)
          {
@@ -376,11 +395,8 @@
          {
             // simple implementation.  Peek the node, wrap it, put wrapped node in the context.
             InternalNode node = dataContainer.peekInternalNode(f, false);
-            if (node != null)
-            {
-               NodeSPI wrapped = nodeFactory.createMvccNode(node);
-               ctx.putLookedUpNode(f, wrapped);
-            }
+            NodeSPI wrapped = nodeFactory.createMvccNode(node);
+            if (wrapped != null) ctx.putLookedUpNode(f, wrapped);
          }
       }
       return invokeNextInterceptor(ctx, command);
@@ -437,15 +453,14 @@
    protected NodeSPI getWrappedNode(InvocationContext context, Fqn fqn, boolean lockForWriting, boolean createIfAbsent, boolean includeInvalidNodes) throws InterruptedException
    {
       ReadCommittedNode n = (ReadCommittedNode) context.lookUpNode(fqn);
+      if (createIfAbsent && n != null && n.isNullNode()) n = null;
       if (n != null)
       {
          // acquire lock if needed
-         if (lockForWriting && !isLocked(context, fqn))
+         if (lockForWriting && lock(context, fqn))
          {
-            lockManager.lockAndRecord(fqn, WRITE, context);
             // create a copy of the underlying node
-
-            n.copyNodeForUpdate(dataContainer, allowWriteSkew);
+            n.copyNodeForUpdate(dataContainer, allowWriteSkew, context, nodeFactory, lockParentForChildInsertRemove);
          }
          if (trace) log.trace("Retrieving wrapped node " + fqn);
          return n;
@@ -457,14 +472,14 @@
       {
          // do we need a lock?
          boolean needToCopy = false;
-         if (lockForWriting && !isLocked(context, fqn))
+         if (lockForWriting && lock(context, fqn))
          {
-            lockManager.lockAndRecord(fqn, WRITE, context);
             needToCopy = true;
          }
          ReadCommittedNode wrapped = nodeFactory.createMvccNode(in);
          context.putLookedUpNode(fqn, wrapped);
-         if (needToCopy) wrapped.copyNodeForUpdate(dataContainer, allowWriteSkew);
+         if (needToCopy)
+            wrapped.copyNodeForUpdate(dataContainer, allowWriteSkew, context, nodeFactory, lockParentForChildInsertRemove);
          return wrapped;
       }
 
@@ -477,33 +492,48 @@
          if (lockParentForChildInsertRemove || parent.isLockForChildInsertRemove())
          {
             // get a lock on the parent.
-            if (!isLocked(context, parentFqn))
+            if (lock(context, parentFqn))
             {
-               lockManager.lockAndRecord(parentFqn, WRITE, context);
                ReadCommittedNode parentRCN = (ReadCommittedNode) context.lookUpNode(parentFqn);
-               parentRCN.copyNodeForUpdate(dataContainer, allowWriteSkew);
+               parentRCN.copyNodeForUpdate(dataContainer, allowWriteSkew, context, nodeFactory, lockParentForChildInsertRemove);
             }
          }
 
          // now to lock and create the node.
-         if (!isLocked(context, fqn)) lockManager.lockAndRecord(fqn, WRITE, context);
+         lock(context, fqn);
+
          NodeSPI temp = parent.getOrCreateChild(fqn.getLastElement(), context.getGlobalTransaction());
          in = (InternalNode) ((NodeInvocationDelegate) temp).getDelegationTarget();
          ReadCommittedNode wrapped = nodeFactory.createMvccNode(in);
          context.putLookedUpNode(fqn, wrapped);
-         wrapped.copyNodeForUpdate(dataContainer, allowWriteSkew);
+         wrapped.copyNodeForUpdate(dataContainer, allowWriteSkew, context, nodeFactory, lockParentForChildInsertRemove);
          return wrapped;
       }
 
       return null;
    }
 
-   protected boolean isLocked(InvocationContext ctx, Fqn fqn)
+   /**
+    * Attempts to lock a node if the lock isn't already held in the current scope.
+    *
+    * @param ctx context
+    * @param fqn Fqn to lock
+    * @return true if a lock was needed and acquired, false if it didn't need to acquire the lock (i.e., lock was already held)
+    * @throws InterruptedException
+    * @throws TimeoutException     if we are unable to acquire the lock after a specified timeout.
+    */
+   protected boolean lock(InvocationContext ctx, Fqn fqn) throws InterruptedException
    {
       // don't EVER use lockManager.isLocked() since with lock striping it may be the case that we hold the relevant
       // lock which may be shared with another Fqn that we have a lock for already.
       // nothing wrong, just means that we fail to record the lock.  And that is a problem.
       // Better to check our records and lock again if necessary.
-      return ctx.getLocks().contains(fqn);
+      if (!ctx.getLocks().contains(fqn))
+      {
+         if (!lockManager.lockAndRecord(fqn, WRITE, ctx))
+            throw new TimeoutException("Unable to acquire lock on Fqn [" + fqn + "] after [" + ctx.getLockAcquisitionTimeout(defaultLockAcquisitionTimeout) + "] milliseconds!");
+         return true;
+      }
+      return false;
    }
 }

Added: core/trunk/src/main/java/org/jboss/cache/mvcc/NullMarkerNode.java
===================================================================
--- core/trunk/src/main/java/org/jboss/cache/mvcc/NullMarkerNode.java	                        (rev 0)
+++ core/trunk/src/main/java/org/jboss/cache/mvcc/NullMarkerNode.java	2008-07-01 13:42:59 UTC (rev 6142)
@@ -0,0 +1,43 @@
+package org.jboss.cache.mvcc;
+
+import org.jboss.cache.DataContainer;
+import org.jboss.cache.NodeFactory;
+import org.jboss.cache.invocation.InvocationContext;
+
+/**
+ * A marker node to represent a null node for repeatable read
+ *
+ * @author Manik Surtani (<a href="mailto:manik at jboss.org">manik at jboss.org</a>)
+ * @since 3.0
+ */
+public class NullMarkerNode extends RepeatableReadNode
+{
+   public NullMarkerNode(InternalNode node)
+   {
+      super(node);
+   }
+
+   @Override
+   public boolean isNullNode()
+   {
+      return true;
+   }
+
+   @Override
+   public boolean isDeleted()
+   {
+      return true;
+   }
+
+   @Override
+   public boolean isValid()
+   {
+      return false;
+   }
+
+   @Override
+   public void copyNodeForUpdate(DataContainer d, boolean b, InvocationContext ctx, NodeFactory nodeFactory, boolean lockParent)
+   {
+      // no op
+   }
+}

Modified: core/trunk/src/main/java/org/jboss/cache/mvcc/ReadCommittedNode.java
===================================================================
--- core/trunk/src/main/java/org/jboss/cache/mvcc/ReadCommittedNode.java	2008-07-01 13:42:35 UTC (rev 6141)
+++ core/trunk/src/main/java/org/jboss/cache/mvcc/ReadCommittedNode.java	2008-07-01 13:42:59 UTC (rev 6142)
@@ -2,6 +2,7 @@
 
 import org.jboss.cache.DataContainer;
 import org.jboss.cache.NodeFactory;
+import org.jboss.cache.invocation.InvocationContext;
 import org.jboss.cache.invocation.NodeInvocationDelegate;
 import org.jboss.cache.optimistic.DataVersion;
 import org.jboss.cache.optimistic.DefaultDataVersion;
@@ -22,8 +23,13 @@
       super(node);
    }
 
-   public void copyNodeForUpdate(DataContainer container, boolean ignoreWriteSkew)
+   public boolean isNullNode()
    {
+      return false;
+   }
+
+   public void copyNodeForUpdate(DataContainer container, boolean allowWriteSkew, InvocationContext ctx, NodeFactory nodeFactory, boolean lockParent)
+   {
       changed = true;
       backup = node;
       node = backup.copy();
@@ -59,4 +65,14 @@
       backup = null;
       changed = false;
    }
+
+   public boolean isChanged()
+   {
+      return changed;
+   }
+
+   public void setChanged(boolean changed)
+   {
+      this.changed = changed;
+   }
 }

Modified: core/trunk/src/main/java/org/jboss/cache/mvcc/RepeatableReadNode.java
===================================================================
--- core/trunk/src/main/java/org/jboss/cache/mvcc/RepeatableReadNode.java	2008-07-01 13:42:35 UTC (rev 6141)
+++ core/trunk/src/main/java/org/jboss/cache/mvcc/RepeatableReadNode.java	2008-07-01 13:42:59 UTC (rev 6142)
@@ -3,6 +3,7 @@
 import org.jboss.cache.DataContainer;
 import org.jboss.cache.NodeFactory;
 import org.jboss.cache.NodeSPI;
+import org.jboss.cache.invocation.InvocationContext;
 import org.jboss.cache.optimistic.DataVersion;
 import org.jboss.cache.optimistic.DataVersioningException;
 import org.jboss.cache.optimistic.DefaultDataVersion;
@@ -21,7 +22,7 @@
    }
 
    @Override
-   public void copyNodeForUpdate(DataContainer container, boolean ignoreWriteSkew)
+   public void copyNodeForUpdate(DataContainer container, boolean allowWriteSkew, InvocationContext ctx, NodeFactory nodeFactory, boolean lockParent)
    {
       // mark node as changed.
       changed = true;
@@ -29,9 +30,11 @@
       // check for write skew.
       NodeSPI underlyingNode = container.peek(getFqn(), false, true);  // even check for invalid nodes.  we should check tombstones too.
       DataVersion underlyingNodeVersion = underlyingNode == null ? null : underlyingNode.getVersion();
-      if (!ignoreWriteSkew && underlyingNode != null && !node.getVersion().equals(underlyingNodeVersion))
+      if (!allowWriteSkew && underlyingNode != null && !node.getVersion().equals(underlyingNodeVersion))
       {
-         throw new DataVersioningException("Detected write skew.  Attempting to overwrite version " + node.getVersion() + " but current version has progressed to " + underlyingNodeVersion);
+         String errormsg = new StringBuilder().append("Detected write skew.  Attempting to overwrite version ").append(node.getVersion()).append(" but current version has progressed to ").append(underlyingNodeVersion).toString();
+         if (log.isWarnEnabled()) log.warn(errormsg + ".  Unable to copy node for update.");
+         throw new DataVersioningException(errormsg);
       }
 
       // make a backup copy
@@ -43,17 +46,12 @@
       DataVersion newVersion = ((DefaultDataVersion) node.getVersion()).increment();
       node.setVersion(newVersion);
 
-      // update parent nodes references - May not be necessary, we could just make sure we don't overwrite child maps when
-      // updateNode() runs.
-
-//      if (!getFqn().isRoot())
-//      {
-//         RepeatableReadNode parent = (RepeatableReadNode) ctx.lookUpNode(getFqn().getParent());
-//         if (parent.changed)
-//         {
-//            parent.addChildDirect();
-//         }
-//      }
+      // if the parent is in the context make sure the parent has a ref to the copy now.
+      if (!getFqn().isRoot() && lockParent)
+      {
+         NodeSPI parent = ctx.lookUpNode(getFqn().getParent());
+         if (parent != null) parent.addChildDirect(nodeFactory.createNodeInvocationDelegate(node));
+      }
    }
 
    @Override

Modified: core/trunk/src/test/java/org/jboss/cache/api/mvcc/LockTestBase.java
===================================================================
--- core/trunk/src/test/java/org/jboss/cache/api/mvcc/LockTestBase.java	2008-07-01 13:42:35 UTC (rev 6141)
+++ core/trunk/src/test/java/org/jboss/cache/api/mvcc/LockTestBase.java	2008-07-01 13:42:59 UTC (rev 6142)
@@ -17,8 +17,10 @@
 import org.testng.annotations.BeforeMethod;
 import org.testng.annotations.Test;
 
+import javax.transaction.SystemException;
 import javax.transaction.Transaction;
 import javax.transaction.TransactionManager;
+import java.util.Collection;
 import java.util.Collections;
 import java.util.HashSet;
 import java.util.Set;
@@ -51,6 +53,9 @@
       cache.getConfiguration().setTransactionManagerLookupClass(DummyTransactionManagerLookup.class.getName());
       cache.getConfiguration().setLockParentForChildInsertRemove(lockParentForInsertRemove);
       cache.getConfiguration().setIsolationLevel(repeatableRead ? IsolationLevel.REPEATABLE_READ : IsolationLevel.READ_COMMITTED);
+      cache.getConfiguration().setAllowWriteSkew(allowWriteSkew);
+      // reduce lock acquisition timeout so this doesn't take forever to run
+      cache.getConfiguration().setLockAcquisitionTimeout(200); // 200 ms
       cache.start();
       lockManager = TestingUtil.extractComponentRegistry(cache).getComponent(LockManager.class);
       icc = TestingUtil.extractComponentRegistry(cache).getComponent(InvocationContextContainer.class);
@@ -79,7 +84,7 @@
    protected void assertNoLocks()
    {
       LockContainer lc = (LockContainer) TestingUtil.extractField(lockManager, "lockContainer");
-      assert lc.getNumLocksHeld() == 0 : "Stale locks exist!" + lockManager.printLockInfo();
+      assert lc.getNumLocksHeld() == 0 : "Stale locks exist! NumLocksHeld is " + lc.getNumLocksHeld() + " and lock info is " + lockManager.printLockInfo();
       assert icc.get().getLocks().isEmpty() : "Stale (?) locks recorded! " + icc.get().getLocks();
    }
 
@@ -133,12 +138,13 @@
       assertLocked(A);
       assertLocked(AB);
       assertNotLocked(ABC);
+      assert "v".equals(cache.get(AB, "k"));
       tm.commit();
-
+      assert "v".equals(cache.get(AB, "k"));
       assertNoLocks();
 
       tm.begin();
-      assert cache.get(AB, "k").equals("v");
+      assert "v".equals(cache.get(AB, "k"));
       assertNotLocked(Fqn.ROOT);
       assertNotLocked(A);
       assertNotLocked(AB);
@@ -435,7 +441,7 @@
       }
       catch (TimeoutException expected)
       {
-
+//         expected.printStackTrace();  // for debugging
       }
       tm.commit();
       tm.resume(t1);
@@ -496,15 +502,19 @@
          final Set<Exception> w2exceptions = new HashSet<Exception>();
          final CountDownLatch w1Signal = new CountDownLatch(1);
          final CountDownLatch w2Signal = new CountDownLatch(1);
+         final CountDownLatch threadSignal = new CountDownLatch(2);
 
-         Thread w1 = new Thread()
+         Thread w1 = new Thread("Writer-1")
          {
             public void run()
             {
+               boolean didCoundDown = false;
                try
                {
                   tm.begin();
                   assert "v".equals(cache.get(AB, "k"));
+                  threadSignal.countDown();
+                  didCoundDown = true;
                   w1Signal.await();
                   cache.put(AB, "k", "v2");
                   tm.commit();
@@ -513,17 +523,24 @@
                {
                   w1exceptions.add(e);
                }
+               finally
+               {
+                  if (!didCoundDown) threadSignal.countDown();
+               }
             }
          };
 
-         Thread w2 = new Thread()
+         Thread w2 = new Thread("Writer-2")
          {
             public void run()
             {
+               boolean didCoundDown = false;
                try
                {
                   tm.begin();
                   assert "v".equals(cache.get(AB, "k"));
+                  threadSignal.countDown();
+                  didCoundDown = true;
                   w2Signal.await();
                   cache.put(AB, "k", "v3");
                   tm.commit();
@@ -531,13 +548,30 @@
                catch (Exception e)
                {
                   w2exceptions.add(e);
+                  // the exception will be thrown when doing a cache.put().  We should make sure we roll back the tx to release locks.
+                  if (!allowWriteSkew)
+                  {
+                     try
+                     {
+                        tm.rollback();
+                     }
+                     catch (SystemException e1)
+                     {
+                        // do nothing.
+                     }
+                  }
                }
+               finally
+               {
+                  if (!didCoundDown) threadSignal.countDown();
+               }
             }
          };
 
          w1.start();
          w2.start();
 
+         threadSignal.await();
          // now.  both txs have read.
          // let tx1 start writing
          w1Signal.countDown();
@@ -548,20 +582,30 @@
 
          if (allowWriteSkew)
          {
-            assert "v3".equals(cache.get(AB, "k")) : "W2 should have overwritten W1's work!";
             // should have no exceptions!!
+            throwExceptions(w1exceptions, w2exceptions);
             assert w2exceptions.size() == 0;
             assert w1exceptions.size() == 0;
+            assert "v3".equals(cache.get(AB, "k")) : "W2 should have overwritten W1's work!";
          }
          else
          {
-            assert "v2".equals(cache.get(AB, "k")) : "W2 should NOT have overwritten W1's work!";
             // there should be a single exception from w2.
             assert w2exceptions.size() == 1;
+            throwExceptions(w1exceptions);
             assert w1exceptions.size() == 0;
+            assert "v2".equals(cache.get(AB, "k")) : "W2 should NOT have overwritten W1's work!";
          }
 
          assertNoLocks();
       }
    }
+
+   protected void throwExceptions(Collection<Exception>... exceptions) throws Exception
+   {
+      for (Collection<Exception> ce : exceptions)
+      {
+         for (Exception e : ce) throw e;
+      }
+   }
 }




More information about the jbosscache-commits mailing list