[jboss-cvs] JBossAS SVN: r104321 - in projects/cluster/ha-server-api/trunk/src: main/java/org/jboss/ha/framework/server/lock and 1 other directories.
jboss-cvs-commits at lists.jboss.org
jboss-cvs-commits at lists.jboss.org
Thu Apr 29 12:36:37 EDT 2010
Author: bstansberry at jboss.com
Date: 2010-04-29 12:36:37 -0400 (Thu, 29 Apr 2010)
New Revision: 104321
Added:
projects/cluster/ha-server-api/trunk/src/main/java/org/jboss/ha/framework/server/lock/
projects/cluster/ha-server-api/trunk/src/main/java/org/jboss/ha/framework/server/lock/YieldingClusterLockManager.java
projects/cluster/ha-server-api/trunk/src/test/java/org/jboss/test/ha/framework/server/lock/
Modified:
projects/cluster/ha-server-api/trunk/src/main/java/org/jboss/ha/framework/server/lock/AbstractClusterLockSupport.java
Log:
[JBCLUSTER-223] Bring AS cluster module distributed locking into ha-server-api
Copied: projects/cluster/ha-server-api/trunk/src/main/java/org/jboss/ha/framework/server/lock (from rev 104319, trunk/cluster/src/main/java/org/jboss/ha/framework/server/lock)
Modified: projects/cluster/ha-server-api/trunk/src/main/java/org/jboss/ha/framework/server/lock/AbstractClusterLockSupport.java
===================================================================
--- trunk/cluster/src/main/java/org/jboss/ha/framework/server/lock/AbstractClusterLockSupport.java 2010-04-29 15:11:25 UTC (rev 104319)
+++ projects/cluster/ha-server-api/trunk/src/main/java/org/jboss/ha/framework/server/lock/AbstractClusterLockSupport.java 2010-04-29 16:36:37 UTC (rev 104321)
@@ -154,29 +154,23 @@
{
// Get the lock on all other nodes in the cluster
- @SuppressWarnings("unchecked")
- ArrayList rsps = partition.callMethodOnCluster(getServiceHAName(),
+ List<RemoteLockResponse> rsps = partition.callMethodOnCluster(getServiceHAName(),
"remoteLock", new Object[]{lockId, me, new Long(left)},
- REMOTE_LOCK_TYPES, true);
+ REMOTE_LOCK_TYPES, RemoteLockResponse.class, true, null, partition.getMethodCallTimeout(), false);
boolean remoteLocked = true;
if (rsps != null)
{
- for (Object rsp : rsps)
+ for (RemoteLockResponse rsp : rsps)
{
- if ((rsp instanceof RemoteLockResponse) == false)
+ if (rsp.flag != RemoteLockResponse.Flag.OK)
{
remoteLocked = false;
- }
- else if (((RemoteLockResponse) rsp).flag != RemoteLockResponse.Flag.OK)
- {
- RemoteLockResponse curRsp = (RemoteLockResponse) rsp;
- remoteLocked = false;
if (superiorCompetitor == null)
{
- superiorCompetitor = getSuperiorCompetitor(curRsp.holder);
- log.debug("Received " + curRsp.flag + " response from " +
- curRsp.responder + " -- reports lock is held by " + curRsp.holder);
+ superiorCompetitor = getSuperiorCompetitor(rsp.holder);
+ log.debug("Received " + rsp.flag + " response from " +
+ rsp.responder + " -- reports lock is held by " + rsp.holder);
}
}
}
Added: projects/cluster/ha-server-api/trunk/src/main/java/org/jboss/ha/framework/server/lock/YieldingClusterLockManager.java
===================================================================
--- projects/cluster/ha-server-api/trunk/src/main/java/org/jboss/ha/framework/server/lock/YieldingClusterLockManager.java (rev 0)
+++ projects/cluster/ha-server-api/trunk/src/main/java/org/jboss/ha/framework/server/lock/YieldingClusterLockManager.java 2010-04-29 16:36:37 UTC (rev 104321)
@@ -0,0 +1,330 @@
+/*
+ * JBoss, Home of Professional Open Source.
+ * Copyright 2009, Red Hat Middleware LLC, and individual contributors
+ * as indicated by the @author tags. See the copyright.txt file in the
+ * distribution for a full listing of individual contributors.
+ *
+ * This is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU Lesser General Public License as
+ * published by the Free Software Foundation; either version 2.1 of
+ * the License, or (at your option) any later version.
+ *
+ * This software is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this software; if not, write to the Free
+ * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
+ * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
+ */
+
+package org.jboss.ha.framework.server.lock;
+
+import java.io.Serializable;
+import java.util.Queue;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentLinkedQueue;
+import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.locks.LockSupport;
+
+import org.jboss.ha.framework.interfaces.ClusterNode;
+import org.jboss.ha.framework.interfaces.HAPartition;
+
+/**
+ * @author Brian Stansberry
+ *
+ * @version $Revision:$
+ */
+public class YieldingClusterLockManager
+{
+ /**
+  * Outcome reported by
+  * {@link YieldingClusterLockManager#lock(Serializable, long, boolean)}.
+  */
+ public enum LockResult
+ {
+    /** The lock was obtained by requesting it from the cluster. */
+    ACQUIRED_FROM_CLUSTER,
+
+    /** The local node was already the holder of the lock. */
+    ALREADY_HELD,
+
+    /**
+     * The caller passed <code>newLock == true</code> to
+     * {@link YieldingClusterLockManager#lock(Serializable, long, boolean)}
+     * and the local node had no prior knowledge of the lock. When the local
+     * node did already know about the lock (generally a sign of a flaw in
+     * the application using this class) this value is not returned; one of
+     * the other values is returned instead.
+     */
+    NEW_LOCK
+ }
+
+ private class LocalLock
+ {
+ private volatile ClusterNode holder;
+ private final Queue<Thread> waiters = new ConcurrentLinkedQueue<Thread>();
+ private final AtomicInteger localLockCount = new AtomicInteger();
+
+ private void lock(ClusterNode caller, long timeout) throws TimeoutException
+ {
+ if (YieldingClusterLockManager.this.localNode.equals(caller))
+ {
+ this.localLockCount.incrementAndGet();
+ this.holder = YieldingClusterLockManager.this.localNode;
+ }
+ else
+ {
+ long deadline = System.currentTimeMillis() + timeout;
+ boolean wasInterrupted = false;
+ Thread current = Thread.currentThread();
+ waiters.add(current);
+
+ try
+ {
+ // Block while not first in queue or cannot acquire lock
+ while (waiters.peek() != current ||
+ localLockCount.get() > 0)
+ {
+ LockSupport.parkUntil(deadline);
+ if (Thread.interrupted()) // ignore interrupts while waiting
+ wasInterrupted = true;
+ if (System.currentTimeMillis() >= deadline)
+ {
+ if (waiters.peek() != current ||
+ localLockCount.get() > 0)
+ {
+ throw new TimeoutException(this.holder);
+ }
+ break;
+ }
+ }
+
+ if (localLockCount.get() == 0)
+ {
+ holder = caller;
+ }
+ else
+ {
+ throw new TimeoutException(this.holder);
+ }
+ }
+ finally
+ {
+ waiters.remove();
+ if (wasInterrupted) // reassert interrupt status on exit
+ current.interrupt();
+ }
+ }
+ }
+
+ private void unlock(ClusterNode caller)
+ {
+ if (caller.equals(holder))
+ {
+ if (YieldingClusterLockManager.this.localNode.equals(caller))
+ {
+ if (this.localLockCount.decrementAndGet() == 0)
+ {
+ holder = null;
+ }
+ }
+ else
+ {
+ holder = null;
+ }
+
+ if (holder == null)
+ {
+ LockSupport.unpark(waiters.peek());
+ }
+ }
+ }
+
+ }
+
+ /**
+  * Receives callbacks from the cluster lock support object and forwards
+  * each one to the enclosing manager's node-local state.
+  */
+ private class ClusterHandler implements LocalLockHandler
+ {
+    // ----------------------------------------------------- LocalLockHandler
+
+    /** Returns the enclosing manager's local node; the argument is not used. */
+    public ClusterNode getLocalNode(ClusterNode localNode)
+    {
+       return YieldingClusterLockManager.this.localNode;
+    }
+
+    /** Stores the given node as the enclosing manager's local node. */
+    public void setLocalNode(ClusterNode localNode)
+    {
+       YieldingClusterLockManager.this.localNode = localNode;
+    }
+
+    /** Forwards a lock request to the enclosing manager's doLock. */
+    public void lockFromCluster(Serializable lockName, ClusterNode caller, long timeout)
+          throws TimeoutException, InterruptedException
+    {
+       doLock(lockName, caller, timeout);
+    }
+
+    /** Returns the holder recorded locally for the named lock, or null if the lock is unknown here. */
+    public ClusterNode getLockHolder(Serializable lockName)
+    {
+       LocalLock known = getLocalLock(lockName, false);
+       if (known == null)
+       {
+          return null;
+       }
+       return known.holder;
+    }
+
+    /** Forwards an unlock request to the enclosing manager's doUnlock. */
+    public void unlockFromCluster(Serializable lockName, ClusterNode caller)
+    {
+       doUnlock(lockName, caller);
+    }
+
+ }
+
+ /** This node's identity; assigned through ClusterHandler.setLocalNode. */
+ private ClusterNode localNode;
+ /** Node-local lock state, keyed by lock name. */
+ private ConcurrentMap<Serializable, LocalLock> localLocks = new ConcurrentHashMap<Serializable, LocalLock>();
+ /** Cluster lock support object that public lock/unlock/start/stop calls are delegated to. */
+ private final YieldingGloballyExclusiveClusterLockSupport clusterSupport;
+
+ /**
+  * Creates a manager whose cluster lock support object is built from the
+  * given service name and partition, with a new ClusterHandler as its
+  * local lock handler.
+  *
+  * @param serviceHAName name passed through to the cluster lock support object
+  * @param partition     partition passed through to the cluster lock support object
+  */
+ public YieldingClusterLockManager(String serviceHAName, HAPartition partition)
+ {
+    this.clusterSupport =
+          new YieldingGloballyExclusiveClusterLockSupport(serviceHAName, partition, new ClusterHandler());
+ }
+
+ // ----------------------------------------------------------------- Public
+
+ /**
+  * Acquires the named lock without treating it as a brand-new lock; same
+  * as calling {@link #lock(Serializable, long, boolean)} with
+  * <code>newLock</code> set to <code>false</code>.
+  *
+  * @param lockName the identifier of the lock that should be acquired
+  * @param timeout  max time in ms to wait before throwing a TimeoutException
+  *                 if the lock cannot be acquired
+  *
+  * @return enum indicating how the lock was acquired
+  *
+  * @throws TimeoutException if the lock cannot be acquired before the timeout
+  * @throws InterruptedException if the thread is interrupted while trying to
+  *                              acquire the lock
+  */
+ public LockResult lock(Serializable lockName, long timeout)
+       throws TimeoutException, InterruptedException
+ {
+    final boolean assumeNewLock = false;
+    return lock(lockName, timeout, assumeNewLock);
+ }
+
+ /**
+ * Acquire the given lock.
+ * <p>
+ * Three outcomes are possible. If this node has no local record of the lock
+ * and <code>newLock</code> is <code>true</code>, the lock is taken locally
+ * only and {@link LockResult#NEW_LOCK} is returned. If the local node is
+ * already recorded as holder, the local count is incremented and
+ * {@link LockResult#ALREADY_HELD} is returned. Otherwise the lock is
+ * requested through the cluster lock support object and
+ * {@link LockResult#ACQUIRED_FROM_CLUSTER} is returned.
+ *
+ * @param lockName the identifier of the lock that should be acquired
+ * @param timeout max time in ms to wait before throwing a TimeoutException
+ * if the lock cannot be acquired
+ * @param newLock <code>true</code> if this object should assume this is the
+ * first use cluster-wide of the lock identified by
+ * <code>lockName</code>, and just acquire the lock locally
+ * without any cluster-wide call. See discussion of
+ * {@link LockResult#NEW_LOCK}.
+ *
+ * @return enum indicating how the lock was acquired
+ *
+ * @throws TimeoutException if the lock cannot be acquired before the timeout
+ * @throws InterruptedException if the thread is interrupted while trying to
+ * acquire the lock
+ * @throws IllegalStateException if the local node has not been set yet
+ */
+ public LockResult lock(Serializable lockName, long timeout, boolean newLock)
+ throws TimeoutException, InterruptedException
+ {
+ // The local node is assigned via ClusterHandler.setLocalNode; refuse to run before that
+ if (this.localNode == null)
+ {
+ throw new IllegalStateException("Null localNode");
+ }
+
+ LockResult result = null;
+ // Look up without creating, so we can tell whether this node already knows the lock
+ LocalLock localLock = getLocalLock(lockName, false);
+ if (localLock == null)
+ {
+ if (newLock)
+ {
+ // Caller asserts nobody else knows this lock: take it locally, no cluster call.
+ // doLock() creates the LocalLock entry.
+ doLock(lockName, this.localNode, timeout);
+ result = LockResult.NEW_LOCK;
+ }
+ else
+ {
+ this.clusterSupport.lock(lockName, timeout);
+ result = LockResult.ACQUIRED_FROM_CLUSTER;
+ }
+ }
+ else
+ {
+ // The increment must come BEFORE the holder check below; do not reorder
+ localLock.localLockCount.incrementAndGet(); // Now no other node can become localLock.holder
+ try
+ {
+ if (this.localNode.equals(localLock.holder))
+ {
+ // Already ours: the increment above is this acquisition's count
+ result = LockResult.ALREADY_HELD;
+ }
+ else
+ {
+ this.clusterSupport.lock(lockName, timeout);
+ result = LockResult.ACQUIRED_FROM_CLUSTER;
+ }
+ }
+ finally
+ {
+ // If we called clusterSupport.lock() above, its callback into
+ // ClusterHandler.lockFromCluster() will increment localLock.localLockCount.
+ // So, decrement so we don't double count
+ // (If we threw an exception above we should also decrement)
+ // result is still null on the exception path, so this branch covers it too
+ if (result != LockResult.ALREADY_HELD)
+ {
+ localLock.localLockCount.decrementAndGet();
+ }
+ }
+ }
+
+ return result;
+ }
+
+ /**
+  * Releases the named lock by delegating to the cluster lock support object.
+  *
+  * @param lockName the identifier of the lock to release
+  */
+ public void unlock(Serializable lockName)
+ {
+    clusterSupport.unlock(lockName);
+ }
+
+ /**
+  * Starts the underlying cluster lock support object.
+  *
+  * @throws Exception whatever the cluster lock support object's start() throws
+  */
+ public void start() throws Exception
+ {
+    clusterSupport.start();
+ }
+
+ /**
+  * Stops the underlying cluster lock support object.
+  *
+  * @throws Exception whatever the cluster lock support object's stop() throws
+  */
+ public void stop() throws Exception
+ {
+    clusterSupport.stop();
+ }
+
+ // ----------------------------------------------------------------- Private
+
+ /**
+  * Looks up the node-local state for a lock name, optionally creating it.
+  *
+  * @param categoryName the lock name used as map key
+  * @param create       whether to register a new LocalLock when none exists
+  *
+  * @return the registered LocalLock, or null if none exists and
+  *         <code>create</code> is false
+  */
+ private LocalLock getLocalLock(Serializable categoryName, boolean create)
+ {
+    LocalLock found = localLocks.get(categoryName);
+    if (found != null || !create)
+    {
+       return found;
+    }
+
+    // Another thread may register an entry first; putIfAbsent tells us who won
+    LocalLock fresh = new LocalLock();
+    LocalLock raced = localLocks.putIfAbsent(categoryName, fresh);
+    return raced == null ? fresh : raced;
+ }
+
+ /**
+  * Locks the node-local state for <code>lockName</code> on behalf of
+  * <code>caller</code>, creating that state if it does not exist yet.
+  *
+  * @param lockName the lock name
+  * @param caller   the node requesting the lock
+  * @param timeout  max time in ms handed to the local lock
+  *
+  * @throws TimeoutException if the local lock reports a timeout
+  * @throws InterruptedException declared for callers; not thrown by the visible code
+  */
+ private void doLock(Serializable lockName, ClusterNode caller, long timeout)
+       throws TimeoutException, InterruptedException
+ {
+    getLocalLock(lockName, true).lock(caller, timeout);
+ }
+
+ /**
+  * Unlocks the node-local state for <code>lockName</code> on behalf of
+  * <code>caller</code>. Does nothing when this node has no state for that
+  * lock name.
+  *
+  * @param lockName the lock name
+  * @param caller   the node releasing the lock
+  */
+ private void doUnlock(Serializable lockName, ClusterNode caller)
+ {
+    LocalLock known = getLocalLock(lockName, false);
+    if (known == null)
+    {
+       return;
+    }
+    known.unlock(caller);
+ }
+
+}
More information about the jboss-cvs-commits
mailing list