Author: manik.surtani(a)jboss.com
Date: 2008-06-25 09:21:38 -0400 (Wed, 25 Jun 2008)
New Revision: 6027
Added:
core/trunk/src/main/java/org/jboss/cache/lock/MVCCLockManager.java
Log:
First-cut implementation of the mvcc lock manager
Added: core/trunk/src/main/java/org/jboss/cache/lock/MVCCLockManager.java
===================================================================
--- core/trunk/src/main/java/org/jboss/cache/lock/MVCCLockManager.java
(rev 0)
+++ core/trunk/src/main/java/org/jboss/cache/lock/MVCCLockManager.java 2008-06-25 13:21:38
UTC (rev 6027)
@@ -0,0 +1,378 @@
+package org.jboss.cache.lock;
+
+import net.jcip.annotations.ThreadSafe;
+import org.jboss.cache.CacheSPI;
+import org.jboss.cache.DataContainer;
+import org.jboss.cache.Fqn;
+import org.jboss.cache.InvocationContext;
+import org.jboss.cache.Node;
+import org.jboss.cache.NodeSPI;
+import org.jboss.cache.factories.annotations.Inject;
+import org.jboss.cache.factories.annotations.Start;
+import org.jboss.cache.invocation.InvocationContextContainer;
+import org.jboss.cache.util.concurrent.locks.OwnableReentrantLock;
+
+import javax.transaction.TransactionManager;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.List;
+import java.util.Set;
+import static java.util.concurrent.TimeUnit.MILLISECONDS;
+import java.util.concurrent.locks.Lock;
+import java.util.concurrent.locks.ReentrantLock;
+
+/**
+ * This lock manager acquires and releases locks based on the Fqn passed in and not on the node itself. The main
+ * benefit here is that locks can be acquired and held even for nonexistent nodes.
+ * <p/>
+ * Uses lock striping so that a fixed number of locks are maintained for the entire cache, and not a single lock
+ * per node.
+ * <p/>
+ * This implementation only acquires exclusive WRITE locks, and throws an exception if you attempt to use it to
+ * acquire a READ lock. JBoss Cache's MVCC design doesn't use read locks at all.
+ * <p/>
+ * The concept of lock owners is implicit in this implementation and any owners passed in as parameters (where
+ * required) will be ignored. See {@link org.jboss.cache.util.concurrent.locks.OwnableReentrantLock} for details
+ * on how ownership is determined.
+ * <p/>
+ *
+ * @author Manik Surtani (<a href="mailto:manik@jboss.org">manik@jboss.org</a>)
+ * @see org.jboss.cache.util.concurrent.locks.OwnableReentrantLock
+ * @since 3.0
+ */
+public class MVCCLockManager extends FqnLockManager
+{
+ LockContainer lockContainer; // striped lock container; assigned in startLockManager()
+ DataContainer dataContainer; // injected; used to peek the node for an Fqn in lockAllAndRecord(Fqn, ...)
+ private Set<Fqn> internalFqns; // set from cache.getInternalFqns() on start; lockRecursively() can skip these subtrees
+ private CacheSPI cache; // injected; source of the internal Fqns
+ private TransactionManager transactionManager; // injected; null selects ReentrantLockContainer, otherwise OwnableReentrantLockContainer
+ private InvocationContextContainer invocationContextContainer; // injected; passed to each OwnableReentrantLock constructor
+ private static final int DEFAULT_CONCURRENCY = 20; // used by LockContainer(); rounds up to 32 shared locks (next power of two)
+
+ @Inject
+ public void injectDataContainer(DataContainer dataContainer, CacheSPI cache,
TransactionManager transactionManager, InvocationContextContainer
invocationContextContainer)
+ {
+ this.dataContainer = dataContainer;
+ this.cache = cache;
+ this.transactionManager = transactionManager;
+ this.invocationContextContainer = invocationContextContainer;
+ }
+
+   /**
+    * Creates the striped lock container. Plain reentrant locks are used when no transaction manager was
+    * injected; ownable reentrant locks are used otherwise.
+    */
+   @Start
+   public void startLockManager()
+   {
+      if (transactionManager == null)
+      {
+         lockContainer = new ReentrantLockContainer();
+      }
+      else
+      {
+         lockContainer = new OwnableReentrantLockContainer();
+      }
+   }
+
+   /**
+    * Caches the set of internal Fqns reported by the cache, so that recursive locking can skip them on request.
+    */
+   @Start
+   @SuppressWarnings("unchecked")
+   public void setInternalFqns()
+   {
+      this.internalFqns = cache.getInternalFqns();
+   }
+
+
+   /**
+    * Guards every locking entry point: anything other than a WRITE lock is rejected.
+    *
+    * @param lockType the requested lock type
+    * @throws UnsupportedOperationException if lockType is not {@link LockType#WRITE}
+    */
+   protected void assertIsWriteLock(LockType lockType)
+   {
+      if (lockType == LockType.WRITE)
+      {
+         return;
+      }
+      String message = getClass().getName() + " only supports write locks.";
+      throw new UnsupportedOperationException(message);
+   }
+
+   /**
+    * Tries to acquire the shared lock that the given Fqn maps to, waiting up to the configured
+    * lock acquisition timeout.
+    *
+    * @param fqn      Fqn to lock
+    * @param lockType must be WRITE
+    * @param owner    not used by this method
+    * @return true if the lock was acquired within the timeout, false otherwise
+    * @throws InterruptedException if interrupted while waiting for the lock
+    */
+   public boolean lock(Fqn fqn, LockType lockType, Object owner) throws InterruptedException
+   {
+      assertIsWriteLock(lockType);
+      return lockContainer.getLock(fqn).tryLock(lockAcquisitionTimeout, MILLISECONDS);
+   }
+
+   /**
+    * Tries to acquire the shared lock that the given Fqn maps to, waiting up to the supplied timeout.
+    *
+    * @param fqn           Fqn to lock
+    * @param lockType      must be WRITE
+    * @param owner         not used by this method
+    * @param timeoutMillis maximum time to wait, in milliseconds
+    * @return true if the lock was acquired within the timeout, false otherwise
+    * @throws InterruptedException if interrupted while waiting for the lock
+    */
+   public boolean lock(Fqn fqn, LockType lockType, Object owner, long timeoutMillis)
+         throws InterruptedException
+   {
+      assertIsWriteLock(lockType);
+      return lockContainer.getLock(fqn).tryLock(timeoutMillis, MILLISECONDS);
+   }
+
+ public boolean lockAndRecord(Fqn fqn, LockType lockType, InvocationContext ctx) throws
InterruptedException
+ {
+ assertIsWriteLock(lockType);
+ Lock lock = lockContainer.getLock(fqn);
+ if (lock.tryLock(ctx.getContextLockAcquisitionTimeout(lockAcquisitionTimeout),
MILLISECONDS))
+ {
+ ctx.addLock(fqn);
+ return true;
+ }
+
+ // couldn't acquire lock!
+ return false;
+ }
+
+   /**
+    * Releases the shared lock that the given Fqn maps to.
+    *
+    * @param fqn   Fqn to unlock
+    * @param owner not used by this method
+    */
+   public void unlock(Fqn fqn, Object owner)
+   {
+      lockContainer.getLock(fqn).unlock();
+   }
+
+   /**
+    * Releases every lock recorded on the invocation context, last recorded first.
+    * The context's own list of locks is not modified here.
+    *
+    * @param ctx invocation context whose recorded locks are released
+    */
+   public void unlock(InvocationContext ctx)
+   {
+      List<Fqn> recorded = ctx.getLocks();
+      if (recorded.isEmpty())
+      {
+         return;
+      }
+
+      // work on a snapshot and walk it backwards, so release order is the reverse of acquisition order
+      Fqn[] snapshot = recorded.toArray(new Fqn[recorded.size()]);
+      int idx = snapshot.length;
+      while (idx-- > 0)
+      {
+         lockContainer.getLock(snapshot[idx]).unlock();
+      }
+   }
+
+   /**
+    * Acquires write locks on a node and then, depth first, on every node beneath it.
+    * <p/>
+    * The operation is all-or-nothing: if any lock cannot be acquired, or an exception is thrown part way
+    * through, exactly the locks taken by this call are released again (in reverse order of acquisition) and,
+    * when a context is supplied, removed from it. Locks are never released for nodes this call did not lock.
+    *
+    * @param node                root of the subtree to lock
+    * @param timeoutMillis       per-lock wait time in milliseconds; only used when ctx is null, since with a
+    *                            context the wait time is derived from the context by lockAndRecord()
+    * @param excludeInternalFqns if true, subtrees rooted at an internal Fqn are skipped and treated as locked
+    * @param ctx                 invocation context on which acquired locks are recorded, or null to not record
+    * @return true if the whole subtree was locked, false if a lock could not be acquired
+    * @throws InterruptedException if interrupted while waiting for a lock
+    */
+   protected boolean lockRecursively(Node node, long timeoutMillis, boolean excludeInternalFqns,
+                                     InvocationContext ctx) throws InterruptedException
+   {
+      // every Fqn locked by this call, in acquisition order, so a failure can be rolled back precisely
+      List<Fqn> acquired = new ArrayList<Fqn>();
+      boolean success = false;
+      try
+      {
+         success = acquireSubtreeLocks(node, timeoutMillis, excludeInternalFqns, ctx, acquired);
+         return success;
+      }
+      finally
+      {
+         // covers both a failed acquisition and an exception escaping from the walk
+         if (!success)
+         {
+            for (int i = acquired.size() - 1; i > -1; i--)
+            {
+               Fqn fqn = acquired.get(i);
+               unlock(fqn, null);
+               if (ctx != null) ctx.removeLock(fqn);
+            }
+         }
+      }
+   }
+
+   /**
+    * Walks the subtree rooted at node, locking each Fqn and appending it to acquired once locked.
+    * Stops at the first lock that cannot be acquired; cleaning up is left to the caller.
+    */
+   @SuppressWarnings("unchecked")
+   private boolean acquireSubtreeLocks(Node node, long timeoutMillis, boolean excludeInternalFqns,
+                                       InvocationContext ctx, List<Fqn> acquired) throws InterruptedException
+   {
+      Fqn fqn = node.getFqn();
+      if (excludeInternalFqns && internalFqns.contains(fqn))
+         return true; // this will stop the recursion from proceeding down this subtree.
+
+      boolean locked = ctx == null ?
+            lock(fqn, LockType.WRITE, null, timeoutMillis) :
+            lockAndRecord(fqn, LockType.WRITE, ctx);
+
+      if (!locked) return false;
+      acquired.add(fqn);
+
+      // children are obtained via the API method so that any cache loading can take place
+      Set<Node> children = node.getChildren();
+      if (children != null)
+      {
+         for (Node child : children)
+         {
+            if (!acquireSubtreeLocks(child, timeoutMillis, excludeInternalFqns, ctx, acquired)) return false;
+         }
+      }
+      return true;
+   }
+
+   /**
+    * Write-locks a node and its whole subtree using the configured lock acquisition timeout.
+    * Internal Fqns are not excluded and nothing is recorded on an invocation context.
+    *
+    * @param node     root of the subtree to lock
+    * @param lockType must be WRITE
+    * @param owner    not used by this method
+    * @return true if every lock was acquired, false otherwise
+    * @throws InterruptedException if interrupted while waiting for a lock
+    */
+   public boolean lockAll(NodeSPI node, LockType lockType, Object owner) throws InterruptedException
+   {
+      assertIsWriteLock(lockType);
+      final boolean excludeInternalFqns = false;
+      final InvocationContext noContext = null;
+      return lockRecursively(node, lockAcquisitionTimeout, excludeInternalFqns, noContext);
+   }
+
+   /**
+    * Write-locks a node and its whole subtree using the supplied timeout.
+    * Internal Fqns are not excluded and nothing is recorded on an invocation context.
+    *
+    * @param node     root of the subtree to lock
+    * @param lockType must be WRITE
+    * @param owner    not used by this method
+    * @param timeout  wait time in milliseconds passed on to the recursive locking
+    * @return true if every lock was acquired, false otherwise
+    * @throws InterruptedException if interrupted while waiting for a lock
+    */
+   public boolean lockAll(NodeSPI node, LockType lockType, Object owner, long timeout)
+         throws InterruptedException
+   {
+      assertIsWriteLock(lockType);
+      final boolean excludeInternalFqns = false;
+      final InvocationContext noContext = null;
+      return lockRecursively(node, timeout, excludeInternalFqns, noContext);
+   }
+
+ public boolean lockAll(NodeSPI node, LockType lockType, Object owner, long timeout,
boolean excludeInternalFqns) throws InterruptedException
+ {
+ assertIsWriteLock(lockType);
+ return lockRecursively(node, timeout, excludeInternalFqns, null);
+ }
+
+ public boolean lockAllAndRecord(NodeSPI node, LockType lockType, InvocationContext
ctx) throws InterruptedException
+ {
+ assertIsWriteLock(lockType);
+ return lockRecursively(node,
ctx.getContextLockAcquisitionTimeout(lockAcquisitionTimeout), false, ctx);
+ }
+
+   /**
+    * Write-locks the subtree rooted at the given Fqn, recording the locks on the invocation context.
+    * <p/>
+    * The node is looked up in the data container. If the lookup yields no node, only the Fqn itself is
+    * locked and recorded, since this lock manager locks by Fqn and does not need a node to exist.
+    *
+    * @param fqn      Fqn of the subtree root
+    * @param lockType must be WRITE
+    * @param ctx      invocation context on which acquired locks are recorded
+    * @return true if every lock was acquired, false otherwise
+    * @throws InterruptedException if interrupted while waiting for a lock
+    */
+   public boolean lockAllAndRecord(Fqn fqn, LockType lockType, InvocationContext ctx)
+         throws InterruptedException
+   {
+      NodeSPI node = dataContainer.peek(fqn, false);
+      // NOTE(review): peek presumably returns null when no node exists at fqn - confirm against DataContainer.
+      // Without this guard a missing node ends in a NullPointerException inside the recursive locking.
+      if (node == null) return lockAndRecord(fqn, lockType, ctx);
+      return lockAllAndRecord(node, lockType, ctx);
+   }
+
+   /**
+    * Releases the locks for a node's whole subtree: each child subtree first, then the node itself.
+    *
+    * @param node  root of the subtree to unlock
+    * @param owner not used by this method
+    */
+   @SuppressWarnings("unchecked")
+   public void unlockAll(NodeSPI node, Object owner)
+   {
+      Set<Node> kids = node.getChildren();
+      if (kids != null)
+      {
+         for (Node kid : kids)
+         {
+            NodeSPI childSpi = (NodeSPI) kid;
+            unlockAll(childSpi, null);
+         }
+      }
+      // children are done, now the node itself
+      unlock(node.getFqn(), null);
+   }
+
+   /**
+    * Not supported by this implementation.
+    *
+    * @param node ignored
+    * @throws UnsupportedOperationException always
+    */
+   public void unlockAll(NodeSPI node)
+   {
+      final String message = "Not supported in this impl.";
+      throw new UnsupportedOperationException(message);
+   }
+
+ public boolean ownsLock(Fqn fqn, LockType lockType, Object owner)
+ {
+ assertIsWriteLock(lockType);
+ return lockContainer.ownsLock(fqn, owner);
+ }
+
+ public boolean ownsLock(Fqn fqn, Object owner)
+ {
+ return lockContainer.ownsLock(fqn, owner);
+ }
+
+   /**
+    * Reports whether the shared lock that the node's Fqn maps to is currently held.
+    *
+    * @param n node whose Fqn is checked
+    * @return the lock container's answer for that Fqn
+    */
+   public boolean isLocked(NodeSPI n)
+   {
+      Fqn target = n.getFqn();
+      return lockContainer.isLocked(target);
+   }
+
+   /**
+    * Reports whether the shared lock that the node's Fqn maps to is currently held, after checking the
+    * lock type.
+    *
+    * @param n        node whose Fqn is checked
+    * @param lockType must be WRITE
+    * @return the lock container's answer for that Fqn
+    */
+   public boolean isLocked(NodeSPI n, LockType lockType)
+   {
+      assertIsWriteLock(lockType);
+      Fqn target = n.getFqn();
+      return lockContainer.isLocked(target);
+   }
+
+   /**
+    * Not supported by this implementation.
+    *
+    * @param f ignored
+    * @return never returns normally
+    * @throws UnsupportedOperationException always
+    */
+   public Object getWriteOwner(Fqn f)
+   {
+      final String message = "Not supported in this impl.";
+      throw new UnsupportedOperationException(message);
+   }
+
+   /**
+    * Not supported by this implementation.
+    *
+    * @param f ignored
+    * @return never returns normally
+    * @throws UnsupportedOperationException always
+    */
+   public Collection<Object> getReadOwners(Fqn f)
+   {
+      final String message = "Not supported in this impl.";
+      throw new UnsupportedOperationException(message);
+   }
+
+   /**
+    * Lock information is not available in this implementation; a fixed message is returned instead.
+    *
+    * @param node ignored
+    * @return a fixed "not supported" message
+    */
+   public String printLockInfo(NodeSPI node)
+   {
+      final String message = "Not supported in this impl.";
+      return message;
+   }
+
+   /**
+    * Base class for a fixed-size pool of shared ("striped") locks. An Fqn is mapped to one of the locks
+    * through a hash of its string form, so several Fqns can end up sharing a lock. Subclasses decide which
+    * lock class is used.
+    */
+   @ThreadSafe
+   abstract class LockContainer
+   {
+      private final int lockSegmentMask;
+      private final int lockSegmentShift;
+
+      /**
+       * Same as {@link #LockContainer(int)} called with the default concurrency value of 20.
+       */
+      LockContainer()
+      {
+         this(DEFAULT_CONCURRENCY);
+      }
+
+      /**
+       * Sizes the pool to the smallest power of two that is not below the requested concurrency, then asks
+       * the subclass to create that many locks.
+       *
+       * @param concurrency number of threads expected to use this class concurrently.
+       */
+      LockContainer(int concurrency)
+      {
+         int shiftBits = 0;
+         int lockCount = 1;
+         for (; lockCount < concurrency; lockCount <<= 1)
+         {
+            shiftBits++;
+         }
+         lockSegmentMask = lockCount - 1;
+         lockSegmentShift = 32 - shiftBits;
+
+         // note: runs from the constructor, before any subclass constructor body executes
+         initLocks(lockCount);
+      }
+
+      /**
+       * Maps an Fqn to an index into the lock pool: the spread hash is shifted right (unsigned) and masked
+       * to the pool size.
+       *
+       * @param fqn the Fqn to map
+       * @return an index between 0 and the number of locks minus one
+       */
+      int hashToIndex(Fqn fqn)
+      {
+         int spread = hash(fqn);
+         return (spread >>> lockSegmentShift) & lockSegmentMask;
+      }
+
+      /**
+       * Computes a spread hash code for a non-null object, starting from the hash code of the object's
+       * string representation rather than from the object's own hash code.
+       *
+       * @param x the object serving as a key
+       * @return the hash code
+       */
+      int hash(Object x)
+      {
+         int bits = x.toString().hashCode();
+         bits += ~(bits << 9);
+         bits ^= (bits >>> 14);
+         bits += (bits << 4);
+         bits ^= (bits >>> 10);
+         return bits;
+      }
+
+      /** Creates the pool of shared locks; called once from the constructor. */
+      abstract void initLocks(int numLocks);
+
+      /** Tells whether the lock that the Fqn maps to is owned; how owner is interpreted is up to the subclass. */
+      abstract boolean ownsLock(Fqn fqn, Object owner);
+
+      /** Tells whether the lock that the Fqn maps to is currently held. */
+      abstract boolean isLocked(Fqn fqn);
+
+      /** Returns the shared lock that the Fqn maps to. */
+      abstract Lock getLock(Fqn fqn);
+   }
+
+ class ReentrantLockContainer extends LockContainer
+ {
+ ReentrantLock[] sharedLocks;
+
+ void initLocks(int numLocks)
+ {
+ sharedLocks = new ReentrantLock[numLocks];
+ for (int i = 0; i < numLocks; i++) sharedLocks[i] = new ReentrantLock();
+ }
+
+ ReentrantLock getLock(Fqn fqn)
+ {
+ return sharedLocks[hashToIndex(fqn)];
+ }
+
+ boolean ownsLock(Fqn fqn, Object owner)
+ {
+ ReentrantLock lock = getLock(fqn);
+ return lock.isHeldByCurrentThread();
+ }
+
+ boolean isLocked(Fqn fqn)
+ {
+ ReentrantLock lock = getLock(fqn);
+ return lock.isLocked();
+ }
+ }
+
+   /**
+    * Lock container backed by {@link OwnableReentrantLock}s, each constructed with the injected invocation
+    * context container.
+    */
+   class OwnableReentrantLockContainer extends LockContainer
+   {
+      OwnableReentrantLock[] sharedLocks;
+
+      /** Fills the pool with numLocks distinct ownable reentrant locks. */
+      void initLocks(int numLocks)
+      {
+         sharedLocks = new OwnableReentrantLock[numLocks];
+         for (int i = 0; i < numLocks; i++)
+            sharedLocks[i] = new OwnableReentrantLock(invocationContextContainer);
+      }
+
+      /** Returns the shared lock that the Fqn hashes to. */
+      OwnableReentrantLock getLock(Fqn fqn)
+      {
+         return sharedLocks[hashToIndex(fqn)];
+      }
+
+      /**
+       * True if the given owner equals the current owner of the lock for the Fqn.
+       * A null owner never owns a lock, so false is returned for it instead of failing with a
+       * NullPointerException.
+       */
+      boolean ownsLock(Fqn fqn, Object owner)
+      {
+         if (owner == null) return false;
+         OwnableReentrantLock lock = getLock(fqn);
+         return owner.equals(lock.getOwner());
+      }
+
+      /** True if the lock for the Fqn is currently held. */
+      boolean isLocked(Fqn fqn)
+      {
+         OwnableReentrantLock lock = getLock(fqn);
+         return lock.isLocked();
+      }
+   }
+}