[jbosscache-commits] JBoss Cache SVN: r5887 - in core/trunk/src: main/java/org/jboss/cache/factories and 6 other directories.

jbosscache-commits at lists.jboss.org jbosscache-commits at lists.jboss.org
Fri May 23 13:15:52 EDT 2008


Author: manik.surtani at jboss.com
Date: 2008-05-23 13:15:52 -0400 (Fri, 23 May 2008)
New Revision: 5887

Added:
   core/trunk/src/main/java/org/jboss/cache/factories/LockManagerFactory.java
   core/trunk/src/main/java/org/jboss/cache/lock/LockManager.java
   core/trunk/src/main/java/org/jboss/cache/lock/LockType.java
   core/trunk/src/main/java/org/jboss/cache/lock/NodeBasedLockManager.java
   core/trunk/src/main/java/org/jboss/cache/lock/PessimisticNodeBasedLockManager.java
Removed:
   core/trunk/src/main/java/org/jboss/cache/lock/LockManager.java
Modified:
   core/trunk/src/main/java/org/jboss/cache/DataContainerImpl.java
   core/trunk/src/main/java/org/jboss/cache/InvocationContext.java
   core/trunk/src/main/java/org/jboss/cache/NodeSPI.java
   core/trunk/src/main/java/org/jboss/cache/RPCManagerImpl.java
   core/trunk/src/main/java/org/jboss/cache/RegionManager.java
   core/trunk/src/main/java/org/jboss/cache/factories/ComponentRegistry.java
   core/trunk/src/main/java/org/jboss/cache/factories/EmptyConstructorFactory.java
   core/trunk/src/main/java/org/jboss/cache/interceptors/CacheLoaderInterceptor.java
   core/trunk/src/main/java/org/jboss/cache/interceptors/OptimisticInterceptor.java
   core/trunk/src/main/java/org/jboss/cache/interceptors/OptimisticLockingInterceptor.java
   core/trunk/src/main/java/org/jboss/cache/interceptors/PessimisticLockInterceptor.java
   core/trunk/src/main/java/org/jboss/cache/interceptors/TxInterceptor.java
   core/trunk/src/main/java/org/jboss/cache/lock/IdentityLock.java
   core/trunk/src/main/java/org/jboss/cache/lock/LockUtil.java
   core/trunk/src/main/java/org/jboss/cache/lock/NodeLock.java
   core/trunk/src/main/java/org/jboss/cache/statetransfer/StateTransferManager.java
   core/trunk/src/test/java/org/jboss/cache/DataContainerTest.java
   core/trunk/src/test/java/org/jboss/cache/lock/AcquireAllTest.java
   core/trunk/src/test/java/org/jboss/cache/optimistic/OptimisticLockInterceptorTest.java
Log:
Refactored locks to use a LockManager rather than directly accessing locks from the NodeSPI.  This is to allow for different locking systems, which are not associated with nodes - e.g., striped locks.

Modified: core/trunk/src/main/java/org/jboss/cache/DataContainerImpl.java
===================================================================
--- core/trunk/src/main/java/org/jboss/cache/DataContainerImpl.java	2008-05-23 16:41:43 UTC (rev 5886)
+++ core/trunk/src/main/java/org/jboss/cache/DataContainerImpl.java	2008-05-23 17:15:52 UTC (rev 5887)
@@ -9,6 +9,7 @@
 import org.jboss.cache.factories.annotations.Start;
 import org.jboss.cache.factories.annotations.Stop;
 import org.jboss.cache.invocation.NodeInvocationDelegate;
+import org.jboss.cache.lock.LockManager;
 import org.jboss.cache.marshall.NodeData;
 import org.jboss.cache.optimistic.DataVersion;
 import org.jboss.cache.transaction.GlobalTransaction;
@@ -45,11 +46,12 @@
     */
    private final Set<Fqn> internalFqns = new HashSet<Fqn>();
    private NodeFactory nodeFactory;
+   private LockManager lockManager;
 
    @Inject
-   public void injectDependencies(Configuration configuration, NodeFactory nodeFactory)
+   public void injectDependencies(Configuration configuration, NodeFactory nodeFactory, LockManager lockManager)
    {
-      setDependencies(configuration, nodeFactory);
+      setDependencies(configuration, nodeFactory, lockManager);
 
       // We need to create a root node even at this stage since certain components rely on this being available before
       // start() is called.
@@ -58,10 +60,11 @@
       createRootNode();
    }
 
-   public void setDependencies(Configuration configuration, NodeFactory nodeFactory)
+   public void setDependencies(Configuration configuration, NodeFactory nodeFactory, LockManager lockManager)
    {
       this.configuration = configuration;
       this.nodeFactory = nodeFactory;
+      this.lockManager = lockManager;
    }
 
    @Start(priority = 12)
@@ -320,7 +323,7 @@
       int num = 0;
       if (n != null)
       {
-         if (n.getLock().isLocked())
+         if (lockManager.isLocked(n))
          {
             num++;
          }
@@ -369,15 +372,7 @@
     */
    public String printLockInfo()
    {
-      StringBuffer sb = new StringBuffer("\n");
-      int indent = 0;
-
-      for (Object n : root.getChildrenDirect())
-      {
-         ((NodeSPI) n).getLock().printLockInfo(sb, indent);
-         sb.append("\n");
-      }
-      return sb.toString();
+      return lockManager.printLockInfo(root);
    }
 
    public int getNumberOfAttributes(Fqn fqn)

Modified: core/trunk/src/main/java/org/jboss/cache/InvocationContext.java
===================================================================
--- core/trunk/src/main/java/org/jboss/cache/InvocationContext.java	2008-05-23 16:41:43 UTC (rev 5886)
+++ core/trunk/src/main/java/org/jboss/cache/InvocationContext.java	2008-05-23 17:15:52 UTC (rev 5887)
@@ -39,12 +39,12 @@
    // defaults to true.
    private boolean originLocal = true;
    private boolean txHasMods;
-   private boolean cacheLoaderHasMods;
    private boolean localRollbackOnly;
    @Deprecated
    private MethodCall methodCall;
    @Deprecated
    private VisitableCommand command;
+   List<NodeLock> invocationLocks;
 
    // used to store cache peeks within the scope of a single context. Performing a cache peek can be a huge bottle neck.
    // See JBCACHE-811
@@ -188,8 +188,6 @@
       return originLocal;
    }
 
-   List<NodeLock> invocationLocks;
-
    public List<NodeLock> getInvocationLocksAcquired()
    {
       return invocationLocks;
@@ -210,41 +208,6 @@
    }
 
    /**
-    * for non-tx calls, release any locks acquired.  These used to be in a separate Map<Thread, List<NodeLock>> called a lockTable,
-    * but that has been dropped in facour of storing the invocation-specific locks in the invocation context.  Cleaner to have it all
-    * in one place, plus much more performant.
-    */
-   public void clearInvocationLocksAcquired()
-   {
-      if (isLockingSuppressed()) return;
-      if (!isValidTransaction())
-      { // no TX
-         List<NodeLock> locks = getInvocationLocksAcquired();
-         if (trace)
-            log.trace("Attempting to release locks on current thread.  Locks for the invocation is " + locks);
-
-         if (locks != null && locks.size() > 0)
-         {
-            Thread currentThread = Thread.currentThread();
-            try
-            {
-               // make sure we release locks in *reverse* order!
-               for (int i = locks.size() - 1; i > -1; i--)
-               {
-                  NodeLock nl = locks.get(i);
-                  if (trace) log.trace("releasing lock for " + nl.getFqn() + ": " + nl);
-                  nl.release(currentThread);
-               }
-            }
-            finally
-            {
-               invocationLocks = null;
-            }
-         }
-      }
-   }
-
-   /**
     * @return true if options exist to suppress locking - false otherwise.  Note that this is only used by the {@link org.jboss.cache.interceptors.PessimisticLockInterceptor}.
     */
    public boolean isLockingSuppressed()
@@ -273,7 +236,6 @@
             ", optionOverrides=" + optionOverrides +
             ", originLocal=" + originLocal +
             ", txHasMods=" + txHasMods +
-            ", cacheLoaderHasMods=" + cacheLoaderHasMods +
             '}';
    }
 
@@ -346,7 +308,6 @@
       if (localRollbackOnly != that.localRollbackOnly) return false;
       if (originLocal != that.originLocal) return false;
       if (txHasMods != that.txHasMods) return false;
-      if (cacheLoaderHasMods != that.cacheLoaderHasMods) return false;
       if (globalTransaction != null ? !globalTransaction.equals(that.globalTransaction) : that.globalTransaction != null)
       {
          return false;
@@ -369,7 +330,6 @@
       result = 29 * result + (optionOverrides != null ? optionOverrides.hashCode() : 0);
       result = 29 * result + (originLocal ? 1 : 0);
       result = 29 * result + (txHasMods ? 1 : 0);
-      result = 29 * result + (cacheLoaderHasMods ? 1 : 0);
       result = 29 * result + (localRollbackOnly ? 1 : 0);
       return result;
    }

Modified: core/trunk/src/main/java/org/jboss/cache/NodeSPI.java
===================================================================
--- core/trunk/src/main/java/org/jboss/cache/NodeSPI.java	2008-05-23 16:41:43 UTC (rev 5886)
+++ core/trunk/src/main/java/org/jboss/cache/NodeSPI.java	2008-05-23 17:15:52 UTC (rev 5887)
@@ -108,7 +108,9 @@
     * Returns a lock for this node.
     *
     * @return node lock
+    * @deprecated this will be removed in 3.0.0.  Please use methods on the {@link org.jboss.cache.lock.LockManager} to lock and unlock nodes.
     */
+   @Deprecated
    NodeLock getLock();
 
    /**

Modified: core/trunk/src/main/java/org/jboss/cache/RPCManagerImpl.java
===================================================================
--- core/trunk/src/main/java/org/jboss/cache/RPCManagerImpl.java	2008-05-23 16:41:43 UTC (rev 5886)
+++ core/trunk/src/main/java/org/jboss/cache/RPCManagerImpl.java	2008-05-23 17:15:52 UTC (rev 5887)
@@ -17,8 +17,8 @@
 import org.jboss.cache.factories.annotations.Stop;
 import org.jboss.cache.interceptors.InterceptorChain;
 import org.jboss.cache.invocation.InvocationContextContainer;
+import org.jboss.cache.lock.LockManager;
 import org.jboss.cache.lock.LockUtil;
-import org.jboss.cache.lock.NodeLock;
 import org.jboss.cache.lock.TimeoutException;
 import org.jboss.cache.marshall.CommandAwareRpcDispatcher;
 import org.jboss.cache.marshall.InactiveRegionAwareRpcDispatcher;
@@ -94,12 +94,13 @@
    private boolean isUsingBuddyReplication;
    private boolean isInLocalMode;
    private ComponentRegistry componentRegistry;
+   private LockManager lockManager;
 
    @Inject
    private void setupDependencies(ChannelMessageListener messageListener, Configuration configuration, NotifierImpl notifier,
                                   CacheSPI spi, Marshaller marshaller, TransactionTable txTable,
                                   TransactionManager txManager, InvocationContextContainer container, InterceptorChain interceptorChain,
-                                  ComponentRegistry componentRegistry)
+                                  ComponentRegistry componentRegistry, LockManager lockManager)
    {
       this.messageListener = messageListener;
       this.configuration = configuration;
@@ -111,6 +112,7 @@
       this.invocationContextContainer = container;
       this.interceptorChain = interceptorChain;
       this.componentRegistry = componentRegistry;
+      this.lockManager = lockManager;
    }
 
    // ------------ START: Lifecycle methods ------------
@@ -328,26 +330,20 @@
    private void removeLocksForDeadMembers(NodeSPI node, List deadMembers)
    {
       Set<GlobalTransaction> deadOwners = new HashSet<GlobalTransaction>();
-      NodeLock lock = node.getLock();
-      Object owner = lock.getWriterOwner();
+      Object owner = lockManager.getWriteOwner(node);
 
-      if (isLockOwnerDead(owner, deadMembers))
-      {
-         deadOwners.add((GlobalTransaction) owner);
-      }
+      if (isLockOwnerDead(owner, deadMembers)) deadOwners.add((GlobalTransaction) owner);
 
-      for (Object readOwner : lock.getReaderOwners())
+
+      for (Object readOwner : lockManager.getReadOwners(node))
       {
-         if (isLockOwnerDead(readOwner, deadMembers))
-         {
-            deadOwners.add((GlobalTransaction) readOwner);
-         }
+         if (isLockOwnerDead(readOwner, deadMembers)) deadOwners.add((GlobalTransaction) readOwner);
       }
 
       for (GlobalTransaction deadOwner : deadOwners)
       {
          boolean localTx = deadOwner.getAddress().equals(getLocalAddress());
-         boolean broken = LockUtil.breakTransactionLock(lock, deadOwner, localTx, txTable, txManager);
+         boolean broken = LockUtil.breakTransactionLock(node, lockManager, deadOwner, localTx, txTable, txManager);
 
          if (broken && trace) log.trace("Broke lock for node " + node.getFqn() + " held by " + deadOwner);
       }

Modified: core/trunk/src/main/java/org/jboss/cache/RegionManager.java
===================================================================
--- core/trunk/src/main/java/org/jboss/cache/RegionManager.java	2008-05-23 16:41:43 UTC (rev 5886)
+++ core/trunk/src/main/java/org/jboss/cache/RegionManager.java	2008-05-23 17:15:52 UTC (rev 5887)
@@ -18,7 +18,8 @@
 import org.jboss.cache.factories.annotations.NonVolatile;
 import org.jboss.cache.factories.annotations.Start;
 import org.jboss.cache.factories.annotations.Stop;
-import org.jboss.cache.lock.NodeLock;
+import org.jboss.cache.lock.LockManager;
+import static org.jboss.cache.lock.LockType.WRITE;
 import org.jgroups.Address;
 
 import java.util.ArrayList;
@@ -60,13 +61,15 @@
    protected final Set<Fqn> activationChangeNodes = Collections.synchronizedSet(new HashSet<Fqn>());
    protected Configuration configuration;
    protected RPCManager rpcManager;
+   private LockManager lockManager;
 
    @Inject
-   void injectDependencies(CacheSPI cache, Configuration configuration, RPCManager rpcManager)
+   void injectDependencies(CacheSPI cache, Configuration configuration, RPCManager rpcManager, LockManager lockManager)
    {
       this.cache = cache;
       this.rpcManager = rpcManager;
       this.configuration = configuration;
+      this.lockManager = lockManager;
    }
 
    @Start
@@ -542,12 +545,10 @@
          throw new CacheException("Region " + fqn + " is already being activated/deactivated");
       }
 
-      NodeSPI parent;
-      NodeSPI subtreeRoot;
+      NodeSPI parent = null;
+      NodeSPI subtreeRoot = null;
       boolean parentLocked = false;
       boolean subtreeLocked = false;
-      NodeLock parentLock = null;
-      NodeLock subtreeLock = null;
 
       try
       {
@@ -586,18 +587,11 @@
                // Acquire locks
 
                Object owner = getOwnerForLock();
-               subtreeLock = subtreeRoot.getLock();
-               subtreeLock.acquireAll(owner, stateFetchTimeout, NodeLock.LockType.WRITE);
-               subtreeLocked = true;
+               subtreeLocked = lockManager.lockAll(subtreeRoot, WRITE, owner, stateFetchTimeout);
 
                // Lock the parent, as we're about to write to it
                parent = subtreeRoot.getParent();
-               if (parent != null)
-               {
-                  parentLock = parent.getLock();
-                  parentLock.acquire(owner, stateFetchTimeout, NodeLock.LockType.WRITE);
-                  parentLocked = true;
-               }
+               if (parent != null) parentLocked = lockManager.lock(parent.getFqn(), WRITE, owner, stateFetchTimeout);
 
                // Remove the subtree
                cache.evict(subtree, true);
@@ -607,21 +601,17 @@
                if (parent != null)
                {
                   log.debug("forcing release of locks in parent");
-                  parentLock.releaseAll();
+                  lockManager.unlockAll(parent);
                }
 
                parentLocked = false;
 
                log.debug("forcing release of all locks in subtree");
-               subtreeLock.releaseAll();
+               lockManager.unlockAll(subtreeRoot);
                subtreeLocked = false;
             }
          }
       }
-      catch (InterruptedException ie)
-      {
-         throw new CacheException("Interrupted while acquiring lock", ie);
-      }
       finally
       {
          // If we didn't succeed, undo the marshalling change
@@ -635,7 +625,7 @@
             log.debug("forcing release of locks in parent");
             try
             {
-               parentLock.releaseAll();
+               if (parent != null) lockManager.unlockAll(parent);
             }
             catch (Throwable t)
             {
@@ -647,7 +637,7 @@
             log.debug("forcing release of all locks in subtree");
             try
             {
-               subtreeLock.releaseAll();
+               if (subtreeRoot != null) lockManager.unlockAll(subtreeRoot);
             }
             catch (Throwable t)
             {

Modified: core/trunk/src/main/java/org/jboss/cache/factories/ComponentRegistry.java
===================================================================
--- core/trunk/src/main/java/org/jboss/cache/factories/ComponentRegistry.java	2008-05-23 16:41:43 UTC (rev 5886)
+++ core/trunk/src/main/java/org/jboss/cache/factories/ComponentRegistry.java	2008-05-23 17:15:52 UTC (rev 5887)
@@ -172,6 +172,7 @@
       s.add(RuntimeConfigAwareFactory.class);
       s.add(TransactionManagerFactory.class);
       s.add(ReplicationQueueFactory.class);
+      s.add(LockManagerFactory.class);
       return s;
    }
 

Modified: core/trunk/src/main/java/org/jboss/cache/factories/EmptyConstructorFactory.java
===================================================================
--- core/trunk/src/main/java/org/jboss/cache/factories/EmptyConstructorFactory.java	2008-05-23 16:41:43 UTC (rev 5886)
+++ core/trunk/src/main/java/org/jboss/cache/factories/EmptyConstructorFactory.java	2008-05-23 17:15:52 UTC (rev 5887)
@@ -7,7 +7,6 @@
 import org.jboss.cache.invocation.CacheInvocationDelegate;
 import org.jboss.cache.invocation.InvocationContextContainer;
 import org.jboss.cache.loader.CacheLoaderManager;
-import org.jboss.cache.lock.LockManager;
 import org.jboss.cache.lock.LockStrategyFactory;
 import org.jboss.cache.marshall.Marshaller;
 import org.jboss.cache.marshall.VersionAwareMarshaller;
@@ -25,7 +24,7 @@
 @DefaultFactoryFor(classes = {StateTransferManager.class, RegionManager.class, NotifierImpl.class,
       ChannelMessageListener.class, CacheLoaderManager.class, Marshaller.class,
       InvocationContextContainer.class, CacheInvocationDelegate.class,
-      TransactionTable.class, DataContainerImpl.class, CommandsFactory.class, LockManager.class,
+      TransactionTable.class, DataContainerImpl.class, CommandsFactory.class,
       LockStrategyFactory.class})
 public class EmptyConstructorFactory extends ComponentFactory
 {

Added: core/trunk/src/main/java/org/jboss/cache/factories/LockManagerFactory.java
===================================================================
--- core/trunk/src/main/java/org/jboss/cache/factories/LockManagerFactory.java	                        (rev 0)
+++ core/trunk/src/main/java/org/jboss/cache/factories/LockManagerFactory.java	2008-05-23 17:15:52 UTC (rev 5887)
@@ -0,0 +1,30 @@
+package org.jboss.cache.factories;
+
+import org.jboss.cache.factories.annotations.DefaultFactoryFor;
+import org.jboss.cache.lock.LockManager;
+import org.jboss.cache.lock.NodeBasedLockManager;
+import org.jboss.cache.lock.PessimisticNodeBasedLockManager;
+
+/**
+ * Creates lock managers
+ *
+ * @author Manik Surtani (<a href="mailto:manik at jboss.org">manik at jboss.org</a>)
+ * @since 2.2.0
+ */
+ at DefaultFactoryFor(classes = LockManager.class)
+public class LockManagerFactory extends EmptyConstructorFactory
+{
+   @Override
+   @SuppressWarnings({"unchecked", "deprecation"})
+   protected <T> T construct(Class<T> componentType)
+   {
+      if (configuration.isNodeLockingOptimistic())
+      {
+         return (T) super.construct(NodeBasedLockManager.class);
+      }
+      else
+      {
+         return (T) super.construct(PessimisticNodeBasedLockManager.class);
+      }
+   }
+}

Modified: core/trunk/src/main/java/org/jboss/cache/interceptors/CacheLoaderInterceptor.java
===================================================================
--- core/trunk/src/main/java/org/jboss/cache/interceptors/CacheLoaderInterceptor.java	2008-05-23 16:41:43 UTC (rev 5886)
+++ core/trunk/src/main/java/org/jboss/cache/interceptors/CacheLoaderInterceptor.java	2008-05-23 17:15:52 UTC (rev 5887)
@@ -12,11 +12,11 @@
 import org.jboss.cache.commands.read.GetKeysCommand;
 import org.jboss.cache.commands.read.GetNodeCommand;
 import org.jboss.cache.commands.tx.RollbackCommand;
+import org.jboss.cache.commands.write.ClearDataCommand;
 import org.jboss.cache.commands.write.MoveCommand;
 import org.jboss.cache.commands.write.PutDataMapCommand;
 import org.jboss.cache.commands.write.PutForExternalReadCommand;
 import org.jboss.cache.commands.write.PutKeyValueCommand;
-import org.jboss.cache.commands.write.ClearDataCommand;
 import org.jboss.cache.commands.write.RemoveKeyCommand;
 import org.jboss.cache.commands.write.RemoveNodeCommand;
 import org.jboss.cache.config.Configuration;
@@ -27,7 +27,7 @@
 import org.jboss.cache.loader.CacheLoader;
 import org.jboss.cache.loader.CacheLoaderManager;
 import org.jboss.cache.lock.LockManager;
-import org.jboss.cache.lock.NodeLock;
+import org.jboss.cache.lock.LockType;
 import org.jboss.cache.notifications.NotifierImpl;
 import org.jboss.cache.transaction.TransactionEntry;
 import org.jboss.cache.transaction.TransactionTable;
@@ -270,7 +270,7 @@
          // - Manik Surtani (21 March 2006)
          if (acquireLock)
          {
-            lock(fqn, NodeLock.LockType.WRITE, false, ctx);// non-recursive for now
+            lock(fqn, LockType.WRITE, false, ctx);// non-recursive for now
          }
 
 //         if (!initNode && !wasRemovedInTx(fqn, ctx.getGlobalTransaction()))
@@ -358,7 +358,7 @@
             loadChildren(child.getFqn(), child, true, isMove, ctxt);
          }
       }
-      lock(fqn, recursive ? NodeLock.LockType.WRITE : NodeLock.LockType.READ, true, ctxt);// recursive=true: lock entire subtree
+      lock(fqn, recursive ? LockType.WRITE : LockType.READ, true, ctxt);// recursive=true: lock entire subtree
       node.setChildrenLoaded(true);
    }
 
@@ -427,15 +427,12 @@
       return retval;
    }
 
-   protected void lock(Fqn fqn, NodeLock.LockType lockType, boolean recursive, InvocationContext ctx) throws Throwable
+   protected void lock(Fqn fqn, LockType lockType, boolean recursive, InvocationContext ctx) throws Throwable
    {
       if (configuration.isNodeLockingOptimistic()) return;
-      lockManager.acquireLocksWithTimeout(ctx, fqn, lockType, false, false, false, false, null, false);
-      if (recursive)
-      {
-         NodeSPI node = dataContainer.peek(fqn, false, false);
-         lockManager.acquireLocksOnChildren(node, lockType, ctx);
-      }
+
+      lockManager.lock(fqn, lockType, ctx.getGlobalTransaction() != null ? ctx.getGlobalTransaction() : Thread.currentThread());
+      if (recursive) lockManager.lockAllAndRecord(fqn, lockType, ctx);
    }
 
    /**

Modified: core/trunk/src/main/java/org/jboss/cache/interceptors/OptimisticInterceptor.java
===================================================================
--- core/trunk/src/main/java/org/jboss/cache/interceptors/OptimisticInterceptor.java	2008-05-23 16:41:43 UTC (rev 5886)
+++ core/trunk/src/main/java/org/jboss/cache/interceptors/OptimisticInterceptor.java	2008-05-23 17:15:52 UTC (rev 5887)
@@ -13,6 +13,8 @@
 import org.jboss.cache.NodeSPI;
 import org.jboss.cache.factories.annotations.Inject;
 import org.jboss.cache.interceptors.base.CommandInterceptor;
+import org.jboss.cache.lock.LockManager;
+import static org.jboss.cache.lock.LockType.READ;
 import org.jboss.cache.lock.TimeoutException;
 import org.jboss.cache.optimistic.TransactionWorkspace;
 import org.jboss.cache.optimistic.WorkspaceNode;
@@ -33,12 +35,14 @@
 {
    protected TransactionManager txManager;
    protected TransactionTable txTable;
+   protected LockManager lockManager;
 
    @Inject
-   private void injectDependencies(TransactionManager txManager, TransactionTable txTable)
+   private void injectDependencies(TransactionManager txManager, TransactionTable txTable, LockManager lockManager)
    {
       this.txManager = txManager;
       this.txTable = txTable;
+      this.lockManager = lockManager;
    }
 
    protected TransactionWorkspace getTransactionWorkspace(InvocationContext ctx) throws CacheException
@@ -92,7 +96,9 @@
     * Undeletes a node that already exists in the workspace, by setting appropriate flags and re-adding to parent's child map.
     *
     * @param nodeToUndelete WorkspaceNode to undelete
+    * @param parent         parent of node to undelete
     */
+   @SuppressWarnings("unchecked")
    protected void undeleteWorkspaceNode(WorkspaceNode nodeToUndelete, WorkspaceNode parent)
    {
       nodeToUndelete.markAsDeleted(false);
@@ -102,21 +108,17 @@
       nodeToUndelete.markAsResurrected(true);
    }
 
+   @SuppressWarnings("unchecked")
    protected WorkspaceNode lockAndCreateWorkspaceNode(NodeFactory nodeFactory, NodeSPI node, TransactionWorkspace workspace, GlobalTransaction gtx, long timeout)
    {
-      boolean locked = false;
-      try
-      {
-         locked = node.getLock().acquireReadLock(gtx, timeout);
-      }
-      catch (InterruptedException e)
-      {
-         // do nothing
-      }
+      boolean locked = lockManager.lock(node.getFqn(), READ, gtx, timeout);
+
       if (!locked)
          throw new TimeoutException("Unable to lock node " + node.getFqn() + " after timeout " + timeout + " for copying into workspace");
+
       WorkspaceNode wn = nodeFactory.createWorkspaceNode(node, workspace);
-      node.getLock().release(gtx);
+
+      lockManager.unlock(node.getFqn(), gtx);
       return wn;
    }
 

Modified: core/trunk/src/main/java/org/jboss/cache/interceptors/OptimisticLockingInterceptor.java
===================================================================
--- core/trunk/src/main/java/org/jboss/cache/interceptors/OptimisticLockingInterceptor.java	2008-05-23 16:41:43 UTC (rev 5886)
+++ core/trunk/src/main/java/org/jboss/cache/interceptors/OptimisticLockingInterceptor.java	2008-05-23 17:15:52 UTC (rev 5887)
@@ -13,11 +13,9 @@
 import org.jboss.cache.commands.tx.CommitCommand;
 import org.jboss.cache.commands.tx.OptimisticPrepareCommand;
 import org.jboss.cache.commands.tx.RollbackCommand;
-import org.jboss.cache.factories.annotations.Inject;
 import org.jboss.cache.factories.annotations.Start;
-import org.jboss.cache.lock.LockManager;
-import static org.jboss.cache.lock.NodeLock.LockType.READ;
-import static org.jboss.cache.lock.NodeLock.LockType.WRITE;
+import static org.jboss.cache.lock.LockType.READ;
+import static org.jboss.cache.lock.LockType.WRITE;
 import org.jboss.cache.optimistic.TransactionWorkspace;
 import org.jboss.cache.optimistic.WorkspaceNode;
 import org.jboss.cache.transaction.GlobalTransaction;
@@ -33,14 +31,7 @@
 public class OptimisticLockingInterceptor extends OptimisticInterceptor
 {
    private long lockAcquisitionTimeout;
-   private LockManager lockManager;
 
-   @Inject
-   private void injectLockManager(LockManager lockManager)
-   {
-      this.lockManager = lockManager;
-   }
-
    @Start
    private void init()
    {
@@ -54,18 +45,11 @@
    {
       //try and acquire the locks - before passing on
       GlobalTransaction gtx = getGlobalTransaction(ctx);
-      long timeout = lockAcquisitionTimeout;
-      if (ctx.getOptionOverrides() != null
-            && ctx.getOptionOverrides().getLockAcquisitionTimeout() >= 0)
-      {
-         timeout = ctx.getOptionOverrides().getLockAcquisitionTimeout();
-      }
 
       boolean succeeded = false;
       try
       {
          TransactionWorkspace<?, ?> workspace = getTransactionWorkspace(ctx);
-         TransactionEntry te = ctx.getTransactionEntry();
          if (log.isDebugEnabled()) log.debug("Locking nodes in transaction workspace for GlobalTransaction " + gtx);
 
          for (WorkspaceNode workspaceNode : workspace.getNodes().values())
@@ -74,11 +58,10 @@
 
             boolean isWriteLockNeeded = workspaceNode.isDirty() || (workspaceNode.isChildrenModified() && (configuration.isLockParentForChildInsertRemove() || node.isLockForChildInsertRemove()));
 
-            boolean acquired = node.getLock().acquire(gtx, timeout, isWriteLockNeeded ? WRITE : READ);
+            boolean acquired = lockManager.lockAndRecord(node, isWriteLockNeeded ? WRITE : READ, ctx);
             if (acquired)
             {
                if (trace) log.trace("Acquired lock on node " + node.getFqn());
-               te.addLock(node.getLock());
             }
             else
             {
@@ -101,7 +84,7 @@
       }
       finally
       {
-         if (!succeeded) unlock(ctx, gtx);
+         if (!succeeded) unlock(ctx);
       }
    }
 
@@ -128,7 +111,7 @@
       }
       finally
       {
-         unlock(ctx, getGlobalTransaction(ctx));
+         unlock(ctx);
       }
       return retval;
    }
@@ -138,14 +121,14 @@
     *
     * @param ctx Invocation Context
     */
-   private void unlock(InvocationContext ctx, GlobalTransaction gtx)
+   private void unlock(InvocationContext ctx)
    {
       try
       {
          TransactionEntry entry = ctx.getTransactionEntry();
          if (entry != null)
          {
-            lockManager.releaseLocks(entry.getLocks(), ctx.getGlobalTransaction());
+            lockManager.unlock(ctx);
          }
       }
       catch (Exception e)

Modified: core/trunk/src/main/java/org/jboss/cache/interceptors/PessimisticLockInterceptor.java
===================================================================
--- core/trunk/src/main/java/org/jboss/cache/interceptors/PessimisticLockInterceptor.java	2008-05-23 16:41:43 UTC (rev 5886)
+++ core/trunk/src/main/java/org/jboss/cache/interceptors/PessimisticLockInterceptor.java	2008-05-23 17:15:52 UTC (rev 5887)
@@ -31,7 +31,9 @@
 import org.jboss.cache.interceptors.base.PostProcessingCommandInterceptor;
 import org.jboss.cache.lock.IsolationLevel;
 import org.jboss.cache.lock.LockManager;
-import org.jboss.cache.lock.NodeLock;
+import static org.jboss.cache.lock.LockType.READ;
+import static org.jboss.cache.lock.LockType.WRITE;
+import org.jboss.cache.lock.PessimisticNodeBasedLockManager;
 import org.jboss.cache.transaction.GlobalTransaction;
 import org.jboss.cache.transaction.TransactionEntry;
 
@@ -58,13 +60,13 @@
 public class PessimisticLockInterceptor extends PostProcessingCommandInterceptor
 {
    private DataContainerImpl dataContainer;
-   private LockManager lockManager;
+   private PessimisticNodeBasedLockManager lockManager;
 
    @Inject
    public void injectDependencies(DataContainerImpl dataContainer, LockManager lockManager)
    {
       this.dataContainer = dataContainer;
-      this.lockManager = lockManager;
+      this.lockManager = (PessimisticNodeBasedLockManager) lockManager;
    }
 
    @Override
@@ -105,7 +107,7 @@
       }
       else
       {
-         lockManager.acquireLocksWithTimeout(ctx, command.getFqn(), NodeLock.LockType.WRITE, true,
+         lockManager.lockPessimistically(ctx, command.getFqn(), WRITE, true,
                zeroAcquisitionTimeout, false, true, null, false);
       }
       return invokeNextInterceptor(ctx, command);
@@ -120,7 +122,7 @@
       // commit propagated up from the tx interceptor
       commit(ctx.getTransactionEntry(), ctx.getGlobalTransaction());
       Object retVal = invokeNextInterceptor(ctx, command);
-      lockManager.releaseLocks(ctx.getTransactionEntry().getLocks(), ctx.getGlobalTransaction());
+      lockManager.unlock(ctx);
       return retVal;
    }
 
@@ -130,7 +132,7 @@
       commit(ctx.getTransactionEntry(), command.getGlobalTransaction());
       if (trace) log.trace("bypassed locking as method commit() doesn't require locking");
       Object retVal = invokeNextInterceptor(ctx, command);
-      lockManager.releaseLocks(ctx.getTransactionEntry().getLocks(), ctx.getGlobalTransaction());
+      lockManager.unlock(ctx);
       return retVal;
    }
 
@@ -160,7 +162,7 @@
          log.trace("bypassed locking as method rollback() doesn't require locking");
       }
       Object retVal = invokeNextInterceptor(ctx, command);
-      lockManager.releaseLocks(ctx.getTransactionEntry().getLocks(), ctx.getGlobalTransaction());
+      lockManager.unlock(ctx);
       return retVal;
    }
 
@@ -168,31 +170,29 @@
    protected Object handleMoveCommand(InvocationContext ctx, MoveCommand command) throws Throwable
    {
       if (ctx.isLockingSuppressed()) return invokeNextInterceptor(ctx, command);
+
       // this call will ensure the node gets a WL and it's current parent gets RL.
       if (trace) log.trace("Attempting to get WL on node to be moved [" + command.getFqn() + "]");
       if (command.getFqn() != null && !(configuration.getIsolationLevel() == IsolationLevel.NONE))
       {
-         lockManager.acquireLocksWithTimeout(ctx, command.getFqn(), NodeLock.LockType.WRITE, false, false, true, false, null, false);
+         lockManager.lockPessimistically(ctx, command.getFqn(), WRITE, false, false, true, false, null, false);
          if (ctx.getGlobalTransaction() != null)
          {
             ctx.getTransactionEntry().addRemovedNode(command.getFqn());
          }
-         lockManager.acquireLocksOnChildren(dataContainer.peek(command.getFqn(), true, false), NodeLock.LockType.WRITE, ctx);
+         lockManager.lockAllAndRecord(dataContainer.peek(command.getFqn(), true, false), WRITE, ctx);
       }
       if (command.getTo() != null && !(configuration.getIsolationLevel() == IsolationLevel.NONE))
       {
          //now for an RL for the new parent.
          if (trace) log.trace("Attempting to get RL on new parent [" + command.getTo() + "]");
-         lockManager.acquireLocksWithTimeout(ctx, command.getTo(), NodeLock.LockType.READ, false, false, false, false, null, false);
-         lockManager.acquireLocksOnChildren(dataContainer.peek(command.getTo(), true, false), NodeLock.LockType.READ, ctx);
+         lockManager.lockPessimistically(ctx, command.getTo(), READ, false, false, false, false, null, false);
+         lockManager.lockAllAndRecord(dataContainer.peek(command.getTo(), true, false), READ, ctx);
       }
       Object retValue = invokeNextInterceptor(ctx, command);
       // do a REAL remove here.
       NodeSPI n = dataContainer.peek(command.getFqn(), true, false);
-      if (n != null)
-      {
-         n.getLock().releaseAll(Thread.currentThread());
-      }
+      if (n != null) lockManager.unlockAll(n, Thread.currentThread());
       return retValue;
    }
 
@@ -204,7 +204,7 @@
       List<NodeSPI> createdNodes = new LinkedList<NodeSPI>();
       // we need to mark new nodes created as deleted since they are only created to form a path to the node being removed, to
       // create a lock.
-      boolean created = lockManager.acquireLocksWithTimeout(ctx, command.getFqn(), NodeLock.LockType.WRITE, true, false, true, true, createdNodes, true);
+      boolean created = lockManager.lockPessimistically(ctx, command.getFqn(), WRITE, true, false, true, true, createdNodes, true);
       TransactionEntry entry = null;
       if (ctx.getGlobalTransaction() != null)
       {
@@ -216,8 +216,9 @@
             nodeSPI.markAsDeleted(true);
          }
       }
-      lockManager.acquireLocksOnChildren(dataContainer.peek(command.getFqn(), false, false), NodeLock.LockType.WRITE, ctx, entry, true);
 
+      lockManager.lockAllForRemoval(dataContainer.peek(command.getFqn(), false, false), ctx, entry);
+
       if (!createdNodes.isEmpty())
       {
          if (trace) log.trace("There were new nodes created, skipping notification on delete");
@@ -236,10 +237,7 @@
 
          //TODO: 2.2.0: end of the logic that needs to be moved
          NodeSPI n = dataContainer.peek(command.getFqn(), true, false);
-         if (n != null)
-         {
-            n.getLock().releaseAll(Thread.currentThread());
-         }
+         if (n != null) lockManager.unlockAll(n, Thread.currentThread());
       }
       // if this is a delete op and we had to create the node, return a FALSE as nothing *really* was deleted!
       return created ? false : retVal;
@@ -248,56 +246,56 @@
    @Override
    protected Object handleRemoveKeyCommand(InvocationContext ctx, RemoveKeyCommand command) throws Throwable
    {
-      lockManager.acquireLocksWithTimeout(ctx, command.getFqn(), NodeLock.LockType.WRITE, false, false, false, false, null, false);
+      lockManager.lockPessimistically(ctx, command.getFqn(), WRITE, false, false, false, false, null, false);
       return invokeNextInterceptor(ctx, command);
    }
 
    @Override
    protected Object handleRemoveDataCommand(InvocationContext ctx, ClearDataCommand command) throws Throwable
    {
-      lockManager.acquireLocksWithTimeout(ctx, command.getFqn(), NodeLock.LockType.WRITE, false, false, false, false, null, false);
+      lockManager.lockPessimistically(ctx, command.getFqn(), WRITE, false, false, false, false, null, false);
       return invokeNextInterceptor(ctx, command);
    }
 
    @Override
    protected Object handleEvictFqnCommand(InvocationContext ctx, EvictCommand command) throws Throwable
    {
-      lockManager.acquireLocksWithTimeout(ctx, command.getFqn(), NodeLock.LockType.WRITE, false, true, false, false, null, false);
+      lockManager.lockPessimistically(ctx, command.getFqn(), WRITE, false, true, false, false, null, false);
       return invokeNextInterceptor(ctx, command);
    }
 
    @Override
    protected Object handleGetKeyValueCommand(InvocationContext ctx, GetKeyValueCommand command) throws Throwable
    {
-      lockManager.acquireLocksWithTimeout(ctx, command.getFqn(), NodeLock.LockType.READ, false, false, false, false, null, false);
+      lockManager.lockPessimistically(ctx, command.getFqn(), READ, false, false, false, false, null, false);
       return invokeNextInterceptor(ctx, command);
    }
 
    @Override
    protected Object handleGetNodeCommand(InvocationContext ctx, GetNodeCommand command) throws Throwable
    {
-      lockManager.acquireLocksWithTimeout(ctx, command.getFqn(), NodeLock.LockType.READ, false, false, false, false, null, false);
+      lockManager.lockPessimistically(ctx, command.getFqn(), READ, false, false, false, false, null, false);
       return invokeNextInterceptor(ctx, command);
    }
 
    @Override
    protected Object handleGetKeysCommand(InvocationContext ctx, GetKeysCommand command) throws Throwable
    {
-      lockManager.acquireLocksWithTimeout(ctx, command.getFqn(), NodeLock.LockType.READ, false, false, false, false, null, false);
+      lockManager.lockPessimistically(ctx, command.getFqn(), READ, false, false, false, false, null, false);
       return invokeNextInterceptor(ctx, command);
    }
 
    @Override
    protected Object handleGetChildrenNamesCommand(InvocationContext ctx, GetChildrenNamesCommand command) throws Throwable
    {
-      lockManager.acquireLocksWithTimeout(ctx, command.getFqn(), NodeLock.LockType.READ, false, false, false, false, null, false);
+      lockManager.lockPessimistically(ctx, command.getFqn(), READ, false, false, false, false, null, false);
       return invokeNextInterceptor(ctx, command);
    }
 
    @Override
    public void doAfterCall(InvocationContext ctx, VisitableCommand command)
    {
-      ctx.clearInvocationLocksAcquired();
+      lockManager.unlock(ctx);
    }
 
    /**
@@ -317,4 +315,5 @@
          dataContainer.removeFromDataStructure(fqn, false);
       }
    }
+
 }

Modified: core/trunk/src/main/java/org/jboss/cache/interceptors/TxInterceptor.java
===================================================================
--- core/trunk/src/main/java/org/jboss/cache/interceptors/TxInterceptor.java	2008-05-23 16:41:43 UTC (rev 5886)
+++ core/trunk/src/main/java/org/jboss/cache/interceptors/TxInterceptor.java	2008-05-23 17:15:52 UTC (rev 5887)
@@ -600,10 +600,7 @@
    protected void cleanupStaleLocks(InvocationContext ctx) throws Throwable
    {
       TransactionEntry entry = ctx.getTransactionEntry();
-      if (entry != null)
-      {
-         lockManager.releaseLocks(entry.getLocks(), ctx.getGlobalTransaction());
-      }
+      if (entry != null) lockManager.unlock(ctx);
    }
 
    /**

Modified: core/trunk/src/main/java/org/jboss/cache/lock/IdentityLock.java
===================================================================
--- core/trunk/src/main/java/org/jboss/cache/lock/IdentityLock.java	2008-05-23 16:41:43 UTC (rev 5886)
+++ core/trunk/src/main/java/org/jboss/cache/lock/IdentityLock.java	2008-05-23 17:15:52 UTC (rev 5887)
@@ -62,6 +62,7 @@
  * @author Ben Wang July 2003
  * @version $Revision$
  */
+ at SuppressWarnings("deprecation")
 public class IdentityLock implements NodeLock
 {
 
@@ -469,15 +470,15 @@
       }
    }
 
-   public boolean acquire(Object caller, long timeout, NodeLock.LockType lock_type) throws LockingException, TimeoutException, InterruptedException
+   public boolean acquire(Object caller, long timeout, LockType lock_type) throws LockingException, TimeoutException, InterruptedException
    {
       try
       {
-         if (lock_type == NodeLock.LockType.NONE)
+         if (lock_type == LockType.NONE)
          {
             return true;
          }
-         else if (lock_type == NodeLock.LockType.READ)
+         else if (lock_type == LockType.READ)
          {
             return acquireReadLock(caller, timeout);
          }

Deleted: core/trunk/src/main/java/org/jboss/cache/lock/LockManager.java
===================================================================
--- core/trunk/src/main/java/org/jboss/cache/lock/LockManager.java	2008-05-23 16:41:43 UTC (rev 5886)
+++ core/trunk/src/main/java/org/jboss/cache/lock/LockManager.java	2008-05-23 17:15:52 UTC (rev 5887)
@@ -1,343 +0,0 @@
-package org.jboss.cache.lock;
-
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.jboss.cache.DataContainerImpl;
-import org.jboss.cache.Fqn;
-import org.jboss.cache.InvocationContext;
-import org.jboss.cache.NodeSPI;
-import org.jboss.cache.commands.write.PutDataMapCommand;
-import org.jboss.cache.config.Configuration;
-import org.jboss.cache.factories.CommandsFactory;
-import org.jboss.cache.factories.annotations.Inject;
-import org.jboss.cache.factories.annotations.Start;
-import org.jboss.cache.transaction.GlobalTransaction;
-import org.jboss.cache.transaction.TransactionEntry;
-import org.jboss.cache.transaction.TransactionTable;
-
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.Set;
-
-/**
- * @author Mircea.Markus at jboss.com
- * @since 2.2
- */
-public class LockManager
-{
-   private static final Log log = LogFactory.getLog(LockManager.class);
-   private static final boolean trace = log.isTraceEnabled();
-
-   private Configuration configuration;
-   private long lockAcquisitionTimeout;
-   private DataContainerImpl dataContainer;
-   private NodeSPI rootNode;
-   private TransactionTable txTable;
-   private CommandsFactory commandsFactory;
-
-   @Inject
-   public void inject(Configuration configuration, DataContainerImpl dataContainer, TransactionTable txTable,
-                      CommandsFactory commandsFactory)
-   {
-      this.configuration = configuration;
-      this.dataContainer = dataContainer;
-      this.txTable = txTable;
-      this.commandsFactory = commandsFactory;
-   }
-
-   @Start
-   public void setRootNode()
-   {
-      this.lockAcquisitionTimeout = configuration.getLockAcquisitionTimeout();
-      rootNode = dataContainer.getRoot();
-   }
-
-
-   public boolean acquireLocksWithTimeout(InvocationContext ctx, Fqn fqn, NodeLock.LockType lockType,
-                                          boolean createIfNotExists, boolean zeroLockTimeout,
-                                          boolean acquireLockOnParent, boolean reverseRemoveCheck, List<NodeSPI> createdNodes, boolean skipNotification)
-         throws InterruptedException
-   {
-      if (fqn == null || configuration.getIsolationLevel() == IsolationLevel.NONE || ctx.isLockingSuppressed())
-         return false;
-
-      boolean created;
-      long timeout = zeroLockTimeout ? 0 : ctx.getContextLockAcquisitionTimeout(lockAcquisitionTimeout);
-      // make sure we can bail out of this loop
-      long cutoffTime = System.currentTimeMillis() + timeout;
-      boolean firstTry = true;
-      do
-      {
-         // this is an additional check to make sure we don't try for too long.
-         if (!firstTry && System.currentTimeMillis() > cutoffTime)
-         {
-            throw new TimeoutException("Unable to acquire lock on Fqn " + fqn + " after " + timeout + " millis");
-         }
-         created = lock(ctx, fqn, lockType, createIfNotExists, timeout, acquireLockOnParent, reverseRemoveCheck, createdNodes, skipNotification);
-         firstTry = false;
-      }
-      while (createIfNotExists && (dataContainer.peek(fqn, false, false) == null));// keep trying until we have the lock (fixes concurrent remove())
-      return created;
-   }
-
-
-   /**
-    * Acquires locks on the node and on its parrents. Read locks are acquired for exsiting ancestors, with two exceptions:
-    * 1) createIfNotExists is true. If an ancestor is created on the fly, then an WL is acquired by default
-    * 2) acquireWriteLockOnParent is true. If so AND {@link org.jboss.cache.Node#isLockForChildInsertRemove()} then a read
-    * lock will be aquired for the parent of the node.
-    *
-    * @param createIfNotExists  if true, then missing nodes will be cretaed on the fly. If false, method returns if we
-    *                           reach a node that does not exists
-    * @param reverseRemoveCheck see {@link #manageReverseRemove(org.jboss.cache.transaction.GlobalTransaction, org.jboss.cache.NodeSPI, boolean, java.util.List)}
-    * @param createdNodes       a list to which any nodes created can register their Fqns so that calling code is aware of which nodes have been newly created.
-    * @param skipNotification
-    */
-   private boolean lock(InvocationContext ctx, Fqn fqn, NodeLock.LockType lockType, boolean createIfNotExists, long timeout,
-                        boolean acquireWriteLockOnParent, boolean reverseRemoveCheck, List<NodeSPI> createdNodes, boolean skipNotification)
-         throws TimeoutException, LockingException, InterruptedException
-   {
-      Thread currentThread = Thread.currentThread();
-      GlobalTransaction gtx = ctx.getGlobalTransaction();
-      boolean created = false;
-      // if the tx associated with the current thread is rolling back, barf! JBCACHE-923
-      if (gtx != null) TransactionTable.assertTransactionValid(ctx);
-
-      Object owner = (gtx != null) ? gtx : currentThread;
-      NodeSPI currentNode;
-      if (trace) log.trace("Attempting to lock node " + fqn + " for owner " + owner);
-      long expiryTime = System.currentTimeMillis() + timeout;
-      currentNode = rootNode;
-      NodeSPI parent = null;
-      Object childName = null;
-      int currentIndex = -1;
-      int targetFqnSize = fqn.size();
-
-      do
-      {
-         if (currentNode == null)
-         {
-            if (createIfNotExists)
-            {
-               // if the new node is to be marked as deleted, do not notify!
-               currentNode = parent.addChildDirect(childName, !skipNotification);
-               created = true;
-               if (trace) log.trace("Child node was null, so created child node " + childName);
-               if (createdNodes != null) createdNodes.add(currentNode);
-            }
-            else
-            {
-               if (trace)
-                  log.trace("failed to find or create child " + childName + " of node " + parent);
-               return false;
-            }
-         }
-         else
-         {
-            if (!currentNode.isValid() && createIfNotExists) currentNode.setValid(true, false);
-         }
-
-         NodeLock.LockType lockTypeRequired = NodeLock.LockType.READ;
-         if (created || writeLockNeeded(ctx, lockType, currentIndex, acquireWriteLockOnParent, createIfNotExists, fqn, currentNode))
-         {
-            lockTypeRequired = NodeLock.LockType.WRITE;
-         }
-
-         Fqn currentNodeFqn = currentNode.getFqn();
-         // actually acquire the lock we need.  This method blocks.
-         acquireNodeLock(ctx, currentNode, owner, gtx, lockTypeRequired, timeout);
-
-         manageReverseRemove(ctx, currentNode, reverseRemoveCheck, createdNodes);
-         // make sure the lock we acquired isn't on a deleted node/is an orphan!!
-         // look into invalidated nodes as well
-         NodeSPI repeek = dataContainer.peek(currentNodeFqn, true, true);
-         if (currentNode != repeek)
-         {
-            if (trace)
-               log.trace("Was waiting for and obtained a lock on a node that doesn't exist anymore!  Attempting lock acquisition again.");
-            // we have an orphan!! Lose the unnecessary lock and re-acquire the lock (and potentially recreate the node).
-            // check if the parent exists!!
-            // look into invalidated nodes as well
-            currentNode.getLock().releaseAll(owner);
-            if (parent == null || dataContainer.peek(parent.getFqn(), true, true) == null)
-            {
-               // crap!
-               if (trace)
-                  log.trace("Parent has been deleted again.  Go through the lock method all over again.");
-               currentNode = rootNode;
-               currentIndex = -1;
-               parent = null;
-            }
-            else
-            {
-               currentNode = parent;
-               currentIndex--;
-               parent = null;
-               if (System.currentTimeMillis() > expiryTime)
-               {
-                  throw new TimeoutException("Unable to acquire lock on child node " + Fqn.fromRelativeElements(currentNode.getFqn(), childName) + " after " + timeout + " millis.");
-               }
-               if (trace) log.trace("Moving one level up, current node is :" + currentNode);
-            }
-         }
-         else
-         {
-            // we have succeeded in acquiring this lock. Increment the current index since we have gained one level of depth in the tree.
-            currentIndex++;
-
-            // now test if this is the final level and if we can quit the loop:
-            //if (currentNodeFqn.equals(fqn))//we've just processed the last child
-            if (currentIndex == targetFqnSize)
-            {
-               break;
-            }
-            if (!fqn.isChildOrEquals(currentNode.getFqn())) // Does this ever happen?  Perhaps with a move(), I suppose?  - MS
-            {
-               String message = new StringBuffer("currentNode instance changed the FQN(").append(currentNode.getFqn())
-                     .append(") and do not match the FQN on which we want to acquire lock(").append(fqn).append(")").toString();
-               log.trace(message);
-               throw new LockingException(message);
-            }
-            parent = currentNode;
-
-            childName = fqn.get(currentIndex);
-            currentNode = currentNode.getChildDirect(childName);
-         }
-      }
-      while (true);
-      return created;
-   }
-
-   /**
-    * Used by lock()
-    * Determins whter an arbitrary node from the supplied fqn needs an write lock.
-    */
-   private boolean writeLockNeeded(InvocationContext ctx, NodeLock.LockType lockType, int currentNodeIndex, boolean acquireWriteLockOnParent, boolean createIfNotExists, Fqn targetFqn, NodeSPI currentNode)
-   {
-      int treeNodeSize = targetFqn.size();
-      // write lock forced!!
-      boolean isTargetNode = currentNodeIndex == (treeNodeSize - 1);
-      if (isTargetNode && ctx.getOptionOverrides().isForceWriteLock()) return true;
-      //this can be injected, from the caller as a param named wlParent
-      if (currentNode.isLockForChildInsertRemove())
-      {
-         if (acquireWriteLockOnParent && currentNodeIndex == treeNodeSize - 2)
-         {
-            return true;// we're doing a remove and we've reached the PARENT node of the target to be removed.
-         }
-         if (!isTargetNode && dataContainer.peek(targetFqn.getAncestor(currentNodeIndex + 2), false, false) == null)
-         {
-            return createIfNotExists;// we're at a node in the tree, not yet at the target node, and we need to create the next node.  So we need a WL here.
-         }
-      }
-      return lockType == NodeLock.LockType.WRITE && isTargetNode;//write lock explicitly requested and this is the target to be written to.
-   }
-
-   private void acquireNodeLock(InvocationContext ctx, NodeSPI node, Object owner, GlobalTransaction gtx, NodeLock.LockType lockType, long lockTimeout) throws LockingException, TimeoutException, InterruptedException
-   {
-      NodeLock lock = node.getLock();
-      boolean acquired = lock.acquire(owner, lockTimeout, lockType);
-      if (acquired)
-      {
-         // Record the lock for release on method return or tx commit/rollback
-         if (gtx != null)
-         {
-            ctx.getTransactionEntry().addLock(lock);
-         }
-         else
-         {
-            ctx.addInvocationLockAcquired(lock);
-         }
-      }
-   }
-
-   /**
-    * Test if this node needs to be 'undeleted'
-    * reverse the "remove" if the node has been previously removed in the same tx, if this operation is a put()
-    */
-   public void manageReverseRemove(InvocationContext ctx, NodeSPI childNode, boolean reverseRemoveCheck, List createdNodes)
-   {
-      if (ctx.getGlobalTransaction() != null) //if no tx then reverse remove does not make sense
-      {
-         Fqn fqn = childNode.getFqn();
-         TransactionEntry entry = ctx.getTransactionEntry();
-         boolean needToReverseRemove = reverseRemoveCheck && childNode.isDeleted() && isNodeRemovedInCurrentTransaction(entry, fqn);
-         if (!needToReverseRemove) return;
-         childNode.markAsDeleted(false);
-         //if we'll rollback the tx data should be added to the node again
-         Map oldData = new HashMap(childNode.getDataDirect());
-         PutDataMapCommand command = commandsFactory.buildPutDataMapCommand(ctx.getGlobalTransaction(), fqn, oldData);
-         // txTable.get(gtx).addUndoOperation(command); --- now need to make sure this is added to the normal mods list instead
-         entry.addModification(command);
-         //we're prepared for rollback, now reset the node
-         childNode.clearDataDirect();
-         if (createdNodes != null)
-         {
-            createdNodes.add(childNode);
-         }
-      }
-   }
-
-   private boolean isNodeRemovedInCurrentTransaction(TransactionEntry entry, Fqn fqn)
-   {
-      return entry != null && entry.getRemovedNodes().contains(fqn);
-   }
-
-   public void acquireLocksOnChildren(NodeSPI parentNode, NodeLock.LockType lockType, InvocationContext ctx) throws InterruptedException
-   {
-      acquireLocksOnChildren(parentNode, lockType, ctx, null, false);
-   }
-
-   /**
-    * Acquires nodes on the children of this node. nodes on the node itself are not aquired.
-    * If the supplied parent node is null the method returns(no op).
-    */
-   public void acquireLocksOnChildren(NodeSPI parentNode, NodeLock.LockType lockType, InvocationContext ctx, TransactionEntry entry, boolean addChildrenToDeletedList)
-         throws InterruptedException
-   {
-      if (parentNode == null)
-      {
-         return;
-      }
-      long timeout = ctx.getContextLockAcquisitionTimeout(lockAcquisitionTimeout);
-      GlobalTransaction gtx = ctx.getGlobalTransaction();
-      Object owner = (gtx != null) ? gtx : Thread.currentThread();
-
-      Set<NodeLock> acquiredLocks = parentNode.getLock().acquireAll(owner, timeout, lockType);
-      if (acquiredLocks.size() > 0)
-      {
-         if (gtx != null)
-         {
-            ctx.getTransactionEntry().addLocks(acquiredLocks);
-            if (addChildrenToDeletedList)
-            {
-               for (NodeLock l : acquiredLocks)
-               {
-                  entry.addRemovedNode(l.getFqn());
-               }
-            }
-         }
-         else
-         {
-            ctx.addInvocationLocksAcquired(acquiredLocks);
-         }
-      }
-   }
-
-   /**
-    * Releases all locks held by the owner, in reverse order of creation.
-    */
-   public void releaseLocks(List<NodeLock> locks, Object owner)
-   {
-      // Copying out to an array is faster than creating an ArrayList and iterating,
-      // since list creation will just copy out to an array internally
-      IdentityLock[] lockArray = locks.toArray(new IdentityLock[locks.size()]);
-      for (int i = lockArray.length - 1; i >= 0; i--)
-      {
-         if (trace) log.trace("releasing lock for " + lockArray[i].getFqn() + " (" + lockArray[i] + ")");
-         lockArray[i].release(owner);
-      }
-      locks.clear();
-   }
-}

Added: core/trunk/src/main/java/org/jboss/cache/lock/LockManager.java
===================================================================
--- core/trunk/src/main/java/org/jboss/cache/lock/LockManager.java	                        (rev 0)
+++ core/trunk/src/main/java/org/jboss/cache/lock/LockManager.java	2008-05-23 17:15:52 UTC (rev 5887)
@@ -0,0 +1,270 @@
+package org.jboss.cache.lock;
+
+import org.jboss.cache.Fqn;
+import org.jboss.cache.InvocationContext;
+import org.jboss.cache.NodeSPI;
+
+import java.util.Collection;
+
+/**
+ * An interface to deal with all aspects of acquiring and releasing locks for nodes in the cache.
+ *
+ * @author Manik Surtani (<a href="mailto:manik at jboss.org">manik at jboss.org</a>)
+ * @since 2.2.0
+ */
+public interface LockManager
+{
+   /**
+    * Determines the owner to be used when obtaining locks, given an invocation context.  This is typically a {@link org.jboss.cache.transaction.GlobalTransaction} if one
+    * is present in the context, or {@link Thread#currentThread()} if one is not present.
+    *
+    * @param ctx invocation context
+    * @return owner to be used for acquiring locks.
+    */
+   Object getLockOwner(InvocationContext ctx);
+
+   /**
+    * Acquires a lock of type lockType, for a given owner, on a specific Node in the cache, denoted by fqn.  This
+    * method will try for {@link org.jboss.cache.config.Configuration#getLockAcquisitionTimeout()} milliseconds and give up if it is unable to acquire the required lock.
+    *
+    * @param fqn      Fqn to lock
+    * @param lockType type of lock to acquire
+    * @param owner    owner to acquire the lock for
+    * @return true if the lock was acquired, false otherwise.
+    */
+   boolean lock(Fqn fqn, LockType lockType, Object owner);
+
+   /**
+    * Acquires a lock of type lockType, for a given owner, on a specific Node in the cache, denoted by fqn.  This
+    * method will try for timeout milliseconds and give up if it is unable to acquire the required lock.
+    *
+    * @param fqn      Fqn to lock
+    * @param lockType type of lock to acquire
+    * @param owner    owner to acquire the lock for
+    * @param timeout  maximum length of time to wait for (in millis)
+    * @return true if the lock was acquired, false otherwise.
+    */
+   boolean lock(Fqn fqn, LockType lockType, Object owner, long timeout);
+
+   /**
+    * Acquires a lock of type lockType, on a specific Node in the cache, denoted by fqn.  This
+    * method will try for a period of time and give up if it is unable to acquire the required lock.  The period of time
+    * is specified in {@link org.jboss.cache.config.Option#getLockAcquisitionTimeout()} and, if this is unset, the default timeout
+    * set in {@link org.jboss.cache.config.Configuration#getLockAcquisitionTimeout()} is used.
+    * <p/>
+    * In addition, any locks acquired are added to the context using {@link org.jboss.cache.InvocationContext#addInvocationLockAcquired(NodeLock)}
+    * if you are not running in a transaction, or using {@link org.jboss.cache.transaction.TransactionEntry#addLock(NodeLock)} if you are.
+    * <p/>
+    * The owner for the lock is determined by passing the invocation context to {@link #getLockOwner(org.jboss.cache.InvocationContext)}.
+    * <p/>
+    *
+    * @param fqn      Fqn to lock
+    * @param lockType type of lock to acquire
+    * @param ctx      invocation context associated with this invocation
+    * @return true if the lock was acquired, false otherwise.
+    */
+   boolean lockAndRecord(Fqn fqn, LockType lockType, InvocationContext ctx);
+
+   /**
+    * Acquires a lock of type lockType, on a specific Node in the cache, denoted by fqn.  This
+    * method will try for a period of time and give up if it is unable to acquire the required lock.  The period of time
+    * is specified in {@link org.jboss.cache.config.Option#getLockAcquisitionTimeout()} and, if this is unset, the default timeout
+    * set in {@link org.jboss.cache.config.Configuration#getLockAcquisitionTimeout()} is used.
+    * <p/>
+    * In addition, any locks acquired are added to the context using {@link org.jboss.cache.InvocationContext#addInvocationLockAcquired(NodeLock)}
+    * if you are not running in a transaction, or using {@link org.jboss.cache.transaction.TransactionEntry#addLock(NodeLock)} if you are.
+    * <p/>
+    * The owner for the lock is determined by passing the invocation context to {@link #getLockOwner(org.jboss.cache.InvocationContext)}.
+    * <p/>
+    *
+    * @param node     Fqn to lock
+    * @param lockType type of lock to acquire
+    * @param ctx      invocation context associated with this invocation
+    * @return true if the lock was acquired, false otherwise.
+    */
+   boolean lockAndRecord(NodeSPI node, LockType lockType, InvocationContext ctx);
+
+
+   /**
+    * Releases the lock passed in, held by the specified owner
+    *
+    * @param lock  NodeLock to unlock
+    * @param owner lock owner
+    */
+   void unlock(NodeLock lock, Object owner);
+
+   /**
+    * Releases the lock passed in, held by the specified owner
+    *
+    * @param fqn   Fqn of the node to unlock
+    * @param owner lock owner
+    */
+   void unlock(Fqn fqn, Object owner);
+
+   /**
+    * Releases locks present in an invocation context and transaction entry, if one is available.
+    * <p/>
+    * Locks are released in reverse order of which they are acquired and registered.
+    * <p/>
+    * Lock owner is determined by passing the invocation context to {@link #getLockOwner(org.jboss.cache.InvocationContext)}
+    * <p/>
+    *
+    * @param ctx invocation context to inspect
+    */
+   void unlock(InvocationContext ctx);
+
+
+   /**
+    * Locks the node and all child nodes, acquiring lock of type specified for the owner specified.  If only some locks are
+    * acquired, all locks are released and the method returns false.
+    * <p/>
+    * This method will try for {@link org.jboss.cache.config.Configuration#getLockAcquisitionTimeout()} milliseconds and give up if it is unable to acquire the required lock.
+    * <p/>
+    *
+    * @param node     Node to lock
+    * @param lockType type of lock to acquire
+    * @param owner    owner to acquire the lock for
+    * @return true if the lock was acquired, false otherwise.
+    */
+   boolean lockAll(NodeSPI node, LockType lockType, Object owner);
+
+   /**
+    * Locks the node and all child nodes, acquiring lock of type specified for the owner specified.  If only some locks are
+    * acquired, all locks are released and the method returns false.  Internal Fqns are included as well.
+    *
+    * @param node     Node to lock
+    * @param lockType type of lock to acquire
+    * @param owner    owner to acquire the lock for
+    * @param timeout  maximum length of time to wait for (in millis)
+    * @return true if all locks were acquired, false otherwise.
+    */
+   boolean lockAll(NodeSPI node, LockType lockType, Object owner, long timeout);
+
+   /**
+    * Locks the node and all child nodes, acquiring lock of type specified for the owner specified.  If only some locks are
+    * acquired, all locks are released and the method returns false.
+    *
+    * @param node                Node to lock
+    * @param lockType            type of lock to acquire
+    * @param owner               owner to acquire the lock for
+    * @param timeout             maximum length of time to wait for (in millis)
+    * @param excludeInternalFqns if true, any Fqns that are internal are excluded.
+    * @return true if all locks were acquired, false otherwise.
+    */
+   boolean lockAll(NodeSPI node, LockType lockType, Object owner, long timeout, boolean excludeInternalFqns);
+
+   /**
+    * Locks the node and all child nodes, acquiring lock of type specified for the owner specified.  If only some locks are
+    * acquired, all locks are released and the method returns false.
+    * <p/>
+    * In addition, any locks acquired are added to the context using {@link org.jboss.cache.InvocationContext#addInvocationLockAcquired(NodeLock)}
+    * if you are not running in a transaction, or using {@link org.jboss.cache.transaction.TransactionEntry#addLock(NodeLock)} if you are.
+    * <p/>
+    * The owner for the lock is determined by passing the invocation context to {@link #getLockOwner(org.jboss.cache.InvocationContext)}.
+    * <p/>
+    *
+    * @param node     Node to lock
+    * @param lockType type of lock to acquire
+    * @param ctx      invocation context associated with this invocation
+    * @return true if all locks were acquired, false otherwise.
+    */
+   boolean lockAllAndRecord(NodeSPI node, LockType lockType, InvocationContext ctx);
+
+   /**
+    * Locks the node and all child nodes, acquiring lock of type specified for the owner specified.  If only some locks are
+    * acquired, all locks are released and the method returns false.
+    * <p/>
+    * In addition, any locks acquired are added to the context using {@link org.jboss.cache.InvocationContext#addInvocationLockAcquired(NodeLock)}
+    * if you are not running in a transaction, or using {@link org.jboss.cache.transaction.TransactionEntry#addLock(NodeLock)} if you are.
+    * <p/>
+    * The owner for the lock is determined by passing the invocation context to {@link #getLockOwner(org.jboss.cache.InvocationContext)}.
+    * <p/>
+    *
+    * @param fqn      Node to lock
+    * @param lockType type of lock to acquire
+    * @param ctx      invocation context associated with this invocation
+    * @return true if all locks were acquired, false otherwise.
+    */
+   boolean lockAllAndRecord(Fqn fqn, LockType lockType, InvocationContext ctx);
+
+   /**
+    * Releases locks on a given node and all children for a given owner.
+    *
+    * @param node  node to unlock
+    * @param owner lock owner
+    */
+   void unlockAll(NodeSPI node, Object owner);
+
+   /**
+    * Releases locks on a given node and all children for all owners.  Use with care.
+    *
+    * @param node node to unlock
+    */
+   void unlockAll(NodeSPI node);
+
+   /**
+    * Tests whether a given owner owns a lock of lockType on a particular Fqn.
+    *
+    * @param fqn      fqn to test
+    * @param lockType type of lock to test for
+    * @param owner    owner
+    * @return true if the owner does own the specified lock type on the specified node, false otherwise.
+    */
+   boolean ownsLock(Fqn fqn, LockType lockType, Object owner);
+
+   /**
+    * Tests whether a given owner owns any sort of lock on a particular Fqn.
+    *
+    * @param fqn   fqn to test
+    * @param owner owner
+    * @return true if the owner does own the specified lock type on the specified node, false otherwise.
+    */
+   boolean ownsLock(Fqn fqn, Object owner);
+
+   /**
+    * Returns true if the node is locked (either for reading or writing) by anyone, and false otherwise.
+    *
+    * @param n node to inspect
+    * @return true of locked; false if not.
+    */
+   boolean isLocked(NodeSPI n);
+
+   /**
+    * Retrieves the write lock owner, if any, for the current Fqn.
+    *
+    * @param f Fqn to inspect
+    * @return the owner of the lock, or null if not locked.
+    */
+   Object getWriteOwner(Fqn f);
+
+   /**
+    * Retrieves the read lock owners, if any, for the current Fqn.
+    *
+    * @param f Fqn to inspect
+    * @return a collection of read lock owners, or an empty collection if not locked.
+    */
+   Collection<Object> getReadOwners(Fqn f);
+
+   /**
+    * Retrieves the write lock owner, if any, for the current Fqn.
+    *
+    * @param node the node to inspect
+    * @return the owner of the lock, or null if not locked.
+    */
+   Object getWriteOwner(NodeSPI node);
+
+   /**
+    * Retrieves the read lock owners, if any, for the current Fqn.
+    *
+    * @param node the node to inspect
+    * @return a collection of read lock owners, or an empty collection if not locked.
+    */
+   Collection<Object> getReadOwners(NodeSPI node);
+
+   /**
+    * Prints lock information about a node (and it's children) to a String.
+    *
+    * @param node node to inspect
+    */
+   String printLockInfo(NodeSPI node);
+}

Added: core/trunk/src/main/java/org/jboss/cache/lock/LockType.java
===================================================================
--- core/trunk/src/main/java/org/jboss/cache/lock/LockType.java	                        (rev 0)
+++ core/trunk/src/main/java/org/jboss/cache/lock/LockType.java	2008-05-23 17:15:52 UTC (rev 5887)
@@ -0,0 +1,12 @@
+package org.jboss.cache.lock;
+
+/**
+ * An enumeration to define different types of locks.
+ *
+ * @author Manik Surtani (<a href="mailto:manik at jboss.org">manik at jboss.org</a>)
+ * @since 2.2.0
+ */
+public enum LockType
+{
+   NONE, READ, WRITE
+}

Modified: core/trunk/src/main/java/org/jboss/cache/lock/LockUtil.java
===================================================================
--- core/trunk/src/main/java/org/jboss/cache/lock/LockUtil.java	2008-05-23 16:41:43 UTC (rev 5886)
+++ core/trunk/src/main/java/org/jboss/cache/lock/LockUtil.java	2008-05-23 17:15:52 UTC (rev 5887)
@@ -2,7 +2,7 @@
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
-import org.jboss.cache.Node;
+import org.jboss.cache.NodeSPI;
 import org.jboss.cache.statetransfer.StateTransferManager;
 import org.jboss.cache.transaction.GlobalTransaction;
 import org.jboss.cache.transaction.TransactionTable;
@@ -21,7 +21,8 @@
       int STATUS_BROKEN = Integer.MIN_VALUE;
    }
 
-   public static boolean breakTransactionLock(NodeLock lock,
+   public static boolean breakTransactionLock(NodeSPI node,
+                                              LockManager lockManager,
                                               GlobalTransaction gtx,
                                               boolean localTx,
                                               TransactionTable tx_table, TransactionManager tm)
@@ -30,9 +31,9 @@
       int tryCount = 0;
       int lastStatus = TransactionLockStatus.STATUS_BROKEN;
 
-      while (!broken && lock.isOwner(gtx))
+      while (!broken && lockManager.ownsLock(node.getFqn(), gtx))
       {
-         int status = breakTransactionLock(gtx, lock, tx_table, tm, localTx, lastStatus, tryCount);
+         int status = breakTransactionLock(gtx, node, lockManager, tx_table, tm, localTx, lastStatus, tryCount);
          if (status == TransactionLockStatus.STATUS_BROKEN)
          {
             broken = true;
@@ -50,78 +51,6 @@
    }
 
    /**
-    * Attempts to acquire a read lock on <code>node</code> for
-    * <code>newOwner</code>, if necessary breaking locks held by
-    * <code>curOwner</code>.
-    *
-    * @param node     the node
-    * @param lock     the lock
-    * @param curOwner the current owner
-    * @param newOwner the new owner
-    */
-   private static boolean acquireLockFromOwner(Node node,
-                                               NodeLock lock,
-                                               Object curOwner,
-                                               Object newOwner,
-                                               TransactionTable tx_table,
-                                               TransactionManager tm,
-                                               Object localAddress)
-   {
-      if (trace)
-      {
-         log.trace("Attempting to acquire lock for node " + node.getFqn() +
-               " from owner " + curOwner);
-      }
-
-      boolean acquired = false;
-      boolean broken = false;
-      int tryCount = 0;
-      int lastStatus = TransactionLockStatus.STATUS_BROKEN;
-
-      while (!broken && !acquired)
-      {
-         if (curOwner instanceof GlobalTransaction)
-         {
-            GlobalTransaction gtx = (GlobalTransaction) curOwner;
-            boolean local = gtx.getAddress().equals(localAddress);
-            int status = breakTransactionLock(gtx, lock, tx_table, tm, local, lastStatus, tryCount);
-            if (status == TransactionLockStatus.STATUS_BROKEN)
-            {
-               broken = true;
-            }
-            else if (status != lastStatus)
-            {
-               tryCount = 0;
-            }
-            lastStatus = status;
-         }
-         else if (tryCount > 0)
-         {
-            lock.release(curOwner);
-            broken = true;
-         }
-
-         if (broken && trace)
-         {
-            log.trace("Broke lock for node " + node.getFqn() +
-                  " held by owner " + curOwner);
-         }
-
-         try
-         {
-            acquired = lock.acquire(newOwner, 1, NodeLock.LockType.READ);
-         }
-         catch (Exception ignore)
-         {
-         }
-
-         tryCount++;
-      }
-
-      return acquired;
-   }
-
-   /**
     * Attempts to release the lock held by <code>gtx</code> by altering the
     * underlying transaction.  Different strategies will be employed
     * depending on the status of the transaction and param
@@ -135,7 +64,6 @@
     * {@link TransactionLockStatus#STATUS_BROKEN}.
     *
     * @param gtx        the gtx holding the lock
-    * @param lock       the lock
     * @param lastStatus the return value from a previous invocation of this
     *                   method for the same lock, or Status.STATUS_UNKNOW
     *                   for the first invocation.
@@ -147,15 +75,15 @@
     *         if the lock held by gtx was forcibly broken.
     */
    private static int breakTransactionLock(GlobalTransaction gtx,
-                                           NodeLock lock,
-                                           TransactionTable tx_table,
+                                           NodeSPI node, LockManager lockManager,
+                                           TransactionTable transactionTable,
                                            TransactionManager tm,
                                            boolean localTx,
                                            int lastStatus,
                                            int tryCount)
    {
       int status = Status.STATUS_UNKNOWN;
-      Transaction tx = tx_table.getLocalTransaction(gtx);
+      Transaction tx = transactionTable.getLocalTransaction(gtx);
       if (tx != null)
       {
          try
@@ -196,7 +124,7 @@
                   {
                      // Something is wrong; our initial rollback call
                      // didn't generate a valid state change; just force it
-                     lock.release(gtx);
+                     lockManager.unlock(node.getFqn(), gtx);
                      status = TransactionLockStatus.STATUS_BROKEN;
                   }
                   break;
@@ -213,7 +141,7 @@
                case Status.STATUS_COMMITTED:
                case Status.STATUS_ROLLEDBACK:
                case Status.STATUS_NO_TRANSACTION:
-                  lock.release(gtx);
+                  lockManager.unlock(node.getFqn(), gtx);
                   status = TransactionLockStatus.STATUS_BROKEN;
                   break;
 
@@ -245,14 +173,14 @@
 
                   // fall through and release
                default:
-                  lock.release(gtx);
+                  lockManager.unlock(node.getFqn(), gtx);
                   status = TransactionLockStatus.STATUS_BROKEN;
             }
          }
          catch (Exception e)
          {
             log.error("Exception breaking locks held by " + gtx, e);
-            lock.release(gtx);
+            lockManager.unlock(node.getFqn(), gtx);
             status = TransactionLockStatus.STATUS_BROKEN;
          }
       }
@@ -260,11 +188,10 @@
       {
          // Race condition; globalTransaction was cleared from txTable.
          // Just double check if globalTransaction still holds a lock
-         if (gtx == lock.getWriterOwner()
-               || lock.getReaderOwners().contains(gtx))
+         if (lockManager.ownsLock(node.getFqn(), gtx))
          {
             // perhaps we should throw an exception?
-            lock.release(gtx);
+            lockManager.unlock(node.getFqn(), gtx);
             status = TransactionLockStatus.STATUS_BROKEN;
          }
       }

Copied: core/trunk/src/main/java/org/jboss/cache/lock/NodeBasedLockManager.java (from rev 5882, core/trunk/src/main/java/org/jboss/cache/lock/LockManager.java)
===================================================================
--- core/trunk/src/main/java/org/jboss/cache/lock/NodeBasedLockManager.java	                        (rev 0)
+++ core/trunk/src/main/java/org/jboss/cache/lock/NodeBasedLockManager.java	2008-05-23 17:15:52 UTC (rev 5887)
@@ -0,0 +1,284 @@
+package org.jboss.cache.lock;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.jboss.cache.DataContainerImpl;
+import org.jboss.cache.Fqn;
+import org.jboss.cache.InvocationContext;
+import org.jboss.cache.NodeSPI;
+import org.jboss.cache.config.Configuration;
+import org.jboss.cache.factories.annotations.Inject;
+import org.jboss.cache.factories.annotations.Start;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.List;
+
+/**
+ * @author Mircea.Markus at jboss.com
+ * @since 2.2
+ */
+ at SuppressWarnings("deprecation")
+public class NodeBasedLockManager implements LockManager
+{
+   private static final Log log = LogFactory.getLog(NodeBasedLockManager.class);
+   private static final boolean trace = log.isTraceEnabled();
+
+   protected Configuration configuration;
+   protected long lockAcquisitionTimeout;
+   protected DataContainerImpl dataContainer;
+   protected NodeSPI rootNode;
+
+   @Inject
+   public void inject(Configuration configuration, DataContainerImpl dataContainer)
+   {
+      this.configuration = configuration;
+      this.dataContainer = dataContainer;
+   }
+
+   @Start
+   public void setRootNode()
+   {
+      this.lockAcquisitionTimeout = configuration.getLockAcquisitionTimeout();
+      rootNode = dataContainer.getRoot();
+   }
+
+   /**
+    * Internal method that acquires a lock and returns the lock object.  Currently uses {@link IdentityLock} objects; may change in
+    * future to use standard JDK locks.
+    *
+    * @param fqn      Fqn to lock
+    * @param lockType type of lock to acquire
+    * @param owner    owner to acquire lock for
+    * @param timeout  timeout
+    * @return lock if acquired, null otherwise.
+    */
+   private NodeLock acquireLock(Fqn fqn, LockType lockType, Object owner, long timeout)
+   {
+      return acquireLock(dataContainer.peek(fqn), lockType, owner, timeout);
+   }
+
+   private NodeLock acquireLock(NodeSPI node, LockType lockType, Object owner, long timeout)
+   {
+      NodeLock lock = node.getLock();
+      boolean acquired = false;
+      try
+      {
+         acquired = lock.acquire(owner, timeout, lockType);
+      }
+      catch (InterruptedException e)
+      {
+         // interrupted trying to acquire lock!
+      }
+
+      if (acquired)
+         return lock;
+      else
+         return null;
+   }
+
+   public Object getLockOwner(InvocationContext ctx)
+   {
+      return ctx.getGlobalTransaction() != null ? ctx.getGlobalTransaction() : Thread.currentThread();
+   }
+
+   public boolean lock(Fqn fqn, LockType lockType, Object owner)
+   {
+      return acquireLock(fqn, lockType, owner, lockAcquisitionTimeout) != null;
+   }
+
+   public boolean lock(Fqn fqn, LockType lockType, Object owner, long timeout)
+   {
+      return acquireLock(fqn, lockType, owner, timeout) != null;
+   }
+
+   public boolean lockAndRecord(Fqn fqn, LockType lockType, InvocationContext ctx)
+   {
+      return lockAndRecord(dataContainer.peek(fqn), lockType, ctx);
+   }
+
+   public boolean lockAndRecord(NodeSPI node, LockType lockType, InvocationContext ctx)
+   {
+      NodeLock lock = acquireLock(node, lockType, getLockOwner(ctx), ctx.getContextLockAcquisitionTimeout(lockAcquisitionTimeout));
+      if (lock != null)
+      {
+         if (ctx.getTransactionEntry() != null)
+         {
+            ctx.getTransactionEntry().addLock(lock);
+         }
+         else
+         {
+            ctx.addInvocationLockAcquired(lock);
+         }
+         return true;
+      }
+      else
+      {
+         return false;
+      }
+   }
+
+   public void unlock(InvocationContext ctx)
+   {
+      List<NodeLock> locks = ctx.getTransactionEntry() != null ? ctx.getTransactionEntry().getLocks() : ctx.getInvocationLocksAcquired();
+
+      Object owner = getLockOwner(ctx);
+      // Copying out to an array is faster than creating an ArrayList and iterating,
+      // since list creation will just copy out to an array internally
+      IdentityLock[] lockArray = locks.toArray(new IdentityLock[locks.size()]);
+      for (int i = lockArray.length - 1; i >= 0; i--)
+      {
+         if (trace)
+            log.trace("releasing lock for " + lockArray[i].getFqn() + " (" + lockArray[i] + "), owner " + owner);
+         lockArray[i].release(owner);
+      }
+      locks.clear();
+   }
+
+   public void unlock(NodeLock lock, Object owner)
+   {
+      if (trace) log.trace("releasing lock for " + lock.getFqn() + " (" + lock + "), owner " + owner);
+      lock.release(owner);
+   }
+
+   public void unlock(Fqn fqn, Object owner)
+   {
+      unlock(dataContainer.peek(fqn).getLock(), owner);
+   }
+
+   public boolean lockAll(NodeSPI node, LockType lockType, Object owner)
+   {
+      return lockAll(node, lockType, owner, lockAcquisitionTimeout, false);
+   }
+
+   public boolean lockAll(NodeSPI node, LockType lockType, Object owner, long timeout)
+   {
+      return lockAll(node, lockType, owner, timeout, false);
+   }
+
+   /**
+    * Locks all nodes, and returns the NodeLocks in a List.  Returns null if the locks could not be acquired.
+    *
+    * @param node                node to lock
+    * @param lockType            type of lock to acquire
+    * @param owner               lock owner
+    * @param timeout             timeout
+    * @param excludeInternalFqns if true, internal Fqns are excluded.
+    * @return list of locks acquired, or null.
+    */
+   private List<NodeLock> lockAllNodes(NodeSPI node, LockType lockType, Object owner, long timeout, boolean excludeInternalFqns)
+   {
+      List<NodeLock> locks = null;
+      try
+      {
+         locks = new ArrayList<NodeLock>(node.getLock().acquireAll(owner, timeout, lockType, excludeInternalFqns));
+      }
+      catch (InterruptedException e)
+      {
+         // interrupted
+      }
+      return locks;
+   }
+
+   public boolean lockAll(NodeSPI node, LockType lockType, Object owner, long timeout, boolean excludeInternalFqns)
+   {
+      return lockAllNodes(node, lockType, owner, timeout, excludeInternalFqns) != null;
+   }
+
+   public boolean lockAllAndRecord(Fqn fqn, LockType lockType, InvocationContext ctx)
+   {
+      return lockAllAndRecord(dataContainer.peek(fqn), lockType, ctx);
+   }
+
+   public boolean lockAllAndRecord(NodeSPI node, LockType lockType, InvocationContext ctx)
+   {
+      List<NodeLock> locks = lockAllNodes(node, lockType, getLockOwner(ctx), ctx.getContextLockAcquisitionTimeout(lockAcquisitionTimeout), false);
+      if (locks == null) return false;
+
+      if (locks.size() > 0)
+      {
+         if (ctx.getGlobalTransaction() != null)
+         {
+            ctx.getTransactionEntry().addLocks(locks);
+         }
+         else
+         {
+            ctx.addInvocationLocksAcquired(locks);
+         }
+      }
+
+      return true;
+   }
+
+   public void unlockAll(NodeSPI node, Object owner)
+   {
+      // recursively visit node and all children, and release all locks held by a given owner.
+      node.getLock().releaseAll(owner);
+   }
+
+   public void unlockAll(NodeSPI node)
+   {
+      // recursively visit node and all children, and release all locks held by a given owner.
+      node.getLock().releaseAll();
+   }
+
+   public boolean ownsLock(Fqn fqn, LockType lockType, Object owner)
+   {
+      NodeLock lock = dataContainer.peek(fqn).getLock();
+      switch (lockType)
+      {
+         case READ:
+            return lock.isReadLocked() && lock.isOwner(owner);
+         case WRITE:
+            return lock.isWriteLocked() && lock.isOwner(owner);
+         case NONE:
+         default:
+            return false;
+      }
+   }
+
+   public boolean ownsLock(Fqn fqn, Object owner)
+   {
+      NodeSPI node = dataContainer.peek(fqn);
+      return node != null && node.getLock().isOwner(owner);
+   }
+
+   public boolean isLocked(NodeSPI n)
+   {
+      return n.getLock().isLocked();
+   }
+
+   public Object getWriteOwner(Fqn f)
+   {
+      return getWriteOwner(dataContainer.peek(f));
+   }
+
+   public Collection<Object> getReadOwners(Fqn f)
+   {
+      return getReadOwners(dataContainer.peek(f));
+   }
+
+   public Object getWriteOwner(NodeSPI node)
+   {
+      return node.getLock().getWriterOwner();
+   }
+
+   public Collection<Object> getReadOwners(NodeSPI node)
+   {
+      return node.getLock().getReaderOwners();
+   }
+
+   public String printLockInfo(NodeSPI node)
+   {
+      StringBuffer sb = new StringBuffer("\n");
+      int indent = 0;
+
+      for (Object n : node.getChildrenDirect())
+      {
+         ((NodeSPI) n).getLock().printLockInfo(sb, indent);
+         sb.append("\n");
+      }
+
+      return sb.toString();
+   }
+}

Modified: core/trunk/src/main/java/org/jboss/cache/lock/NodeLock.java
===================================================================
--- core/trunk/src/main/java/org/jboss/cache/lock/NodeLock.java	2008-05-23 16:41:43 UTC (rev 5886)
+++ core/trunk/src/main/java/org/jboss/cache/lock/NodeLock.java	2008-05-23 17:15:52 UTC (rev 5887)
@@ -11,10 +11,6 @@
  */
 public interface NodeLock
 {
-   enum LockType
-   {
-      NONE, READ, WRITE
-   }
 
    Fqn getFqn();
 
@@ -97,7 +93,7 @@
     */
    boolean isOwner(Object o);
 
-   boolean acquire(Object caller, long timeout, NodeLock.LockType lock_type) throws LockingException, TimeoutException,
+   boolean acquire(Object caller, long timeout, LockType lock_type) throws LockingException, TimeoutException,
          InterruptedException;
 
    /**
@@ -108,11 +104,11 @@
     * @param lock_type type of lock
     * @return locks acquired
     */
-   Set<NodeLock> acquireAll(Object caller, long timeout, NodeLock.LockType lock_type) throws LockingException, TimeoutException,
+   Set<NodeLock> acquireAll(Object caller, long timeout, LockType lock_type) throws LockingException, TimeoutException,
          InterruptedException;
 
    /**
-    * Same as the overloaded {@link #acquire(Object, long, org.jboss.cache.lock.NodeLock.LockType)} except that you can
+    * Same as the overloaded {@link #acquire(Object, long, LockType)} except that you can
     * optionally specify that internal Fqns - such as buddy backup subtrees - can be excluded when acquiring locks.
     *
     * @param caller              lock owner

Added: core/trunk/src/main/java/org/jboss/cache/lock/PessimisticNodeBasedLockManager.java
===================================================================
--- core/trunk/src/main/java/org/jboss/cache/lock/PessimisticNodeBasedLockManager.java	                        (rev 0)
+++ core/trunk/src/main/java/org/jboss/cache/lock/PessimisticNodeBasedLockManager.java	2008-05-23 17:15:52 UTC (rev 5887)
@@ -0,0 +1,316 @@
+package org.jboss.cache.lock;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.jboss.cache.Fqn;
+import org.jboss.cache.InvocationContext;
+import org.jboss.cache.NodeSPI;
+import org.jboss.cache.commands.write.PutDataMapCommand;
+import org.jboss.cache.factories.CommandsFactory;
+import org.jboss.cache.factories.annotations.Inject;
+import static org.jboss.cache.lock.LockType.WRITE;
+import org.jboss.cache.transaction.GlobalTransaction;
+import org.jboss.cache.transaction.TransactionEntry;
+import org.jboss.cache.transaction.TransactionTable;
+
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+/**
+ * Contains specific methods for the PessimisticLockInterceptor.
+ *
+ * @author Manik Surtani (<a href="mailto:manik at jboss.org">manik at jboss.org</a>)
+ * @deprecated will be removed with pessimistic locking
+ */
+ at Deprecated
+ at SuppressWarnings("deprecation")
+public class PessimisticNodeBasedLockManager extends NodeBasedLockManager
+{
+   private static final Log log = LogFactory.getLog(PessimisticNodeBasedLockManager.class);
+   private static final boolean trace = log.isTraceEnabled();
+   private CommandsFactory commandsFactory;
+
+   @Inject
+   private void injectCommandsFactory(CommandsFactory commandsFactory)
+   {
+      this.commandsFactory = commandsFactory;
+   }
+
+   /**
+    * A specific lock method for the PessimisticLockInterceptor.  It should *not* be used anywhere else as it has very
+    * peculiar and specific characteristics.
+    * <p/>
+    * For implementations of this LockManager interface that are not intended for use with the PessimisticLockInterceptor,
+    * it is okay not to implement this method (a no-op).
+    * <p/>
+    *
+    * @param fqn                      Fqn to lock
+    * @param lockType                 Type of lock to acquire
+    * @param ctx                      invocation context
+    * @param createIfNotExists        if true, nodes will be created if they do not exist.
+    * @param zeroLockTimeout          if true uses 0 as a lock acquisition timeout
+    * @param acquireWriteLockOnParent if true, write locks are acquired on parent nodes when child nodes need write locks.
+    * @param reverseRemoveCheck       if true, nodes that have been marked as removed in the current transaction may be reversed.
+    * @param createdNodes             a list to which nodes created in this method may be added.
+    * @param skipNotification         if true, node creation notifications are suppressed.
+    * @return true if successful; false otherwise.
+    * @throws InterruptedException if interrupted
+    */
+   public boolean lockPessimistically(InvocationContext ctx, Fqn fqn, LockType lockType,
+                                      boolean createIfNotExists, boolean zeroLockTimeout, boolean acquireWriteLockOnParent,
+                                      boolean reverseRemoveCheck, List<NodeSPI> createdNodes, boolean skipNotification) throws InterruptedException
+   {
+      if (fqn == null || configuration.getIsolationLevel() == IsolationLevel.NONE || ctx.isLockingSuppressed())
+         return false;
+
+      boolean created;
+      long timeout = zeroLockTimeout ? 0 : ctx.getContextLockAcquisitionTimeout(lockAcquisitionTimeout);
+      // make sure we can bail out of this loop
+      long cutoffTime = System.currentTimeMillis() + timeout;
+      boolean firstTry = true;
+      do
+      {
+         // this is an additional check to make sure we don't try for too long.
+         if (!firstTry && System.currentTimeMillis() > cutoffTime)
+         {
+            throw new TimeoutException("Unable to acquire lock on Fqn " + fqn + " after " + timeout + " millis");
+         }
+
+         created = lock(ctx, fqn, lockType, createIfNotExists, timeout, acquireWriteLockOnParent, reverseRemoveCheck, createdNodes, skipNotification);
+         firstTry = false;
+      }
+      while (createIfNotExists && (dataContainer.peek(fqn, false, false) == null));// keep trying until we have the lock (fixes concurrent remove())
+      return created;
+   }
+
+
+   /**
+    * Acquires locks on the node and on its parrents. Read locks are acquired for exsiting ancestors, with two exceptions:
+    * 1) createIfNotExists is true. If an ancestor is created on the fly, then an WL is acquired by default
+    * 2) acquireWriteLockOnParent is true. If so AND {@link org.jboss.cache.Node#isLockForChildInsertRemove()} then a read
+    * lock will be aquired for the parent of the node.
+    *
+    * @param createIfNotExists  if true, then missing nodes will be cretaed on the fly. If false, method returns if we
+    *                           reach a node that does not exists
+    * @param reverseRemoveCheck if true, will reverse removes if needed.
+    * @param createdNodes       a list to which any nodes created can register their Fqns so that calling code is aware of which nodes have been newly created.
+    * @param skipNotification
+    */
+   private boolean lock(InvocationContext ctx, Fqn fqn, LockType lockType, boolean createIfNotExists, long timeout,
+                        boolean acquireWriteLockOnParent, boolean reverseRemoveCheck, List<NodeSPI> createdNodes, boolean skipNotification)
+         throws TimeoutException, LockingException, InterruptedException
+   {
+      Thread currentThread = Thread.currentThread();
+      GlobalTransaction gtx = ctx.getGlobalTransaction();
+      boolean created = false;
+      // if the tx associated with the current thread is rolling back, barf! JBCACHE-923
+      if (gtx != null) TransactionTable.assertTransactionValid(ctx);
+
+      Object owner = (gtx != null) ? gtx : currentThread;
+      NodeSPI currentNode;
+      if (trace) log.trace("Attempting to lock node " + fqn + " for owner " + owner);
+      long expiryTime = System.currentTimeMillis() + timeout;
+      currentNode = rootNode;
+      NodeSPI parent = null;
+      Object childName = null;
+      int currentIndex = -1;
+      int targetFqnSize = fqn.size();
+
+      do
+      {
+         if (currentNode == null)
+         {
+            if (createIfNotExists)
+            {
+               // if the new node is to be marked as deleted, do not notify!
+               currentNode = parent.addChildDirect(childName, !skipNotification);
+               created = true;
+               if (trace) log.trace("Child node was null, so created child node " + childName);
+               if (createdNodes != null) createdNodes.add(currentNode);
+            }
+            else
+            {
+               if (trace)
+                  log.trace("failed to find or create child " + childName + " of node " + parent);
+               return false;
+            }
+         }
+         else
+         {
+            if (!currentNode.isValid() && createIfNotExists) currentNode.setValid(true, false);
+         }
+
+         LockType lockTypeRequired = LockType.READ;
+         if (created || writeLockNeeded(ctx, lockType, currentIndex, acquireWriteLockOnParent, createIfNotExists, fqn, currentNode))
+         {
+            lockTypeRequired = WRITE;
+         }
+
+         Fqn currentNodeFqn = currentNode.getFqn();
+         // actually acquire the lock we need.  This method blocks.
+         acquireNodeLock(ctx, currentNode, owner, gtx, lockTypeRequired, timeout);
+
+         manageReverseRemove(ctx, currentNode, reverseRemoveCheck, createdNodes);
+         // make sure the lock we acquired isn't on a deleted node/is an orphan!!
+         // look into invalidated nodes as well
+         NodeSPI repeek = dataContainer.peek(currentNodeFqn, true, true);
+         if (currentNode != repeek)
+         {
+            if (trace)
+               log.trace("Was waiting for and obtained a lock on a node that doesn't exist anymore!  Attempting lock acquisition again.");
+            // we have an orphan!! Lose the unnecessary lock and re-acquire the lock (and potentially recreate the node).
+            // check if the parent exists!!
+            // look into invalidated nodes as well
+            currentNode.getLock().releaseAll(owner);
+            if (parent == null || dataContainer.peek(parent.getFqn(), true, true) == null)
+            {
+               // crap!
+               if (trace)
+                  log.trace("Parent has been deleted again.  Go through the lock method all over again.");
+               currentNode = rootNode;
+               currentIndex = -1;
+               parent = null;
+            }
+            else
+            {
+               currentNode = parent;
+               currentIndex--;
+               parent = null;
+               if (System.currentTimeMillis() > expiryTime)
+               {
+                  throw new TimeoutException("Unable to acquire lock on child node " + Fqn.fromRelativeElements(currentNode.getFqn(), childName) + " after " + timeout + " millis.");
+               }
+               if (trace) log.trace("Moving one level up, current node is :" + currentNode);
+            }
+         }
+         else
+         {
+            // we have succeeded in acquiring this lock. Increment the current index since we have gained one level of depth in the tree.
+            currentIndex++;
+
+            // now test if this is the final level and if we can quit the loop:
+            //if (currentNodeFqn.equals(fqn))//we've just processed the last child
+            if (currentIndex == targetFqnSize)
+            {
+               break;
+            }
+            if (!fqn.isChildOrEquals(currentNode.getFqn())) // Does this ever happen?  Perhaps with a move(), I suppose?  - MS
+            {
+               String message = new StringBuffer("currentNode instance changed the FQN(").append(currentNode.getFqn())
+                     .append(") and do not match the FQN on which we want to acquire lock(").append(fqn).append(")").toString();
+               log.trace(message);
+               throw new LockingException(message);
+            }
+            parent = currentNode;
+
+            childName = fqn.get(currentIndex);
+            currentNode = currentNode.getChildDirect(childName);
+         }
+      }
+      while (true);
+      return created;
+   }
+
+   /**
+    * Used by lock()
+    * Determins whter an arbitrary node from the supplied fqn needs an write lock.
+    */
+   private boolean writeLockNeeded(InvocationContext ctx, LockType lockType, int currentNodeIndex, boolean acquireWriteLockOnParent, boolean createIfNotExists, Fqn targetFqn, NodeSPI currentNode)
+   {
+      int treeNodeSize = targetFqn.size();
+      // write lock forced!!
+      boolean isTargetNode = currentNodeIndex == (treeNodeSize - 1);
+      if (isTargetNode && ctx.getOptionOverrides().isForceWriteLock()) return true;
+      //this can be injected, from the caller as a param named wlParent
+      if (currentNode.isLockForChildInsertRemove())
+      {
+         if (acquireWriteLockOnParent && currentNodeIndex == treeNodeSize - 2)
+         {
+            return true;// we're doing a remove and we've reached the PARENT node of the target to be removed.
+         }
+         if (!isTargetNode && dataContainer.peek(targetFqn.getAncestor(currentNodeIndex + 2), false, false) == null)
+         {
+            return createIfNotExists;// we're at a node in the tree, not yet at the target node, and we need to create the next node.  So we need a WL here.
+         }
+      }
+      return lockType == WRITE && isTargetNode;//write lock explicitly requested and this is the target to be written to.
+   }
+
+   private void acquireNodeLock(InvocationContext ctx, NodeSPI node, Object owner, GlobalTransaction gtx, LockType lockType, long lockTimeout) throws LockingException, TimeoutException, InterruptedException
+   {
+      NodeLock lock = node.getLock();
+      boolean acquired = lock.acquire(owner, lockTimeout, lockType);
+      if (acquired)
+      {
+         // Record the lock for release on method return or tx commit/rollback
+         if (gtx != null)
+         {
+            ctx.getTransactionEntry().addLock(lock);
+         }
+         else
+         {
+            ctx.addInvocationLockAcquired(lock);
+         }
+      }
+   }
+
+   /**
+    * Test if this node needs to be 'undeleted'
+    * reverse the "remove" if the node has been previously removed in the same tx, if this operation is a put()
+    */
+   public void manageReverseRemove(InvocationContext ctx, NodeSPI childNode, boolean reverseRemoveCheck, List createdNodes)
+   {
+      if (ctx.getGlobalTransaction() != null) //if no tx then reverse remove does not make sense
+      {
+         Fqn fqn = childNode.getFqn();
+         TransactionEntry entry = ctx.getTransactionEntry();
+         boolean needToReverseRemove = reverseRemoveCheck && childNode.isDeleted() && entry != null && entry.getRemovedNodes().contains(fqn);
+         if (!needToReverseRemove) return;
+         childNode.markAsDeleted(false);
+         //if we'll rollback the tx data should be added to the node again
+         Map oldData = new HashMap(childNode.getDataDirect());
+         PutDataMapCommand command = commandsFactory.buildPutDataMapCommand(ctx.getGlobalTransaction(), fqn, oldData);
+         // txTable.get(gtx).addUndoOperation(command); --- now need to make sure this is added to the normal mods list instead
+         entry.addModification(command);
+         //we're prepared for rollback, now reset the node
+         childNode.clearDataDirect();
+         if (createdNodes != null)
+         {
+            createdNodes.add(childNode);
+         }
+      }
+   }
+
+   /**
+    * Acquires write locks on the node and all child nodes, adding children to the list of removed nodes in the context.
+    *
+    * @param node  node to inspect
+    * @param ctx   invocation context
+    * @param entry transaction entry
+    * @throws InterruptedException in the event of interruption
+    */
+   public void lockAllForRemoval(NodeSPI node, InvocationContext ctx, TransactionEntry entry) throws InterruptedException
+   {
+      if (node == null) return;
+
+      long timeout = ctx.getContextLockAcquisitionTimeout(lockAcquisitionTimeout);
+      GlobalTransaction gtx = ctx.getGlobalTransaction();
+      Object owner = (gtx != null) ? gtx : Thread.currentThread();
+
+      Set<NodeLock> acquiredLocks = node.getLock().acquireAll(owner, timeout, WRITE);
+      if (acquiredLocks.size() > 0)
+      {
+         if (gtx != null)
+         {
+            ctx.getTransactionEntry().addLocks(acquiredLocks);
+            for (NodeLock l : acquiredLocks) entry.addRemovedNode(l.getFqn());
+         }
+         else
+         {
+            ctx.addInvocationLocksAcquired(acquiredLocks);
+         }
+      }
+   }
+}

Modified: core/trunk/src/main/java/org/jboss/cache/statetransfer/StateTransferManager.java
===================================================================
--- core/trunk/src/main/java/org/jboss/cache/statetransfer/StateTransferManager.java	2008-05-23 16:41:43 UTC (rev 5886)
+++ core/trunk/src/main/java/org/jboss/cache/statetransfer/StateTransferManager.java	2008-05-23 17:15:52 UTC (rev 5887)
@@ -17,7 +17,8 @@
 import org.jboss.cache.factories.annotations.Inject;
 import org.jboss.cache.factories.annotations.NonVolatile;
 import org.jboss.cache.loader.CacheLoaderManager;
-import org.jboss.cache.lock.NodeLock;
+import org.jboss.cache.lock.LockManager;
+import static org.jboss.cache.lock.LockType.READ;
 import org.jboss.cache.lock.TimeoutException;
 import org.jboss.cache.marshall.Marshaller;
 import org.jboss.cache.marshall.NodeData;
@@ -39,18 +40,20 @@
    private Marshaller marshaller;
    private RegionManager regionManager;
    private Configuration configuration;
+   private LockManager lockManager;
 
    public StateTransferManager()
    {
    }
 
    @Inject
-   public void injectDependencies(CacheSPI cache, Marshaller marshaller, RegionManager regionManager, Configuration configuration)
+   public void injectDependencies(CacheSPI cache, Marshaller marshaller, RegionManager regionManager, Configuration configuration, LockManager lockManager)
    {
       this.cache = cache;
       this.regionManager = regionManager;
       this.marshaller = marshaller;
       this.configuration = configuration;
+      this.lockManager = lockManager;
    }
 
    public StateTransferManager(CacheSPI cache)
@@ -251,11 +254,11 @@
       {
          if (lockChildren)
          {
-            root.getLock().acquireAll(lockOwner, timeout, NodeLock.LockType.READ, true);
+            lockManager.lockAll(root, READ, lockOwner, timeout, true);
          }
          else
          {
-            root.getLock().acquire(lockOwner, timeout, NodeLock.LockType.READ);
+            lockManager.lock(Fqn.ROOT, READ, lockOwner, timeout);
          }
       }
       catch (TimeoutException te)
@@ -289,11 +292,11 @@
       {
          if (childrenLocked)
          {
-            root.getLock().releaseAll(lockOwner);
+            lockManager.unlockAll(root, lockOwner);
          }
          else
          {
-            root.getLock().release(lockOwner);
+            lockManager.unlock(Fqn.ROOT, lockOwner);
          }
       }
       catch (Throwable t)

Modified: core/trunk/src/test/java/org/jboss/cache/DataContainerTest.java
===================================================================
--- core/trunk/src/test/java/org/jboss/cache/DataContainerTest.java	2008-05-23 16:41:43 UTC (rev 5886)
+++ core/trunk/src/test/java/org/jboss/cache/DataContainerTest.java	2008-05-23 17:15:52 UTC (rev 5887)
@@ -75,7 +75,7 @@
       Configuration config = new Configuration();
       config.setNodeLockingOptimistic(false);
       DataVersion dataVersion = new DefaultDataVersion(2);
-      container.setDependencies(config, null);
+      container.setDependencies(config, null, null);
       assert nodes.adfgNode == container.peekVersioned(nodes.adfg, dataVersion, true) : "if NOT opt locking same value as peek(boolean, boolean) expected";
 
       //test optimistic locking with same version
@@ -89,7 +89,8 @@
       {
          container.peekVersioned(nodes.adfg, new DefaultDataVersion(1), true);
          assert false : "exception expected as version changed.";
-      } catch (CacheException e)
+      }
+      catch (CacheException e)
       {
          //expected
       }
@@ -106,7 +107,8 @@
       {
          container.peekStrict(null, nodes.notExistent, true);
          assert false : "excpetion expected as node does not exist";
-      } catch (Exception e)
+      }
+      catch (Exception e)
       {
          //expected
       }
@@ -289,16 +291,16 @@
    }
 
    /**
-    *  test {@link DataContainerImpl#createNodes(Fqn)}
+    * test {@link DataContainerImpl#createNodes(Fqn)}
     */
    public void testCreateNodes()
    {
       Object[] objects = container.createNodes(Fqn.fromString("/a/x/y/z"));
       List result = (List) objects[0];
       assert result.size() == 3;
-      assert ((NodeSPI)result.get(0)).getFqn().equals(Fqn.fromString("/a/x"));
-      assert ((NodeSPI)result.get(1)).getFqn().equals(Fqn.fromString("/a/x/y"));
-      assert ((NodeSPI)result.get(2)).getFqn().equals(Fqn.fromString("/a/x/y/z"));
+      assert ((NodeSPI) result.get(0)).getFqn().equals(Fqn.fromString("/a/x"));
+      assert ((NodeSPI) result.get(1)).getFqn().equals(Fqn.fromString("/a/x/y"));
+      assert ((NodeSPI) result.get(2)).getFqn().equals(Fqn.fromString("/a/x/y/z"));
       NodeSPI target = (NodeSPI) objects[1];
       assert target != null;
       assert target.getFqn().toString().equals("/a/x/y/z");

Modified: core/trunk/src/test/java/org/jboss/cache/lock/AcquireAllTest.java
===================================================================
--- core/trunk/src/test/java/org/jboss/cache/lock/AcquireAllTest.java	2008-05-23 16:41:43 UTC (rev 5886)
+++ core/trunk/src/test/java/org/jboss/cache/lock/AcquireAllTest.java	2008-05-23 17:15:52 UTC (rev 5887)
@@ -42,12 +42,12 @@
       root = cache.getRoot();
       NodeLock lock = root.getLock();
 
-      lock.acquireAll(owner, 2000, NodeLock.LockType.READ);
+      lock.acquireAll(owner, 2000, LockType.READ);
       lock.releaseAll(owner);
 
       assertEquals(0, cache.getNumberOfLocksHeld());
 
-      lock.acquireAll(owner, 2000, NodeLock.LockType.WRITE);
+      lock.acquireAll(owner, 2000, LockType.WRITE);
       lock.releaseAll(owner);
 
       assertEquals(0, cache.getNumberOfLocksHeld());
@@ -67,12 +67,12 @@
       root = cache.getRoot();
       NodeLock lock = root.getLock();
 
-      lock.acquireAll(owner, 2000, NodeLock.LockType.READ);
+      lock.acquireAll(owner, 2000, LockType.READ);
       lock.releaseAll(owner);
 
       assertEquals(0, cache.getNumberOfLocksHeld());
 
-      lock.acquireAll(owner, 2000, NodeLock.LockType.WRITE);
+      lock.acquireAll(owner, 2000, LockType.WRITE);
       lock.releaseAll(owner);
 
       assertEquals(0, cache.getNumberOfLocksHeld());

Modified: core/trunk/src/test/java/org/jboss/cache/optimistic/OptimisticLockInterceptorTest.java
===================================================================
--- core/trunk/src/test/java/org/jboss/cache/optimistic/OptimisticLockInterceptorTest.java	2008-05-23 16:41:43 UTC (rev 5886)
+++ core/trunk/src/test/java/org/jboss/cache/optimistic/OptimisticLockInterceptorTest.java	2008-05-23 17:15:52 UTC (rev 5887)
@@ -7,9 +7,10 @@
 import org.jboss.cache.commands.VisitableCommand;
 import org.jboss.cache.interceptors.OptimisticInterceptor;
 import org.jboss.cache.interceptors.OptimisticLockingInterceptor;
+import org.jboss.cache.lock.LockType;
+import static org.jboss.cache.lock.LockType.READ;
+import static org.jboss.cache.lock.LockType.WRITE;
 import org.jboss.cache.lock.NodeLock;
-import static org.jboss.cache.lock.NodeLock.LockType.READ;
-import static org.jboss.cache.lock.NodeLock.LockType.WRITE;
 import org.jboss.cache.misc.TestingUtil;
 import org.testng.AssertJUnit;
 import org.testng.annotations.AfterMethod;
@@ -191,8 +192,8 @@
 
 class LockReportInterceptor extends OptimisticInterceptor
 {
-   private Map<Fqn, NodeLock.LockType> expected = new HashMap<Fqn, NodeLock.LockType>();
-   private Map<Fqn, NodeLock.LockType> actual = new HashMap<Fqn, NodeLock.LockType>();
+   private Map<Fqn, LockType> expected = new HashMap<Fqn, LockType>();
+   private Map<Fqn, LockType> actual = new HashMap<Fqn, LockType>();
 
    void reset()
    {




More information about the jbosscache-commits mailing list