[jbosscache-commits] JBoss Cache SVN: r6027 - core/trunk/src/main/java/org/jboss/cache/lock.

jbosscache-commits at lists.jboss.org jbosscache-commits at lists.jboss.org
Wed Jun 25 09:21:39 EDT 2008


Author: manik.surtani at jboss.com
Date: 2008-06-25 09:21:38 -0400 (Wed, 25 Jun 2008)
New Revision: 6027

Added:
   core/trunk/src/main/java/org/jboss/cache/lock/MVCCLockManager.java
Log:
First-cut implementation of the mvcc lock manager

Added: core/trunk/src/main/java/org/jboss/cache/lock/MVCCLockManager.java
===================================================================
--- core/trunk/src/main/java/org/jboss/cache/lock/MVCCLockManager.java	                        (rev 0)
+++ core/trunk/src/main/java/org/jboss/cache/lock/MVCCLockManager.java	2008-06-25 13:21:38 UTC (rev 6027)
@@ -0,0 +1,378 @@
+package org.jboss.cache.lock;
+
+import net.jcip.annotations.ThreadSafe;
+import org.jboss.cache.CacheSPI;
+import org.jboss.cache.DataContainer;
+import org.jboss.cache.Fqn;
+import org.jboss.cache.InvocationContext;
+import org.jboss.cache.Node;
+import org.jboss.cache.NodeSPI;
+import org.jboss.cache.factories.annotations.Inject;
+import org.jboss.cache.factories.annotations.Start;
+import org.jboss.cache.invocation.InvocationContextContainer;
+import org.jboss.cache.util.concurrent.locks.OwnableReentrantLock;
+
+import javax.transaction.TransactionManager;
+import java.util.Collection;
+import java.util.List;
+import java.util.Set;
+import static java.util.concurrent.TimeUnit.MILLISECONDS;
+import java.util.concurrent.locks.Lock;
+import java.util.concurrent.locks.ReentrantLock;
+
+/**
+ * This lock manager acquires and releases locks based on the Fqn passed in and not on the node itself.  The main benefit
+ * here is that locks can be acquired and held even for nonexistent nodes.
+ * <p/>
+ * Uses lock striping so that a fixed number of locks are maintained for the entire cache, and not a single lock per node.
+ * <p/>
+ * This implementation only acquires exclusive WRITE locks, and throws an exception if you attempt to use it to acquire a
+ * READ lock.  JBoss Cache's MVCC design doesn't use read locks at all.
+ * <p/>
+ * The concept of lock owners is implicit in this implementation and any owners passed in as parameters (where required)
+ * will be ignored.  See {@link org.jboss.cache.util.concurrent.locks.OwnableReentrantLock} for details on how ownership
+ * is determined.
+ * <p/>
+ *
+ * @author Manik Surtani (<a href="mailto:manik at jboss.org">manik at jboss.org</a>)
+ * @see org.jboss.cache.util.concurrent.locks.OwnableReentrantLock
+ * @since 3.0
+ */
+public class MVCCLockManager extends FqnLockManager
+{
+   LockContainer lockContainer;
+   DataContainer dataContainer;
+   private Set<Fqn> internalFqns;
+   private CacheSPI cache;
+   private TransactionManager transactionManager;
+   private InvocationContextContainer invocationContextContainer;
+   private static final int DEFAULT_CONCURRENCY = 20;
+
+   @Inject
+   public void injectDataContainer(DataContainer dataContainer, CacheSPI cache, TransactionManager transactionManager, InvocationContextContainer invocationContextContainer)
+   {
+      this.dataContainer = dataContainer;
+      this.cache = cache;
+      this.transactionManager = transactionManager;
+      this.invocationContextContainer = invocationContextContainer;
+   }
+
+   @Start
+   public void startLockManager()
+   {
+      lockContainer = transactionManager == null ? new ReentrantLockContainer() : new OwnableReentrantLockContainer();
+   }
+
+   @Start
+   @SuppressWarnings("unchecked")
+   public void setInternalFqns()
+   {
+      internalFqns = cache.getInternalFqns();
+   }
+
+
+   protected void assertIsWriteLock(LockType lockType)
+   {
+      if (lockType != LockType.WRITE)
+         throw new UnsupportedOperationException(getClass().getName() + " only supports write locks.");
+   }
+
+   public boolean lock(Fqn fqn, LockType lockType, Object owner) throws InterruptedException
+   {
+      assertIsWriteLock(lockType);
+      Lock lock = lockContainer.getLock(fqn);
+      return lock.tryLock(lockAcquisitionTimeout, MILLISECONDS);
+   }
+
+   public boolean lock(Fqn fqn, LockType lockType, Object owner, long timeoutMillis) throws InterruptedException
+   {
+      assertIsWriteLock(lockType);
+      Lock lock = lockContainer.getLock(fqn);
+      return lock.tryLock(timeoutMillis, MILLISECONDS);
+   }
+
+   public boolean lockAndRecord(Fqn fqn, LockType lockType, InvocationContext ctx) throws InterruptedException
+   {
+      assertIsWriteLock(lockType);
+      Lock lock = lockContainer.getLock(fqn);
+      if (lock.tryLock(ctx.getContextLockAcquisitionTimeout(lockAcquisitionTimeout), MILLISECONDS))
+      {
+         ctx.addLock(fqn);
+         return true;
+      }
+
+      // couldn't acquire lock!
+      return false;
+   }
+
+   public void unlock(Fqn fqn, Object owner)
+   {
+      Lock lock = lockContainer.getLock(fqn);
+      lock.unlock();
+   }
+
+   public void unlock(InvocationContext ctx)
+   {
+      List<Fqn> locks = ctx.getLocks();
+      if (!locks.isEmpty())
+      {
+         // unlocking needs to be done in reverse order.
+         Fqn[] fqns = new Fqn[locks.size()];
+         fqns = locks.toArray(fqns);
+         for (int i = fqns.length - 1; i > -1; i--) lockContainer.getLock(fqns[i]).unlock();
+      }
+   }
+
+   @SuppressWarnings("unchecked")
+   protected boolean lockRecursively(Node node, long timeoutMillis, boolean excludeInternalFqns, InvocationContext ctx) throws InterruptedException
+   {
+      if (excludeInternalFqns && internalFqns.contains(node.getFqn()))
+         return true; // this will stop the recursion from proceeding down this subtree.
+
+      boolean locked = ctx == null ?
+            lock(node.getFqn(), LockType.WRITE, null, timeoutMillis) :
+            lockAndRecord(node.getFqn(), LockType.WRITE, ctx);
+
+      if (!locked) return false;
+      boolean needToUnlock = false;
+
+      // need to recursively walk through the node's children and acquire locks.  This needs to happen using API methods
+      // since any cache loading will need to happen.
+      Set<Node> children = node.getChildren();
+      try
+      {
+         if (children != null)
+         {
+            for (Node child : children)
+            {
+               locked = lockRecursively(child, timeoutMillis, excludeInternalFqns, ctx);
+               if (!locked)
+               {
+                  needToUnlock = true;
+                  break;
+               }
+            }
+         }
+      }
+      finally
+      {
+         if (needToUnlock)
+         {
+            for (Node child : children)
+            {
+               Fqn childFqn = child.getFqn();
+               unlock(childFqn, null);
+               if (ctx != null) ctx.removeLock(childFqn);
+            }
+            unlock(node.getFqn(), null);
+            if (ctx != null) ctx.removeLock(node.getFqn());
+         }
+      }
+      return locked;
+   }
+
+   public boolean lockAll(NodeSPI node, LockType lockType, Object owner) throws InterruptedException
+   {
+      assertIsWriteLock(lockType);
+      return lockRecursively(node, lockAcquisitionTimeout, false, null);
+   }
+
+   public boolean lockAll(NodeSPI node, LockType lockType, Object owner, long timeout) throws InterruptedException
+   {
+      assertIsWriteLock(lockType);
+      return lockRecursively(node, timeout, false, null);
+   }
+
+   public boolean lockAll(NodeSPI node, LockType lockType, Object owner, long timeout, boolean excludeInternalFqns) throws InterruptedException
+   {
+      assertIsWriteLock(lockType);
+      return lockRecursively(node, timeout, excludeInternalFqns, null);
+   }
+
+   public boolean lockAllAndRecord(NodeSPI node, LockType lockType, InvocationContext ctx) throws InterruptedException
+   {
+      assertIsWriteLock(lockType);
+      return lockRecursively(node, ctx.getContextLockAcquisitionTimeout(lockAcquisitionTimeout), false, ctx);
+   }
+
+   public boolean lockAllAndRecord(Fqn fqn, LockType lockType, InvocationContext ctx) throws InterruptedException
+   {
+      return lockAllAndRecord(dataContainer.peek(fqn, false), lockType, ctx);
+   }
+
+   @SuppressWarnings("unchecked")
+   public void unlockAll(NodeSPI node, Object owner)
+   {
+      // depth first.
+      Set<Node> children = node.getChildren();
+      if (children != null)
+      {
+         for (Node child : children) unlockAll((NodeSPI) child, null);
+      }
+      unlock(node.getFqn(), null);
+   }
+
+   public void unlockAll(NodeSPI node)
+   {
+      throw new UnsupportedOperationException("Not supported in this impl.");
+   }
+
+   public boolean ownsLock(Fqn fqn, LockType lockType, Object owner)
+   {
+      assertIsWriteLock(lockType);
+      return lockContainer.ownsLock(fqn, owner);
+   }
+
+   public boolean ownsLock(Fqn fqn, Object owner)
+   {
+      return lockContainer.ownsLock(fqn, owner);
+   }
+
+   public boolean isLocked(NodeSPI n)
+   {
+      return lockContainer.isLocked(n.getFqn());
+   }
+
+   public boolean isLocked(NodeSPI n, LockType lockType)
+   {
+      assertIsWriteLock(lockType);
+      return lockContainer.isLocked(n.getFqn());
+   }
+
+   public Object getWriteOwner(Fqn f)
+   {
+      throw new UnsupportedOperationException("Not supported in this impl.");
+   }
+
+   public Collection<Object> getReadOwners(Fqn f)
+   {
+      throw new UnsupportedOperationException("Not supported in this impl.");
+   }
+
+   public String printLockInfo(NodeSPI node)
+   {
+      return "Not supported in this impl.";
+   }
+
+   @ThreadSafe
+   abstract class LockContainer
+   {
+      private final int lockSegmentMask;
+      private final int lockSegmentShift;
+
+      /**
+       * This constructor just calls {@link #LockContainer(int)} with a default concurrency value of 20.
+       */
+      LockContainer()
+      {
+         this(DEFAULT_CONCURRENCY);
+      }
+
+      /**
+       * Creates a new LockContainer which uses a certain number of shared locks across all elements that need to be locked.
+       *
+       * @param concurrency number of threads expected to use this class concurrently.
+       */
+      LockContainer(int concurrency)
+      {
+         int tempLockSegShift = 0;
+         int numLocks = 1;
+         while (numLocks < concurrency)
+         {
+            ++tempLockSegShift;
+            numLocks <<= 1;
+         }
+         lockSegmentShift = 32 - tempLockSegShift;
+         lockSegmentMask = numLocks - 1;
+
+         initLocks(numLocks);
+      }
+
+      int hashToIndex(Fqn fqn)
+      {
+         return (hash(fqn) >>> lockSegmentShift) & lockSegmentMask;
+      }
+
+      /**
+       * Returns a hash code for non-null Object x.
+       * Uses the same hash code spreader as most other java.util hash tables, except that this uses the string representation
+       * of the object passed in.
+       *
+       * @param x the object serving as a key
+       * @return the hash code
+       */
+      int hash(Object x)
+      {
+         int h = x.toString().hashCode();
+         h += ~(h << 9);
+         h ^= (h >>> 14);
+         h += (h << 4);
+         h ^= (h >>> 10);
+         return h;
+      }
+
+      abstract void initLocks(int numLocks);
+
+      abstract boolean ownsLock(Fqn fqn, Object owner);
+
+      abstract boolean isLocked(Fqn fqn);
+
+      abstract Lock getLock(Fqn fqn);
+   }
+
+   class ReentrantLockContainer extends LockContainer
+   {
+      ReentrantLock[] sharedLocks;
+
+      void initLocks(int numLocks)
+      {
+         sharedLocks = new ReentrantLock[numLocks];
+         for (int i = 0; i < numLocks; i++) sharedLocks[i] = new ReentrantLock();
+      }
+
+      ReentrantLock getLock(Fqn fqn)
+      {
+         return sharedLocks[hashToIndex(fqn)];
+      }
+
+      boolean ownsLock(Fqn fqn, Object owner)
+      {
+         ReentrantLock lock = getLock(fqn);
+         return lock.isHeldByCurrentThread();
+      }
+
+      boolean isLocked(Fqn fqn)
+      {
+         ReentrantLock lock = getLock(fqn);
+         return lock.isLocked();
+      }
+   }
+
+   class OwnableReentrantLockContainer extends LockContainer
+   {
+      OwnableReentrantLock[] sharedLocks;
+
+      void initLocks(int numLocks)
+      {
+         sharedLocks = new OwnableReentrantLock[numLocks];
+         for (int i = 0; i < numLocks; i++) sharedLocks[i] = new OwnableReentrantLock(invocationContextContainer);
+      }
+
+      OwnableReentrantLock getLock(Fqn fqn)
+      {
+         return sharedLocks[hashToIndex(fqn)];
+      }
+
+      boolean ownsLock(Fqn fqn, Object owner)
+      {
+         OwnableReentrantLock lock = getLock(fqn);
+         return owner.equals(lock.getOwner());
+      }
+
+      boolean isLocked(Fqn fqn)
+      {
+         OwnableReentrantLock lock = getLock(fqn);
+         return lock.isLocked();
+      }
+   }
+}




More information about the jbosscache-commits mailing list